Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Omnilogic Integration."""
2 
3 from datetime import timedelta
4 import logging
5 from typing import Any
6 
7 from omnilogic import OmniLogic, OmniLogicException
8 
9 from homeassistant.config_entries import ConfigEntry
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import ALL_ITEM_KINDS
14 
15 _LOGGER = logging.getLogger(__name__)
16 
17 
class OmniLogicUpdateCoordinator(DataUpdateCoordinator[dict[tuple, dict[str, Any]]]):
    """Class to manage fetching update data from single endpoint.

    The coordinator data is a flat dict. Each key is a tuple that alternates
    item kind and ``systemId`` along the path from the root of the telemetry
    payload down to the item, e.g. ``("Backyard", 1, "BOW", 2)``; each value
    is the raw item dict found at that path.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        api: OmniLogic,
        name: str,
        config_entry: ConfigEntry,
        polling_interval: int,
    ) -> None:
        """Initialize the global Omnilogic data updater.

        Args:
            hass: The Home Assistant instance.
            api: OmniLogic client used to fetch telemetry.
            name: Name handed to the base coordinator.
            config_entry: Config entry this coordinator belongs to.
            polling_interval: Seconds between two refreshes.
        """
        self.api = api
        self.config_entry = config_entry

        super().__init__(
            hass=hass,
            logger=_LOGGER,
            name=name,
            update_interval=timedelta(seconds=polling_interval),
        )

    async def _async_update_data(self) -> dict[tuple, dict[str, Any]]:
        """Fetch data from OmniLogic.

        Returns:
            The telemetry payload flattened into a dict keyed by id-path
            tuples (see the class docstring).

        Raises:
            UpdateFailed: If the OmniLogic client raises OmniLogicException.
        """
        try:
            data = await self.api.get_telemetry_data()
        except OmniLogicException as error:
            raise UpdateFailed(f"Error updating from OmniLogic: {error}") from error

        parsed_data: dict[tuple, dict[str, Any]] = {}

        def get_item_data(item, item_kind, current_id, data):
            """Get data per kind of Omnilogic API item.

            Walks ``item`` recursively, registering every dict that carries a
            ``systemId`` in ``data`` and descending into each child
            collection named in ALL_ITEM_KINDS. Returns ``data``.
            """
            if isinstance(item, list):
                for single_item in item:
                    data = get_item_data(single_item, item_kind, current_id, data)
                # A list only groups siblings; it has no "systemId" or kind
                # keys of its own, so do not run the dict lookups on it.
                return data

            if "systemId" in item:
                # Extend the path so children are keyed beneath this item.
                current_id = (*current_id, item_kind, item["systemId"])
                data[current_id] = item

            for kind in ALL_ITEM_KINDS:
                if kind in item:
                    data = get_item_data(item[kind], kind, current_id, data)

            return data

        return get_item_data(data, "Backyard", (), parsed_data)
None __init__(self, HomeAssistant hass, OmniLogic api, str name, ConfigEntry config_entry, int polling_interval)
Definition: coordinator.py:28