1 """Coordinator for lookin devices."""
3 from __future__
import annotations
5 from collections.abc
import Awaitable, Callable
6 from datetime
import timedelta
13 from .const
import NEVER_TIME, POLLING_FALLBACK_SECONDS
15 _LOGGER = logging.getLogger(__name__)
19 """Keep track of when the last push update was."""
22 """Init the push coordinator."""
27 """Remember the last push time."""
30 def active(self, interval: timedelta) -> bool:
31 """Check if the last push update was recent."""
32 time_since_last_update = time.monotonic() - self.
last_update
33 is_active = time_since_last_update < POLLING_FALLBACK_SECONDS
35 "%s: push updates active: %s (time_since_last_update=%s)",
38 time_since_last_update,
44 """DataUpdateCoordinator to gather data for a specific lookin device."""
49 push_coordinator: LookinPushCoordinator,
51 update_interval: timedelta |
None =
None,
52 update_method: Callable[[], Awaitable[_DataT]] |
None =
None,
54 """Initialize DataUpdateCoordinator to gather data for specific device."""
60 update_interval=update_interval,
61 update_method=update_method,
67 """Manually update data, notify listeners, reset the refresh interval, and remember the push time."""
72 """Fetch data only if we have not received a push inside the interval."""
_DataT _async_update_data(self)
None async_set_updated_data(self, _DataT data)
None __init__(self, HomeAssistant hass, LookinPushCoordinator push_coordinator, str name, timedelta|None update_interval=None, Callable[[], Awaitable[_DataT]]|None update_method=None)
bool active(self, timedelta interval)
None __init__(self, str name)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
IssData update(pyiss.ISS iss)