Home Assistant — unofficial API reference, version 2024.12.1.
File: coordinator.py (Duke Energy integration). The annotated source listing follows.
1 """Coordinator to handle Duke Energy connections."""
2 
from datetime import datetime, timedelta
import logging
from types import MappingProxyType
from typing import Any, cast

from aiodukeenergy import DukeEnergy
from aiohttp import ClientError

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import (
    async_add_external_statistics,
    get_last_statistics,
    statistics_during_period,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, UnitOfEnergy, UnitOfVolume
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import dt as dt_util

from .const import DOMAIN
26 
27 _LOGGER = logging.getLogger(__name__)
28 
29 _SUPPORTED_METER_TYPES = ("ELECTRIC",)
30 
31 type DukeEnergyConfigEntry = ConfigEntry[DukeEnergyCoordinator]
32 
33 
class DukeEnergyCoordinator(DataUpdateCoordinator[None]):
    """Handle inserting statistics.

    Duke Energy only exposes historical consumption data, so this coordinator
    does not publish entity state; each refresh fetches usage and inserts it
    as external long-term statistics.
    """

    config_entry: DukeEnergyConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        entry_data: MappingProxyType[str, Any],
    ) -> None:
        """Initialize the data handler.

        entry_data must contain CONF_USERNAME and CONF_PASSWORD.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="Duke Energy",
            # Data is updated daily on Duke Energy.
            # Refresh every 12h to be at most 12h behind.
            update_interval=timedelta(hours=12),
        )
        self.api = DukeEnergy(
            entry_data[CONF_USERNAME],
            entry_data[CONF_PASSWORD],
            # Reuse Home Assistant's shared aiohttp session.
            async_get_clientsession(hass),
        )
        # Statistic ids inserted by this coordinator; cleared on unload.
        self._statistic_ids: set[str] = set()

        @callback
        def _dummy_listener() -> None:
            pass

        # Force the coordinator to periodically update by registering at least one listener.
        # Duke Energy does not provide forecast data, so all information is historical.
        # This makes _async_update_data get periodically called so we can insert statistics.
        self.async_add_listener(_dummy_listener)

        self.config_entry.async_on_unload(self._clear_statistics)

    def _clear_statistics(self) -> None:
        """Clear all statistics this coordinator has inserted."""
        get_instance(self.hass).async_clear_statistics(list(self._statistic_ids))

    async def _async_update_data(self) -> None:
        """Insert Duke Energy statistics for every supported meter."""
        meters: dict[str, dict[str, Any]] = await self.api.get_meters()
        for serial_number, meter in meters.items():
            if (
                not isinstance(meter["serviceType"], str)
                or meter["serviceType"] not in _SUPPORTED_METER_TYPES
            ):
                _LOGGER.debug(
                    "Skipping unsupported meter type %s", meter["serviceType"]
                )
                continue

            id_prefix = f"{meter['serviceType'].lower()}_{serial_number}"
            consumption_statistic_id = f"{DOMAIN}:{id_prefix}_energy_consumption"
            self._statistic_ids.add(consumption_statistic_id)
            _LOGGER.debug(
                "Updating Statistics for %s",
                consumption_statistic_id,
            )

            # Recorder calls are blocking, so run them in the executor.
            last_stat = await get_instance(self.hass).async_add_executor_job(
                get_last_statistics, self.hass, 1, consumption_statistic_id, True, set()
            )
            if not last_stat:
                # First run: fetch as far back as the API allows.
                _LOGGER.debug("Updating statistic for the first time")
                usage = await self._async_get_energy_usage(meter)
                consumption_sum = 0.0
                last_stats_time = None
            else:
                usage = await self._async_get_energy_usage(
                    meter,
                    last_stat[consumption_statistic_id][0]["start"],
                )
                if not usage:
                    _LOGGER.debug("No recent usage data. Skipping update")
                    continue
                # Resume the running sum from the oldest hour we re-fetched.
                stats = await get_instance(self.hass).async_add_executor_job(
                    statistics_during_period,
                    self.hass,
                    min(usage.keys()),
                    None,
                    {consumption_statistic_id},
                    "hour",
                    None,
                    {"sum"},
                )
                consumption_sum = cast(float, stats[consumption_statistic_id][0]["sum"])
                last_stats_time = stats[consumption_statistic_id][0]["start"]

            consumption_statistics = []

            for start, data in usage.items():
                # Skip hours already recorded; statistics sums must be monotonic.
                if last_stats_time is not None and start.timestamp() <= last_stats_time:
                    continue
                consumption_sum += data["energy"]

                consumption_statistics.append(
                    StatisticData(
                        start=start, state=data["energy"], sum=consumption_sum
                    )
                )

            name_prefix = (
                f"Duke Energy {meter['serviceType'].capitalize()} {serial_number}"
            )
            consumption_metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,
                name=f"{name_prefix} Consumption",
                source=DOMAIN,
                statistic_id=consumption_statistic_id,
                unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR
                if meter["serviceType"] == "ELECTRIC"
                else UnitOfVolume.CENTUM_CUBIC_FEET,
            )

            _LOGGER.debug(
                "Adding %s statistics for %s",
                len(consumption_statistics),
                consumption_statistic_id,
            )
            async_add_external_statistics(
                self.hass, consumption_metadata, consumption_statistics
            )

    async def _async_get_energy_usage(
        self, meter: dict[str, Any], start_time: float | None = None
    ) -> dict[datetime, dict[str, float | int]]:
        """Get energy usage keyed by hour-start datetime.

        If start_time is None, get usage since account activation (or as far
        back as possible), otherwise since start_time - 30 days to allow
        corrections in data.

        Duke Energy provides hourly data all the way back to ~3 years.
        """

        # All of Duke Energy Service Areas are currently in America/New_York timezone
        # May need to re-think this if that ever changes and determine timezone based
        # on the service address somehow.
        tz = await dt_util.async_get_time_zone("America/New_York")
        lookback = timedelta(days=30)
        one = timedelta(days=1)
        if start_time is None:
            # Max 3 years of data
            agreement_date = dt_util.parse_datetime(meter["agreementActiveDate"])
            if agreement_date is None:
                start = dt_util.now(tz) - timedelta(days=3 * 365)
            else:
                start = max(
                    agreement_date.replace(tzinfo=tz),
                    dt_util.now(tz) - timedelta(days=3 * 365),
                )
        else:
            start = datetime.fromtimestamp(start_time, tz=tz) - lookback

        start = start.replace(hour=0, minute=0, second=0, microsecond=0)
        # Yesterday is the newest complete day of data.
        end = dt_util.now(tz).replace(hour=0, minute=0, second=0, microsecond=0) - one
        _LOGGER.debug("Data lookup range: %s - %s", start, end)

        # Walk backwards from `end` in 30-day windows until `start` is covered
        # or the API reports no more data.
        start_step = end - lookback
        end_step = end
        usage: dict[datetime, dict[str, float | int]] = {}
        while True:
            _LOGGER.debug("Getting hourly usage: %s - %s", start_step, end_step)
            try:
                # Get data
                results = await self.api.get_energy_usage(
                    meter["serialNum"], "HOURLY", "DAY", start_step, end_step
                )
                # Older windows merge underneath newer data already collected.
                usage = {**results["data"], **usage}

                for missing in results["missing"]:
                    _LOGGER.debug("Missing data: %s", missing)

                # Set next range
                end_step = start_step - one
                start_step = max(start_step - lookback, start)

                # Make sure we don't go back too far
                if end_step < start:
                    break
            except (TimeoutError, ClientError):
                # ClientError is raised when there is no more data for the range
                break

        _LOGGER.debug("Got %s meter usage reads", len(usage))
        return usage
None __init__(self, HomeAssistant hass, MappingProxyType[str, Any] entry_data)
Definition: coordinator.py:43
dict[datetime, dict[str, float|int]] _async_get_energy_usage(self, dict[str, Any] meter, float|None start_time=None)
Definition: coordinator.py:163
bool add(self, _T matcher)
Definition: match.py:185
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2318
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74