Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for flipr integration."""
2 
3 from datetime import timedelta
4 import logging
5 from typing import Any
6 
7 from flipr_api import FliprAPIRestClient
8 from flipr_api.exceptions import FliprError
9 
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.core import HomeAssistant
12 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
13 
14 _LOGGER = logging.getLogger(__name__)
15 
16 
17 class BaseDataUpdateCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
18  """Parent class to hold Flipr and Hub data retrieval."""
19 
20  config_entry: ConfigEntry
21 
22  def __init__(
23  self, hass: HomeAssistant, client: FliprAPIRestClient, flipr_or_hub_id: str
24  ) -> None:
25  """Initialize."""
26  self.device_iddevice_id = flipr_or_hub_id
27  self.clientclient = client
28 
29  super().__init__(
30  hass,
31  _LOGGER,
32  name=f"Flipr or Hub data measure for {self.device_id}",
33  update_interval=timedelta(minutes=15),
34  )
35 
36 
38  """Class to hold Flipr data retrieval."""
39 
40  async def _async_update_data(self) -> dict[str, Any]:
41  """Fetch data from API endpoint."""
42  try:
43  data = await self.hasshass.async_add_executor_job(
44  self.clientclient.get_pool_measure_latest, self.device_iddevice_id
45  )
46  except FliprError as error:
47  raise UpdateFailed(error) from error
48 
49  return data
50 
51 
53  """Class to hold Flipr hub data retrieval."""
54 
55  async def _async_update_data(self) -> dict[str, Any]:
56  """Fetch data from API endpoint."""
57  try:
58  data = await self.hasshass.async_add_executor_job(
59  self.clientclient.get_hub_state, self.device_iddevice_id
60  )
61  except FliprError as error:
62  raise UpdateFailed(error) from error
63 
64  return data
None __init__(self, HomeAssistant hass, FliprAPIRestClient client, str flipr_or_hub_id)
Definition: coordinator.py:24