1 """Data coordinators for the ring integration."""
3 from asyncio
import TaskGroup
4 from collections.abc
import Callable, Coroutine
6 from typing
import TYPE_CHECKING, Any
8 from ring_doorbell
import (
16 from ring_doorbell.listen
import RingEventListener
18 from homeassistant
import config_entries
22 BaseDataUpdateCoordinatorProtocol,
23 DataUpdateCoordinator,
27 from .const
import SCAN_INTERVAL
29 _LOGGER = logging.getLogger(__name__)
32 async
def _call_api[*_Ts, _R](
34 target: Callable[[*_Ts], Coroutine[Any, Any, _R]],
39 return await target(*args)
40 except AuthenticationError
as err:
43 raise ConfigEntryAuthFailed
from err
44 except RingTimeout
as err:
46 f
"Timeout communicating with API{msg_suffix}: {err}"
48 except RingError
as err:
49 raise UpdateFailed(f
"Error communicating with API{msg_suffix}: {err}")
from err
53 """Base class for device coordinators."""
60 """Initialize my coordinator."""
65 update_interval=SCAN_INTERVAL,
67 self.ring_api: Ring = ring_api
71 """Fetch data from API endpoint."""
72 update_method: str = (
73 "async_update_data" if self.
first_callfirst_call
else "async_update_devices"
75 await _call_api(self.
hasshass, getattr(self.ring_api, update_method))
77 devices: RingDevices = self.ring_api.
devices()
79 for device
in devices.all_devices:
82 if device.id
in subscribed_device_ids:
84 async
with TaskGroup()
as tg:
85 if device.has_capability(
"history"):
89 lambda device: device.async_history(limit=10),
91 msg_suffix=f
" for device {device.name}",
97 device.async_update_health_data,
98 msg_suffix=f
" for device {device.name}",
101 except ExceptionGroup
as eg:
102 raise eg.exceptions[0]
108 """Global notifications coordinator."""
116 listen_credentials: dict[str, Any] |
None,
117 listen_credentials_updater: Callable[[dict[str, Any]],
None],
119 """Initialize my coordinator."""
122 self.ring_api: Ring = ring_api
124 ring_api, listen_credentials, listen_credentials_updater
126 self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object |
None]] = {}
129 config_entry = config_entries.current_entry.get()
138 "Index the active alerts."
140 (alert.doorbot_id, alert.kind): alert
141 for alert
in self.ring_api.active_alerts()
145 """Cancel any scheduled call, and ignore new runs."""
150 self.
loggerlogger.debug(
"Stopped ring listener")
152 self.
loggerlogger.debug(
"Stopped ring listener")
155 """Start listening for realtime events."""
156 self.
loggerlogger.debug(
"Starting ring listener.")
161 self.
loggerlogger.debug(
"Started ring listener")
163 self.
loggerlogger.warning(
164 "Ring event listener failed to start after %s seconds",
175 self.
loggerlogger.debug(
"Ring event received: %s", event)
181 """Update all registered listeners."""
182 for update_callback, device_api_id
in list(self._listeners.values()):
183 if not doorbot_id
or device_api_id == doorbot_id:
188 self, update_callback: CALLBACK_TYPE, context: Any =
None
189 ) -> Callable[[],
None]:
190 """Listen for data updates."""
191 start_listen =
not self._listeners
194 def remove_listener() -> None:
195 """Remove update listener."""
196 self._listeners.pop(remove_listener)
197 if not self._listeners:
201 "Ring event listener stop",
205 self._listeners[remove_listener] = (update_callback, context)
212 "Ring event listener start",
215 return remove_listener
None __init__(self, HomeAssistant hass, Ring ring_api)
RingDevices _async_update_data(self)
None async_shutdown(self)
None _on_event(self, RingEvent event)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None _async_start_listen(self)
None _async_stop_listen(self)
None _async_update_listeners(self, int|None doorbot_id=None)
None __init__(self, HomeAssistant hass, Ring ring_api, dict[str, Any]|None listen_credentials, Callable[[dict[str, Any]], None] listen_credentials_updater)
Generator[Any] async_contexts(self)
dict[str, dict[str, Any]] devices(HomeAssistant hass)