Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Helpers to help coordinate updates."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Coroutine
6 from datetime import timedelta
7 import logging
8 from typing import Any
9 
10 from aiohttp import ClientConnectorError, ServerDisconnectedError
11 from pyoverkiz.client import OverkizClient
12 from pyoverkiz.enums import EventName, ExecutionState, Protocol
13 from pyoverkiz.exceptions import (
14  BadCredentialsException,
15  InvalidEventListenerIdException,
16  MaintenanceException,
17  NotAuthenticatedException,
18  TooManyConcurrentRequestsException,
19  TooManyRequestsException,
20 )
21 from pyoverkiz.models import Device, Event, Place
22 
23 from homeassistant.core import HomeAssistant
24 from homeassistant.exceptions import ConfigEntryAuthFailed
25 from homeassistant.helpers import device_registry as dr
26 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
27 from homeassistant.util.decorator import Registry
28 
29 from .const import DOMAIN, LOGGER, UPDATE_INTERVAL
30 
# Maps an Overkiz event name to the coroutine function that handles it.
# Handlers are added with the @EVENT_HANDLERS.register(...) decorator and are
# looked up by event name in OverkizDataUpdateCoordinator._async_update_data.
EVENT_HANDLERS: Registry[
    str, Callable[[OverkizDataUpdateCoordinator, Event], Coroutine[Any, Any, None]]
] = Registry()
34 
35 
class OverkizDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Device]]):
    """Class to manage fetching data from Overkiz platform."""

    def __init__(
        self,
        hass: HomeAssistant,
        logger: logging.Logger,
        *,
        name: str,
        client: OverkizClient,
        devices: list[Device],
        places: Place | None,
        update_interval: timedelta | None = None,
        config_entry_id: str,
    ) -> None:
        """Initialize global data updater.

        Args:
            hass: Home Assistant instance, forwarded to the base coordinator.
            logger: Logger, forwarded to the base coordinator.
            name: Coordinator name, forwarded to the base coordinator.
            client: Overkiz client used to fetch events, log in and list devices.
            devices: Initial devices; stored in a dict keyed by ``device_url``.
            places: Root place, flattened into ``self.areas`` when truthy.
            update_interval: Polling interval, forwarded to the base coordinator.
            config_entry_id: Id of the config entry this coordinator belongs to.
        """
        super().__init__(
            hass,
            logger,
            name=name,
            update_interval=update_interval,
        )

        self.data = {}
        self.client = client
        self.devices: dict[str, Device] = {d.device_url: d for d in devices}
        # True only when every device uses the RTS or INTERNAL protocol.
        self.is_stateless = all(
            device.protocol in (Protocol.RTS, Protocol.INTERNAL) for device in devices
        )
        # Executions currently tracked, keyed by execution id.
        self.executions: dict[str, dict[str, str]] = {}
        self.areas = self._places_to_area(places) if places else None
        self.config_entry_id = config_entry_id

    async def _async_update_data(self) -> dict[str, Device]:
        """Fetch Overkiz data via event listener.

        Returns:
            The device mapping (``device_url`` -> device) after all fetched
            events have been dispatched to their registered handlers.

        Raises:
            ConfigEntryAuthFailed: The client reported bad credentials.
            UpdateFailed: Rate limiting, maintenance, an invalid event
                listener id, a timeout or a connection failure.
        """
        try:
            events = await self.client.fetch_events()
        except BadCredentialsException as exception:
            raise ConfigEntryAuthFailed("Invalid authentication.") from exception
        except TooManyConcurrentRequestsException as exception:
            raise UpdateFailed("Too many concurrent requests.") from exception
        except TooManyRequestsException as exception:
            raise UpdateFailed("Too many requests, try again later.") from exception
        except MaintenanceException as exception:
            raise UpdateFailed("Server is down for maintenance.") from exception
        except InvalidEventListenerIdException as exception:
            raise UpdateFailed(exception) from exception
        except (TimeoutError, ClientConnectorError) as exception:
            raise UpdateFailed("Failed to connect.") from exception
        except (ServerDisconnectedError, NotAuthenticatedException):
            # The session is gone: forget tracked executions, log in again and
            # reload every device instead of processing events.
            self.executions = {}

            # During the relogin, similar exceptions can be thrown.
            try:
                await self.client.login()
                self.devices = await self._get_devices()
            except BadCredentialsException as exception:
                raise ConfigEntryAuthFailed("Invalid authentication.") from exception
            except TooManyRequestsException as exception:
                raise UpdateFailed("Too many requests, try again later.") from exception

            return self.devices

        for event in events:
            LOGGER.debug(event)

            # Events without a registered handler are only logged.
            if event_handler := EVENT_HANDLERS.get(event.name):
                await event_handler(self, event)

        if not self.executions:
            # No execution is tracked any more: return to the regular polling
            # interval (on_execution_registered shortens it to one second).
            # NOTE(review): this statement was absent from the listing and is
            # reconstructed from the otherwise unused UPDATE_INTERVAL import -
            # confirm against the upstream file.
            self.update_interval = UPDATE_INTERVAL

        return self.devices

    async def _get_devices(self) -> dict[str, Device]:
        """Fetch devices and return them keyed by ``device_url``."""
        LOGGER.debug("Fetching all devices and state via /setup/devices")
        return {d.device_url: d for d in await self.client.get_devices(refresh=True)}

    def _places_to_area(self, place: Place) -> dict[str, str]:
        """Convert places with sub_places to a flat dictionary [placeoid, label]).

        The place itself is added first, then each entry of ``sub_places`` is
        flattened recursively and merged into the same dictionary.
        """
        areas: dict[str, str] = {}
        if isinstance(place, Place):
            areas[place.oid] = place.label

        if isinstance(place.sub_places, list):
            for sub_place in place.sub_places:
                areas.update(self._places_to_area(sub_place))

        return areas
126 
127 
@EVENT_HANDLERS.register(EventName.DEVICE_AVAILABLE)
async def on_device_available(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle device available event.

    Marks the device named by ``event.device_url`` as available. Events
    without a device URL are ignored.
    """
    if event.device_url:
        coordinator.devices[event.device_url].available = True
135 
136 
@EVENT_HANDLERS.register(EventName.DEVICE_UNAVAILABLE)
@EVENT_HANDLERS.register(EventName.DEVICE_DISABLED)
async def on_device_unavailable_disabled(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle device unavailable / disabled event.

    Marks the device named by ``event.device_url`` as unavailable. Events
    without a device URL are ignored.
    """
    if event.device_url:
        coordinator.devices[event.device_url].available = False
145 
146 
@EVENT_HANDLERS.register(EventName.DEVICE_CREATED)
@EVENT_HANDLERS.register(EventName.DEVICE_UPDATED)
async def on_device_created_updated(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle device created / updated event.

    Schedules a reload of the coordinator's config entry as a task; the
    reload is not awaited here.
    """
    coordinator.hass.async_create_task(
        coordinator.hass.config_entries.async_reload(coordinator.config_entry_id)
    )
156 
157 
@EVENT_HANDLERS.register(EventName.DEVICE_STATE_CHANGED)
async def on_device_state_changed(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle device state changed event.

    Stores every state carried by the event on the matching device, keyed by
    state name. Events without a device URL are ignored.
    """
    if not event.device_url:
        return

    for state in event.device_states:
        device = coordinator.devices[event.device_url]
        device.states[state.name] = state
169 
170 
@EVENT_HANDLERS.register(EventName.DEVICE_REMOVED)
async def on_device_removed(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle device removed event.

    Removes the matching entry from the device registry, if one is
    registered, and drops the device from the coordinator. Events without a
    device URL are ignored.
    """
    if not event.device_url:
        return

    # The registry identifier uses the URL part before the first "#".
    base_device_url = event.device_url.split("#")[0]
    registry = dr.async_get(coordinator.hass)

    if registered_device := registry.async_get_device(
        identifiers={(DOMAIN, base_device_url)}
    ):
        registry.async_remove_device(registered_device.id)

    # device_url is known to be truthy here because of the early return above.
    del coordinator.devices[event.device_url]
189 
190 
@EVENT_HANDLERS.register(EventName.EXECUTION_REGISTERED)
async def on_execution_registered(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle execution registered event.

    Starts tracking the execution if it is not tracked yet and, unless the
    coordinator is stateless, shortens the update interval to one second.
    """
    if event.exec_id and event.exec_id not in coordinator.executions:
        coordinator.executions[event.exec_id] = {}

    if not coordinator.is_stateless:
        coordinator.update_interval = timedelta(seconds=1)
201 
202 
@EVENT_HANDLERS.register(EventName.EXECUTION_STATE_CHANGED)
async def on_execution_state_changed(
    coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
    """Handle execution changed event.

    Stops tracking an execution once its new state is COMPLETED or FAILED.
    Other state changes, and executions that are not tracked, are ignored.
    """
    if event.exec_id in coordinator.executions and event.new_state in (
        ExecutionState.COMPLETED,
        ExecutionState.FAILED,
    ):
        del coordinator.executions[event.exec_id]
None __init__(self, HomeAssistant hass, logging.Logger logger, *, str name, OverkizClient client, list[Device] devices, Place|None places, timedelta|None update_interval=None, str config_entry_id)
Definition: coordinator.py:50
None on_execution_registered(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:194
None on_device_unavailable_disabled(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:141
None on_device_created_updated(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:151
None on_device_removed(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:174
None on_device_state_changed(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:161
None on_execution_state_changed(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:206
None on_device_available(OverkizDataUpdateCoordinator coordinator, Event event)
Definition: coordinator.py:131