1 """Coordinator for the Risco integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
9 from pyrisco
import CannotConnectError, OperationError, RiscoCloud, UnauthorizedError
10 from pyrisco.cloud.alarm
import Alarm
11 from pyrisco.cloud.event
import Event
17 from .const
import DOMAIN
19 LAST_EVENT_STORAGE_VERSION = 1
20 LAST_EVENT_TIMESTAMP_KEY =
"last_event_timestamp"
21 _LOGGER = logging.getLogger(__name__)
25 """Class to manage fetching risco data."""
28 self, hass: HomeAssistant, risco: RiscoCloud, scan_interval: int
30 """Initialize global risco data updater."""
32 interval =
timedelta(seconds=scan_interval)
37 update_interval=interval,
41 """Fetch data from risco."""
44 except (CannotConnectError, UnauthorizedError, OperationError)
as error:
49 """Class to manage fetching risco data."""
52 self, hass: HomeAssistant, risco: RiscoCloud, eid: str, scan_interval: int
54 """Initialize global risco data updater."""
56 self.
_store_store = Store[dict[str, Any]](
57 hass, LAST_EVENT_STORAGE_VERSION, f
"risco_{eid}_last_event_timestamp"
59 interval =
timedelta(seconds=scan_interval)
63 name=f
"{DOMAIN}_events",
64 update_interval=interval,
68 """Fetch data from risco."""
70 last_timestamp = last_store.get(
71 LAST_EVENT_TIMESTAMP_KEY,
"2020-01-01T00:00:00Z"
74 events = await self.
riscorisco.get_events(last_timestamp, 10)
75 except (CannotConnectError, UnauthorizedError, OperationError)
as error:
79 await self.
_store_store.
async_save({LAST_EVENT_TIMESTAMP_KEY: events[0].time})
None __init__(self, HomeAssistant hass, RiscoCloud risco, int scan_interval)
Alarm _async_update_data(self)
None __init__(self, HomeAssistant hass, RiscoCloud risco, str eid, int scan_interval)
list[Event] _async_update_data(self)
str|float get_state(dict[str, float] data, str key)
None async_load(HomeAssistant hass)
None async_save(self, _T data)