Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Water quality coordinator for Tami4Edge."""
2 
3 from dataclasses import dataclass
4 from datetime import date, timedelta
5 import logging
6 
7 from Tami4EdgeAPI import Tami4EdgeAPI, exceptions
8 from Tami4EdgeAPI.water_quality import WaterQuality
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 _LOGGER = logging.getLogger(__name__)
14 
15 
@dataclass
class FlattenedWaterQuality:
    """Flattened WaterQuality dataclass.

    Copies the nested ``uv`` and ``filter`` values of a ``WaterQuality``
    object onto a single level of attributes.
    """

    uv_upcoming_replacement: date
    uv_installed: bool
    filter_upcoming_replacement: date
    filter_installed: bool
    filter_litters_passed: float

    def __init__(self, water_quality: WaterQuality) -> None:
        """Populate the flat fields from *water_quality*.

        Because the class defines its own ``__init__``, ``@dataclass`` does
        not generate one; it still provides ``__repr__`` and ``__eq__``.
        """
        uv = water_quality.uv
        water_filter = water_quality.filter

        self.uv_upcoming_replacement = uv.upcoming_replacement
        self.uv_installed = uv.installed
        self.filter_upcoming_replacement = water_filter.upcoming_replacement
        self.filter_installed = water_filter.installed
        # The source value is in millilitres; store it scaled down by 1000.
        self.filter_litters_passed = water_filter.milli_litters_passed / 1000
34 
35 
class Tami4EdgeCoordinator(DataUpdateCoordinator[FlattenedWaterQuality]):
    """Tami4Edge water quality coordinator.

    Fetches the device through the Tami4Edge API and exposes its water
    quality as a ``FlattenedWaterQuality``.
    """

    def __init__(self, hass: HomeAssistant, api: Tami4EdgeAPI) -> None:
        """Initialize the water quality coordinator.

        The update interval is fixed at 60 minutes.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="Tami4Edge water quality coordinator",
            update_interval=timedelta(minutes=60),
        )
        self._api = api

    async def _async_update_data(self) -> FlattenedWaterQuality:
        """Fetch data from the API endpoint.

        Raises:
            UpdateFailed: if the API raises ``APIRequestFailedException``.
        """
        try:
            # ``get_device`` is handed to the executor rather than called
            # directly from the coroutine.
            device = await self.hass.async_add_executor_job(self._api.get_device)

            return FlattenedWaterQuality(device.water_quality)
        except exceptions.APIRequestFailedException as ex:
            raise UpdateFailed("Error communicating with API") from ex
None __init__(self, HomeAssistant hass, Tami4EdgeAPI api)
Definition: coordinator.py:39