1 """Coordinator for the Omnilogic Integration."""
3 from datetime
import timedelta
7 from omnilogic
import OmniLogic, OmniLogicException
13 from .const
import ALL_ITEM_KINDS
15 _LOGGER = logging.getLogger(__name__)
19 """Class to manage fetching update data from single endpoint."""
26 config_entry: ConfigEntry,
27 polling_interval: int,
29 """Initialize the global Omnilogic data updater."""
37 update_interval=
timedelta(seconds=polling_interval),
41 """Fetch data from OmniLogic."""
43 data = await self.
apiapi.get_telemetry_data()
45 except OmniLogicException
as error:
46 raise UpdateFailed(f
"Error updating from OmniLogic: {error}")
from error
def get_item_data(item, item_kind, current_id, data):
    """Get data per kind of Omnilogic API item.

    Walk ``item`` recursively and store every entry that has a
    ``"systemId"`` key in ``data``. The key used in ``data`` is the
    tuple path built by appending ``(item_kind, system_id)`` to the
    path of the enclosing item.

    Args:
        item: Either a list of items or a single item that supports
            ``in`` and string-key indexing (presumably a dict parsed
            from the telemetry response -- confirm against the caller).
        item_kind: The kind label under which ``item`` was found.
        current_id: Tuple path of the enclosing item (``()`` at the root).
        data: Dict collecting the results; it is updated in place.

    Returns:
        The same ``data`` dict that was passed in.
    """
    if isinstance(item, list):
        # A list only groups sibling items of one kind. Handle each
        # element on its own and stop here: the list itself has no
        # "systemId" or child kinds to look up.
        for single_item in item:
            get_item_data(single_item, item_kind, current_id, data)
        return data

    if "systemId" in item:
        # Extend the path so that children of this item are keyed
        # underneath it.
        current_id = (*current_id, item_kind, item["systemId"])
        data[current_id] = item

    for kind in ALL_ITEM_KINDS:
        # Only descend into the kinds this item actually contains;
        # indexing a missing kind would raise KeyError.
        if kind in item:
            get_item_data(item[kind], kind, current_id, data)

    return data
67 return get_item_data(data,
"Backyard", (), parsed_data)
None __init__(self, HomeAssistant hass, OmniLogic api, str name, ConfigEntry config_entry, int polling_interval)
def _async_update_data(self)