1 """The data update coordinator for the A. O. Smith integration."""
5 from py_aosmith
import (
7 AOSmithInvalidCredentialsException,
8 AOSmithUnknownException,
10 from py_aosmith.models
import Device
as AOSmithDevice
16 from .const
import DOMAIN, ENERGY_USAGE_INTERVAL, FAST_INTERVAL, REGULAR_INTERVAL
18 _LOGGER = logging.getLogger(__name__)
22 """Coordinator for device status, updating with a frequent interval."""
def __init__(self, hass: HomeAssistant, client: AOSmithAPIClient) -> None:
    """Initialize the coordinator.

    Sets up the base coordinator with the module logger, the integration
    domain as its name, and the regular update interval.

    Args:
        hass: Home Assistant instance passed through to the base class.
        client: A. O. Smith API client.
    """
    super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=REGULAR_INTERVAL)
    # NOTE(review): the update method in this file awaits a client attribute
    # on ``self``; storing it here is assumed to be what that code expects --
    # confirm the attribute name against the full file.
    self.client = client
30 """Fetch latest data from the device status endpoint."""
32 devices = await self.
clientclient.get_devices()
33 except AOSmithInvalidCredentialsException
as err:
34 raise ConfigEntryAuthFailed
from err
35 except AOSmithUnknownException
as err:
36 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
38 mode_pending = any(device.status.mode_change_pending
for device
in devices)
39 setpoint_pending = any(
40 device.status.temperature_setpoint_pending
for device
in devices
43 if mode_pending
or setpoint_pending:
48 return {device.junction_id: device
for device
in devices}
52 """Coordinator for energy usage data, updating with a slower interval."""
57 client: AOSmithAPIClient,
58 junction_ids: list[str],
60 """Initialize the coordinator."""
62 hass, _LOGGER, name=DOMAIN, update_interval=ENERGY_USAGE_INTERVAL
68 """Fetch latest data from the energy usage endpoint."""
69 energy_usage_by_junction_id: dict[str, float] = {}
73 energy_usage = await self.
clientclient.get_energy_use_data(junction_id)
74 except AOSmithInvalidCredentialsException
as err:
75 raise ConfigEntryAuthFailed
from err
76 except AOSmithUnknownException
as err:
77 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
79 energy_usage_by_junction_id[junction_id] = energy_usage.lifetime_kwh
81 return energy_usage_by_junction_id
None __init__(self, HomeAssistant hass, AOSmithAPIClient client, list[str] junction_ids)
dict[str, float] _async_update_data(self)
None __init__(self, HomeAssistant hass, AOSmithAPIClient client)
dict[str, AOSmithDevice] _async_update_data(self)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)