1 """Coordinator for Streamlabs water integration."""
3 from dataclasses import dataclass
4 from datetime import timedelta
6 from streamlabswater.streamlabswater import StreamlabsClient
11 from .const import LOGGER
14 @dataclass(slots=True)
16 """Class to hold Streamlabs data."""
26 """Coordinator for Streamlabs."""
31 client: StreamlabsClient,
33 """Coordinator for Streamlabs."""
43 return await self.hass.async_add_executor_job(self._update_data)
46 locations = self.client.get_locations()
48 for location in locations["locations"]:
49 location_id = location["locationId"]
50 water_usage = self.client.get_water_usage_summary(location_id)
52 is_away=location["homeAway"] == "away",
53 name=location["name"],
54 daily_usage=water_usage["today"],
55 monthly_usage=water_usage["thisMonth"],
56 yearly_usage=water_usage["thisYear"],
None __init__(self, HomeAssistant hass, StreamlabsClient client)
dict[str, StreamlabsData] _update_data(self)
dict[str, StreamlabsData] _async_update_data(self)