Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Freedompro data update coordinator."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import Any
8 
9 from pyfreedompro import get_list, get_states
10 
11 from homeassistant.helpers import aiohttp_client
12 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
13 
14 from .const import DOMAIN
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
20  """Class to manage fetching Freedompro data API."""
21 
22  def __init__(self, hass, api_key):
23  """Initialize."""
24  self._hass_hass = hass
25  self._api_key_api_key = api_key
26  self._devices_devices: list[dict[str, Any]] | None = None
27 
28  update_interval = timedelta(minutes=1)
29  super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
30 
31  async def _async_update_data(self):
32  if self._devices_devices is None:
33  result = await get_list(
34  aiohttp_client.async_get_clientsession(self._hass_hass), self._api_key_api_key
35  )
36  if result["state"]:
37  self._devices_devices = result["devices"]
38  else:
39  raise UpdateFailed
40 
41  result = await get_states(
42  aiohttp_client.async_get_clientsession(self._hass_hass), self._api_key_api_key
43  )
44 
45  for device in self._devices_devices:
46  dev = next(
47  (dev for dev in result if dev["uid"] == device["uid"]),
48  None,
49  )
50  if dev is not None and "state" in dev:
51  device["state"] = dev["state"]
52  return self._devices_devices