Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Risco integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import Any
8 
9 from pyrisco import CannotConnectError, OperationError, RiscoCloud, UnauthorizedError
10 from pyrisco.cloud.alarm import Alarm
11 from pyrisco.cloud.event import Event
12 
13 from homeassistant.core import HomeAssistant
14 from homeassistant.helpers.storage import Store
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
16 
17 from .const import DOMAIN
18 
19 LAST_EVENT_STORAGE_VERSION = 1
20 LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
21 _LOGGER = logging.getLogger(__name__)
22 
23 
25  """Class to manage fetching risco data."""
26 
27  def __init__(
28  self, hass: HomeAssistant, risco: RiscoCloud, scan_interval: int
29  ) -> None:
30  """Initialize global risco data updater."""
31  self.riscorisco = risco
32  interval = timedelta(seconds=scan_interval)
33  super().__init__(
34  hass,
35  _LOGGER,
36  name=DOMAIN,
37  update_interval=interval,
38  )
39 
40  async def _async_update_data(self) -> Alarm:
41  """Fetch data from risco."""
42  try:
43  return await self.riscorisco.get_state()
44  except (CannotConnectError, UnauthorizedError, OperationError) as error:
45  raise UpdateFailed(error) from error
46 
47 
49  """Class to manage fetching risco data."""
50 
51  def __init__(
52  self, hass: HomeAssistant, risco: RiscoCloud, eid: str, scan_interval: int
53  ) -> None:
54  """Initialize global risco data updater."""
55  self.riscorisco = risco
56  self._store_store = Store[dict[str, Any]](
57  hass, LAST_EVENT_STORAGE_VERSION, f"risco_{eid}_last_event_timestamp"
58  )
59  interval = timedelta(seconds=scan_interval)
60  super().__init__(
61  hass,
62  _LOGGER,
63  name=f"{DOMAIN}_events",
64  update_interval=interval,
65  )
66 
67  async def _async_update_data(self) -> list[Event]:
68  """Fetch data from risco."""
69  last_store = await self._store_store.async_load() or {}
70  last_timestamp = last_store.get(
71  LAST_EVENT_TIMESTAMP_KEY, "2020-01-01T00:00:00Z"
72  )
73  try:
74  events = await self.riscorisco.get_events(last_timestamp, 10)
75  except (CannotConnectError, UnauthorizedError, OperationError) as error:
76  raise UpdateFailed(error) from error
77 
78  if len(events) > 0:
79  await self._store_store.async_save({LAST_EVENT_TIMESTAMP_KEY: events[0].time})
80 
81  return events
None __init__(self, HomeAssistant hass, RiscoCloud risco, int scan_interval)
Definition: coordinator.py:29
None __init__(self, HomeAssistant hass, RiscoCloud risco, str eid, int scan_interval)
Definition: coordinator.py:53
str|float get_state(dict[str, float] data, str key)
Definition: sensor.py:26
None async_load(HomeAssistant hass)
None async_save(self, _T data)
Definition: storage.py:424