Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Define an object to coordinate fetching Ondilo ICO data."""
2 
3 from dataclasses import dataclass
4 from datetime import timedelta
5 import logging
6 from typing import Any
7 
8 from ondilo import OndiloError
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from . import DOMAIN
14 from .api import OndiloClient
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
19 @dataclass
21  """Class for storing the data."""
22 
23  ico: dict[str, Any]
24  pool: dict[str, Any]
25  sensors: dict[str, Any]
26 
27 
28 class OndiloIcoCoordinator(DataUpdateCoordinator[dict[str, OndiloIcoData]]):
29  """Class to manage fetching Ondilo ICO data from API."""
30 
31  def __init__(self, hass: HomeAssistant, api: OndiloClient) -> None:
32  """Initialize."""
33  super().__init__(
34  hass,
35  logger=_LOGGER,
36  name=DOMAIN,
37  update_interval=timedelta(minutes=20),
38  )
39  self.apiapi = api
40 
41  async def _async_update_data(self) -> dict[str, OndiloIcoData]:
42  """Fetch data from API endpoint."""
43  try:
44  return await self.hasshass.async_add_executor_job(self._update_data_update_data)
45  except OndiloError as err:
46  raise UpdateFailed(f"Error communicating with API: {err}") from err
47 
48  def _update_data(self) -> dict[str, OndiloIcoData]:
49  """Fetch data from API endpoint."""
50  res = {}
51  pools = self.apiapi.get_pools()
52  _LOGGER.debug("Pools: %s", pools)
53  error: OndiloError | None = None
54  for pool in pools:
55  pool_id = pool["id"]
56  try:
57  ico = self.apiapi.get_ICO_details(pool_id)
58  if not ico:
59  _LOGGER.debug(
60  "The pool id %s does not have any ICO attached", pool_id
61  )
62  continue
63  sensors = self.apiapi.get_last_pool_measures(pool_id)
64  except OndiloError as err:
65  error = err
66  _LOGGER.debug("Error communicating with API for %s: %s", pool_id, err)
67  continue
68  res[pool_id] = OndiloIcoData(
69  ico=ico,
70  pool=pool,
71  sensors={sensor["data_type"]: sensor["value"] for sensor in sensors},
72  )
73  if not res:
74  if error:
75  raise UpdateFailed(f"Error communicating with API: {error}") from error
76  raise UpdateFailed("No data available")
77  return res
None __init__(self, HomeAssistant hass, OndiloClient api)
Definition: coordinator.py:31