1 """DataUpdateCoordinator for the srp_energy integration."""
3 from __future__
import annotations
6 from datetime
import timedelta
8 from srpenergy.client
import SrpEnergyClient
15 from .const
import DOMAIN, LOGGER, MIN_TIME_BETWEEN_UPDATES, PHOENIX_TIME_ZONE
18 PHOENIX_ZONE_INFO = dt_util.get_time_zone(PHOENIX_TIME_ZONE)
22 """A srp_energy Data Update Coordinator."""
24 config_entry: ConfigEntry
27 self, hass: HomeAssistant, client: SrpEnergyClient, is_time_of_use: bool
29 """Initialize the srp_energy data coordinator."""
36 update_interval=MIN_TIME_BETWEEN_UPDATES,
40 """Fetch data from API endpoint.
42 This is the place to pre-process the data to lookup tables
43 so entities can quickly look up their data.
45 LOGGER.debug(
"async_update_data enter")
47 end_date = dt_util.now(PHOENIX_ZONE_INFO)
50 async
with asyncio.timeout(TIMEOUT):
51 hourly_usage = await self.
hasshass.async_add_executor_job(
57 except (ValueError, TypeError)
as err:
58 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
61 "async_update_data: Received %s record(s) from %s to %s",
62 len(hourly_usage)
if hourly_usage
else "None",
67 previous_daily_usage = 0.0
68 for _, _, _, kwh, _
in hourly_usage:
69 previous_daily_usage +=
float(kwh)
72 "async_update_data: previous_daily_usage %s",
76 return previous_daily_usage
float _async_update_data(self)
None __init__(self, HomeAssistant hass, SrpEnergyClient client, bool is_time_of_use)