Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data coordinators for the ring integration."""
2 
3 from asyncio import TaskGroup
4 from collections.abc import Callable, Coroutine
5 import logging
6 from typing import TYPE_CHECKING, Any
7 
8 from ring_doorbell import (
9  AuthenticationError,
10  Ring,
11  RingDevices,
12  RingError,
13  RingEvent,
14  RingTimeout,
15 )
16 from ring_doorbell.listen import RingEventListener
17 
18 from homeassistant import config_entries
19 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
20 from homeassistant.exceptions import ConfigEntryAuthFailed
22  BaseDataUpdateCoordinatorProtocol,
23  DataUpdateCoordinator,
24  UpdateFailed,
25 )
26 
27 from .const import SCAN_INTERVAL
28 
29 _LOGGER = logging.getLogger(__name__)
30 
31 
32 async def _call_api[*_Ts, _R](
33  hass: HomeAssistant,
34  target: Callable[[*_Ts], Coroutine[Any, Any, _R]],
35  *args: *_Ts,
36  msg_suffix: str = "",
37 ) -> _R:
38  try:
39  return await target(*args)
40  except AuthenticationError as err:
41  # Raising ConfigEntryAuthFailed will cancel future updates
42  # and start a config flow with SOURCE_REAUTH (async_step_reauth)
43  raise ConfigEntryAuthFailed from err
44  except RingTimeout as err:
45  raise UpdateFailed(
46  f"Timeout communicating with API{msg_suffix}: {err}"
47  ) from err
48  except RingError as err:
49  raise UpdateFailed(f"Error communicating with API{msg_suffix}: {err}") from err
50 
51 
53  """Base class for device coordinators."""
54 
55  def __init__(
56  self,
57  hass: HomeAssistant,
58  ring_api: Ring,
59  ) -> None:
60  """Initialize my coordinator."""
61  super().__init__(
62  hass,
63  name="devices",
64  logger=_LOGGER,
65  update_interval=SCAN_INTERVAL,
66  )
67  self.ring_api: Ring = ring_api
68  self.first_callfirst_call: bool = True
69 
70  async def _async_update_data(self) -> RingDevices:
71  """Fetch data from API endpoint."""
72  update_method: str = (
73  "async_update_data" if self.first_callfirst_call else "async_update_devices"
74  )
75  await _call_api(self.hasshass, getattr(self.ring_api, update_method))
76  self.first_callfirst_call = False
77  devices: RingDevices = self.ring_api.devices()
78  subscribed_device_ids = set(self.async_contextsasync_contexts())
79  for device in devices.all_devices:
80  # Don't update all devices in the ring api, only those that set
81  # their device id as context when they subscribed.
82  if device.id in subscribed_device_ids:
83  try:
84  async with TaskGroup() as tg:
85  if device.has_capability("history"):
86  tg.create_task(
87  _call_api(
88  self.hasshass,
89  lambda device: device.async_history(limit=10),
90  device,
91  msg_suffix=f" for device {device.name}", # device_id is the mac
92  )
93  )
94  tg.create_task(
95  _call_api(
96  self.hasshass,
97  device.async_update_health_data,
98  msg_suffix=f" for device {device.name}",
99  )
100  )
101  except ExceptionGroup as eg:
102  raise eg.exceptions[0] # noqa: B904
103 
104  return devices
105 
106 
108  """Global notifications coordinator."""
109 
110  config_entry: config_entries.ConfigEntry
111 
112  def __init__(
113  self,
114  hass: HomeAssistant,
115  ring_api: Ring,
116  listen_credentials: dict[str, Any] | None,
117  listen_credentials_updater: Callable[[dict[str, Any]], None],
118  ) -> None:
119  """Initialize my coordinator."""
120  self.hasshass = hass
121  self.loggerlogger = _LOGGER
122  self.ring_api: Ring = ring_api
123  self.event_listenerevent_listener = RingEventListener(
124  ring_api, listen_credentials, listen_credentials_updater
125  )
126  self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
127  self._listen_callback_id_listen_callback_id: int | None = None
128 
129  config_entry = config_entries.current_entry.get()
130  if TYPE_CHECKING:
131  assert config_entry
132  self.config_entryconfig_entry = config_entry
133  self.start_timeoutstart_timeout = 10
134  self.config_entryconfig_entry.async_on_unload(self.async_shutdownasync_shutdown)
135  self.index_alertsindex_alerts()
136 
137  def index_alerts(self) -> None:
138  "Index the active alerts."
139  self.alertsalerts = {
140  (alert.doorbot_id, alert.kind): alert
141  for alert in self.ring_api.active_alerts()
142  }
143 
144  async def async_shutdown(self) -> None:
145  """Cancel any scheduled call, and ignore new runs."""
146  if self.event_listenerevent_listener.started:
147  await self._async_stop_listen_async_stop_listen()
148 
149  async def _async_stop_listen(self) -> None:
150  self.loggerlogger.debug("Stopped ring listener")
151  await self.event_listenerevent_listener.stop()
152  self.loggerlogger.debug("Stopped ring listener")
153 
154  async def _async_start_listen(self) -> None:
155  """Start listening for realtime events."""
156  self.loggerlogger.debug("Starting ring listener.")
157  await self.event_listenerevent_listener.start(
158  timeout=self.start_timeoutstart_timeout,
159  )
160  if self.event_listenerevent_listener.started is True:
161  self.loggerlogger.debug("Started ring listener")
162  else:
163  self.loggerlogger.warning(
164  "Ring event listener failed to start after %s seconds",
165  self.start_timeoutstart_timeout,
166  )
167  self._listen_callback_id_listen_callback_id = self.event_listenerevent_listener.add_notification_callback(
168  self._on_event_on_event
169  )
170  self.index_alertsindex_alerts()
171  # Update the listeners so they switch from Unavailable to Unknown
172  self._async_update_listeners_async_update_listeners()
173 
174  def _on_event(self, event: RingEvent) -> None:
175  self.loggerlogger.debug("Ring event received: %s", event)
176  self.index_alertsindex_alerts()
177  self._async_update_listeners_async_update_listeners(event.doorbot_id)
178 
179  @callback
180  def _async_update_listeners(self, doorbot_id: int | None = None) -> None:
181  """Update all registered listeners."""
182  for update_callback, device_api_id in list(self._listeners.values()):
183  if not doorbot_id or device_api_id == doorbot_id:
184  update_callback()
185 
186  @callback
188  self, update_callback: CALLBACK_TYPE, context: Any = None
189  ) -> Callable[[], None]:
190  """Listen for data updates."""
191  start_listen = not self._listeners
192 
193  @callback
194  def remove_listener() -> None:
195  """Remove update listener."""
196  self._listeners.pop(remove_listener)
197  if not self._listeners:
198  self.config_entryconfig_entry.async_create_task(
199  self.hasshass,
200  self._async_stop_listen_async_stop_listen(),
201  "Ring event listener stop",
202  eager_start=True,
203  )
204 
205  self._listeners[remove_listener] = (update_callback, context)
206 
207  # This is the first listener, start the event listener.
208  if start_listen:
209  self.config_entryconfig_entry.async_create_task(
210  self.hasshass,
211  self._async_start_listen_async_start_listen(),
212  "Ring event listener start",
213  eager_start=True,
214  )
215  return remove_listener
None __init__(self, HomeAssistant hass, Ring ring_api)
Definition: coordinator.py:59
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Definition: coordinator.py:189
None _async_update_listeners(self, int|None doorbot_id=None)
Definition: coordinator.py:180
None __init__(self, HomeAssistant hass, Ring ring_api, dict[str, Any]|None listen_credentials, Callable[[dict[str, Any]], None] listen_credentials_updater)
Definition: coordinator.py:118
dict[str, dict[str, Any]] devices(HomeAssistant hass)
Definition: __init__.py:237