Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for Streamlabs water integration."""
2 
3 from dataclasses import dataclass
4 from datetime import timedelta
5 
6 from streamlabswater.streamlabswater import StreamlabsClient
7 
8 from homeassistant.core import HomeAssistant
9 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
10 
11 from .const import LOGGER
12 
13 
14 @dataclass(slots=True)
16  """Class to hold Streamlabs data."""
17 
18  is_away: bool
19  name: str
20  daily_usage: float
21  monthly_usage: float
22  yearly_usage: float
23 
24 
class StreamlabsCoordinator(DataUpdateCoordinator[dict[str, StreamlabsData]]):
    """Coordinator for Streamlabs.

    Polls the Streamlabs client every 60 seconds and exposes the result as
    a mapping of location id to ``StreamlabsData``.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        client: StreamlabsClient,
    ) -> None:
        """Coordinator for Streamlabs.

        Args:
            hass: Home Assistant instance handed to the base coordinator.
            client: Streamlabs client used to fetch locations and usage.
        """
        super().__init__(
            hass,
            LOGGER,
            name="Streamlabs",
            update_interval=timedelta(seconds=60),
        )
        self.client = client

    async def _async_update_data(self) -> dict[str, StreamlabsData]:
        """Fetch fresh data by running ``_update_data`` in the executor.

        The client calls are made from a worker thread via
        ``hass.async_add_executor_job`` rather than directly in this
        coroutine.
        """
        return await self.hass.async_add_executor_job(self._update_data)

    def _update_data(self) -> dict[str, StreamlabsData]:
        """Query every location and its water usage summary.

        Returns:
            A dict keyed by each location's ``locationId`` whose values
            hold the away state, name and daily/monthly/yearly usage.
        """
        locations = self.client.get_locations()
        res: dict[str, StreamlabsData] = {}
        for location in locations["locations"]:
            location_id = location["locationId"]
            # One extra request per location for its usage summary.
            water_usage = self.client.get_water_usage_summary(location_id)
            res[location_id] = StreamlabsData(
                is_away=location["homeAway"] == "away",
                name=location["name"],
                daily_usage=water_usage["today"],
                monthly_usage=water_usage["thisMonth"],
                yearly_usage=water_usage["thisYear"],
            )
        return res
None __init__(self, HomeAssistant hass, StreamlabsClient client)
Definition: coordinator.py:32