1 """Coordinator to handle Duke Energy connections."""
3 from datetime
import datetime, timedelta
5 from types
import MappingProxyType
6 from typing
import Any, cast
8 from aiodukeenergy
import DukeEnergy
9 from aiohttp
import ClientError
14 async_add_external_statistics,
16 statistics_during_period,
25 from .const
import DOMAIN
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Meter "serviceType" values this coordinator imports statistics for.
# Meters whose serviceType is not a string in this tuple are skipped
# (with a debug log line) during the update.
_SUPPORTED_METER_TYPES = ("ELECTRIC",)

# Alias for a config entry parameterized by this integration's coordinator.
# The `type` statement evaluates its value lazily, so referencing
# DukeEnergyCoordinator by name here does not require it to be bound yet.
type DukeEnergyConfigEntry = ConfigEntry[DukeEnergyCoordinator]
35 """Handle inserting statistics."""
37 config_entry: DukeEnergyConfigEntry
42 entry_data: MappingProxyType[str, Any],
44 """Initialize the data handler."""
53 self.
apiapi = DukeEnergy(
54 entry_data[CONF_USERNAME],
55 entry_data[CONF_PASSWORD],
58 self._statistic_ids: set = set()
61 def _dummy_listener() -> None:
67 self.async_add_listener(_dummy_listener)
69 self.config_entry.async_on_unload(self._clear_statistics)
72 """Clear statistics."""
76 """Insert Duke Energy statistics."""
77 meters: dict[str, dict[str, Any]] = await self.
apiapi.get_meters()
78 for serial_number, meter
in meters.items():
80 not isinstance(meter[
"serviceType"], str)
81 or meter[
"serviceType"]
not in _SUPPORTED_METER_TYPES
84 "Skipping unsupported meter type %s", meter[
"serviceType"]
88 id_prefix = f
"{meter["serviceType
"].lower()}_{serial_number}"
89 consumption_statistic_id = f
"{DOMAIN}:{id_prefix}_energy_consumption"
90 self._statistic_ids.
add(consumption_statistic_id)
92 "Updating Statistics for %s",
93 consumption_statistic_id,
97 get_last_statistics, self.
hasshass, 1, consumption_statistic_id,
True, set()
100 _LOGGER.debug(
"Updating statistic for the first time")
102 consumption_sum = 0.0
103 last_stats_time =
None
107 last_stat[consumption_statistic_id][0][
"start"],
110 _LOGGER.debug(
"No recent usage data. Skipping update")
113 statistics_during_period,
117 {consumption_statistic_id},
122 consumption_sum = cast(float, stats[consumption_statistic_id][0][
"sum"])
123 last_stats_time = stats[consumption_statistic_id][0][
"start"]
125 consumption_statistics = []
127 for start, data
in usage.items():
128 if last_stats_time
is not None and start.timestamp() <= last_stats_time:
130 consumption_sum += data[
"energy"]
132 consumption_statistics.append(
134 start=start, state=data[
"energy"], sum=consumption_sum
139 f
"Duke Energy " f
"{meter["serviceType
"].capitalize()} {serial_number}"
141 consumption_metadata = StatisticMetaData(
144 name=f
"{name_prefix} Consumption",
146 statistic_id=consumption_statistic_id,
147 unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR
148 if meter[
"serviceType"] ==
"ELECTRIC"
149 else UnitOfVolume.CENTUM_CUBIC_FEET,
153 "Adding %s statistics for %s",
154 len(consumption_statistics),
155 consumption_statistic_id,
158 self.
hasshass, consumption_metadata, consumption_statistics
162 self, meter: dict[str, Any], start_time: float |
None =
None
163 ) -> dict[datetime, dict[str, float | int]]:
166 If start_time is None, get usage since account activation (or as far back as possible),
167 otherwise since start_time - 30 days to allow corrections in data.
169 Duke Energy provides hourly data all the way back to ~3 years.
175 tz = await dt_util.async_get_time_zone(
"America/New_York")
178 if start_time
is None:
180 agreement_date = dt_util.parse_datetime(meter[
"agreementActiveDate"])
181 if agreement_date
is None:
182 start = dt_util.now(tz) -
timedelta(days=3 * 365)
185 agreement_date.replace(tzinfo=tz),
186 dt_util.now(tz) -
timedelta(days=3 * 365),
189 start = datetime.fromtimestamp(start_time, tz=tz) - lookback
191 start = start.replace(hour=0, minute=0, second=0, microsecond=0)
192 end = dt_util.now(tz).replace(hour=0, minute=0, second=0, microsecond=0) - one
193 _LOGGER.debug(
"Data lookup range: %s - %s", start, end)
195 start_step = end - lookback
197 usage: dict[datetime, dict[str, float | int]] = {}
199 _LOGGER.debug(
"Getting hourly usage: %s - %s", start_step, end_step)
202 results = await self.
apiapi.get_energy_usage(
203 meter[
"serialNum"],
"HOURLY",
"DAY", start_step, end_step
205 usage = {**results[
"data"], **usage}
207 for missing
in results[
"missing"]:
208 _LOGGER.debug(
"Missing data: %s", missing)
211 end_step = start_step - one
212 start_step =
max(start_step - lookback, start)
217 except (TimeoutError, ClientError):
221 _LOGGER.debug(
"Got %s meter usage reads", len(usage))
None _async_update_data(self)
None __init__(self, HomeAssistant hass, MappingProxyType[str, Any] entry_data)
None _clear_statistics(self)
dict[datetime, dict[str, float|int]] _async_get_energy_usage(self, dict[str, Any] meter, float|None start_time=None)
bool add(self, _T matcher)
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
Recorder get_instance(HomeAssistant hass)