1 """Tradfri DataUpdateCoordinator."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import timedelta
9 from pytradfri.command
import Command
10 from pytradfri.device
import Device
11 from pytradfri.error
import RequestError
16 from .const
import LOGGER
22 """Coordinator to manage data for a specific Tradfri device."""
28 api: Callable[[Command | list[Command]], Any],
31 """Initialize device coordinator."""
34 self._exception: Exception | None = None
39 name=f
"Update coordinator for {device}",
40 update_interval=
timedelta(seconds=SCAN_INTERVAL),
44 """Set status of hub."""
52 """Update the coordinator for a device when a change is detected."""
57 """Schedule handling exception."""
61 """Handle observe exceptions in a coroutine."""
66 "Observation failed for %s, trying again", self.device, exc_info=exc
73 """Fetch data from the gateway for a specific device."""
79 except RequestError
as err:
80 raise UpdateFailed(f
"Error communicating with API: {err}.")
from err
84 cmd = self.device.observe(
89 await self.api(cmd)
90 except RequestError
as err:
91 raise UpdateFailed(f
"Error communicating with API: {err}.")
from err
None _observe_update(self, Device device)
None set_hub_available(self, bool available)
None __init__(self, HomeAssistant hass, *Callable[[Command|list[Command]], Any] api, Device device)
None _exception_callback(self, Exception exc)
Device _async_update_data(self)
None _handle_exception(self, Exception exc)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
None async_set_updated_data(self, _DataT data)
None async_request_refresh(self)