Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for System Bridge."""
2 
3 from __future__ import annotations
4 
5 from asyncio import Task
6 from collections.abc import Callable
7 from datetime import timedelta
8 import logging
9 from typing import Any
10 
11 from systembridgeconnector.exceptions import (
12  AuthenticationException,
13  ConnectionClosedException,
14  ConnectionErrorException,
15 )
16 from systembridgeconnector.websocket_client import WebSocketClient
17 from systembridgemodels.modules import (
18  GetData,
19  Module,
20  ModulesData,
21  RegisterDataListener,
22 )
23 
24 from homeassistant.config_entries import ConfigEntry
25 from homeassistant.const import (
26  CONF_HOST,
27  CONF_PORT,
28  CONF_TOKEN,
29  EVENT_HOMEASSISTANT_STOP,
30 )
31 from homeassistant.core import HomeAssistant
32 from homeassistant.exceptions import ConfigEntryAuthFailed
33 from homeassistant.helpers.aiohttp_client import async_get_clientsession
34 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
35 
36 from .const import DOMAIN, MODULES
37 from .data import SystemBridgeData
38 
39 
41  """Class to manage fetching System Bridge data from single endpoint."""
42 
43  def __init__(
44  self,
45  hass: HomeAssistant,
46  LOGGER: logging.Logger,
47  *,
48  entry: ConfigEntry,
49  ) -> None:
50  """Initialize global System Bridge data updater."""
51  self.titletitle = entry.title
52  self.unsubunsub: Callable | None = None
53 
54  self.listen_tasklisten_task: Task | None = None
55  self.websocket_clientwebsocket_client = WebSocketClient(
56  api_host=entry.data[CONF_HOST],
57  api_port=entry.data[CONF_PORT],
58  token=entry.data[CONF_TOKEN],
59  session=async_get_clientsession(hass),
60  can_close_session=False,
61  )
62 
63  self._host_host = entry.data[CONF_HOST]
64 
65  super().__init__(
66  hass,
67  LOGGER,
68  name=DOMAIN,
69  update_interval=timedelta(seconds=30),
70  )
71 
73 
74  async def check_websocket_connected(self) -> None:
75  """Check if WebSocket is connected."""
76  self.loggerlogger.debug(
77  "[check_websocket_connected] WebSocket connected: %s",
78  self.websocket_clientwebsocket_client.connected,
79  )
80 
81  if not self.websocket_clientwebsocket_client.connected:
82  try:
83  await self.websocket_clientwebsocket_client.connect()
84  except ConnectionErrorException as exception:
85  self.loggerlogger.warning(
86  "[check_websocket_connected] Connection error occurred for %s: %s",
87  self.titletitle,
88  exception,
89  )
90  await self.clean_disconnectclean_disconnect()
91 
92  async def close_websocket(self) -> None:
93  """Close WebSocket connection."""
94  await self.websocket_clientwebsocket_client.close()
95  if self.listen_tasklisten_task is not None:
96  self.listen_tasklisten_task.cancel(
97  msg="WebSocket closed on Home Assistant shutdown",
98  )
99 
100  async def clean_disconnect(self) -> None:
101  """Clean disconnect WebSocket."""
102  if self.unsubunsub:
103  self.unsubunsub()
104  self.unsubunsub = None
105  self.last_update_successlast_update_successlast_update_success = False
106  self.async_update_listenersasync_update_listeners()
107  if self.listen_tasklisten_task is not None:
108  self.listen_tasklisten_task.cancel(
109  msg="WebSocket disconnected",
110  )
111 
112  async def async_get_data(
113  self,
114  modules: list[Module],
115  ) -> ModulesData:
116  """Get data from WebSocket."""
117  await self.check_websocket_connectedcheck_websocket_connected()
118 
119  modules_data = await self.websocket_clientwebsocket_client.get_data(GetData(modules=modules))
120 
121  # Merge new data with existing data
122  for module in MODULES:
123  if hasattr(modules_data, module):
124  self.loggerlogger.debug("[async_get_data] Set new data for: %s", module)
125  setattr(self.datadatadata, module, getattr(modules_data, module))
126 
127  return modules_data
128 
130  self,
131  module_name: str,
132  module: Any,
133  ) -> None:
134  """Handle data from the WebSocket client."""
135  self.loggerlogger.debug("[async_handle_module] Set new data for: %s", module_name)
136  setattr(self.datadatadata, module_name, module)
137  self.async_set_updated_dataasync_set_updated_data(self.datadatadata)
138 
139  async def _listen_for_data(self) -> None:
140  """Listen for events from the WebSocket."""
141  try:
142  await self.websocket_clientwebsocket_client.listen(callback=self.async_handle_moduleasync_handle_module)
143  except AuthenticationException as exception:
144  self.loggerlogger.error(
145  "Authentication failed while listening for %s: %s",
146  self.titletitle,
147  exception,
148  )
149  await self.clean_disconnectclean_disconnect()
150  except (ConnectionClosedException, ConnectionResetError) as exception:
151  self.loggerlogger.debug(
152  "[_listen_for_data] Websocket connection closed for %s: %s",
153  self.titletitle,
154  exception,
155  )
156  await self.clean_disconnectclean_disconnect()
157  except ConnectionErrorException as exception:
158  self.loggerlogger.debug(
159  "[_listen_for_data] Connection error occurred for %s: %s",
160  self.titletitle,
161  exception,
162  )
163  await self.clean_disconnectclean_disconnect()
164 
165  async def _async_update_data(self) -> SystemBridgeData:
166  """Update System Bridge data from WebSocket."""
167  if self.listen_tasklisten_task is None or not self.websocket_clientwebsocket_client.connected:
168  await self.check_websocket_connectedcheck_websocket_connected()
169 
170  self.loggerlogger.debug("Create listener task for %s", self.titletitle)
171  self.listen_tasklisten_task = self.hasshass.async_create_background_task(
172  self._listen_for_data_listen_for_data(),
173  name="System Bridge WebSocket Listener",
174  eager_start=False,
175  )
176  self.loggerlogger.debug("Listening for data from %s", self.titletitle)
177 
178  try:
179  await self.websocket_clientwebsocket_client.register_data_listener(
180  RegisterDataListener(modules=MODULES)
181  )
182  except AuthenticationException as exception:
183  self.loggerlogger.error(
184  "Authentication failed at setup for %s: %s", self.titletitle, exception
185  )
186  await self.clean_disconnectclean_disconnect()
187  raise ConfigEntryAuthFailed from exception
188  except (ConnectionClosedException, ConnectionErrorException) as exception:
189  self.loggerlogger.warning(
190  "[register] Connection error occurred for %s: %s",
191  self.titletitle,
192  exception,
193  )
194  await self.clean_disconnectclean_disconnect()
195  return self.datadatadata
196 
197  self.loggerlogger.debug("Registered data listener for %s", self.titletitle)
198 
199  self.last_update_successlast_update_successlast_update_success = True
200  self.async_update_listenersasync_update_listeners()
201 
202  # Clean disconnect WebSocket on Home Assistant shutdown
203  self.unsubunsub = self.hasshass.bus.async_listen_once(
204  EVENT_HOMEASSISTANT_STOP,
205  lambda _: self.close_websocketclose_websocket(),
206  )
207 
208  self.loggerlogger.debug("[_async_update_data] Done")
209 
210  return self.datadatadata
None __init__(self, HomeAssistant hass, logging.Logger LOGGER, *ConfigEntry entry)
Definition: coordinator.py:49
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)