Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator to handle Opower connections."""
2 
3 from datetime import datetime, timedelta
4 import logging
5 from types import MappingProxyType
6 from typing import Any, cast
7 
8 from opower import (
9  Account,
10  AggregateType,
11  CostRead,
12  Forecast,
13  InvalidAuth,
14  MeterType,
15  Opower,
16  ReadResolution,
17 )
18 
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import (
    async_add_external_statistics,
    get_last_statistics,
    statistics_during_period,
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, UnitOfEnergy, UnitOfVolume
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import dt as dt_util

from .const import CONF_TOTP_SECRET, CONF_UTILITY, DOMAIN
34 
35 _LOGGER = logging.getLogger(__name__)
36 
37 
class OpowerCoordinator(DataUpdateCoordinator[dict[str, Forecast]]):
    """Handle fetching Opower data, updating sensors and inserting statistics.

    The coordinator maps each account's ``utility_account_id`` to its latest
    ``Forecast`` and, on every refresh, back-fills long-term statistics for
    cost and consumption.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        entry_data: MappingProxyType[str, Any],
    ) -> None:
        """Initialize the data handler.

        Args:
            hass: The Home Assistant instance.
            entry_data: Config entry data holding utility, credentials and
                optional TOTP secret.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="Opower",
            # Data is updated daily on Opower.
            # Refresh every 12h to be at most 12h behind.
            update_interval=timedelta(hours=12),
        )
        # NOTE: attribute is named `api` — method bodies below rely on it.
        self.api = Opower(
            aiohttp_client.async_get_clientsession(hass),
            entry_data[CONF_UTILITY],
            entry_data[CONF_USERNAME],
            entry_data[CONF_PASSWORD],
            entry_data.get(CONF_TOTP_SECRET),
        )

        @callback
        def _dummy_listener() -> None:
            pass

        # Force the coordinator to periodically update by registering at least one listener.
        # Needed when the _async_update_data below returns {} for utilities that don't provide
        # forecast, which results to no sensors added, no registered listeners, and thus
        # _async_update_data not periodically getting called which is needed for _insert_statistics.
        self.async_add_listener(_dummy_listener)

    async def _async_update_data(
        self,
    ) -> dict[str, Forecast]:
        """Fetch data from API endpoint.

        Returns:
            Mapping of utility account id to its forecast. May be empty for
            utilities that provide no forecast.

        Raises:
            ConfigEntryAuthFailed: If login is rejected with invalid credentials.
        """
        try:
            # Login expires after a few minutes.
            # Given the infrequent updating (every 12h)
            # assume previous session has expired and re-login.
            await self.api.async_login()
        except InvalidAuth as err:
            raise ConfigEntryAuthFailed from err
        forecasts: list[Forecast] = await self.api.async_get_forecast()
        _LOGGER.debug("Updating sensor data with: %s", forecasts)
        # Because Opower provides historical usage/cost with a delay of a couple of days
        # we need to insert data into statistics.
        await self._insert_statistics()
        return {forecast.account.utility_account_id: forecast for forecast in forecasts}

    async def _insert_statistics(self) -> None:
        """Insert Opower statistics.

        For every account, fetch cost reads since the last recorded statistic
        (or since account activation on first run) and append external
        statistics for cost and consumption, maintaining running sums.
        """
        for account in await self.api.async_get_accounts():
            id_prefix = "_".join(
                (
                    self.api.utility.subdomain(),
                    account.meter_type.name.lower(),
                    # Some utilities like AEP have "-" in their account id.
                    # Replace it with "_" to avoid "Invalid statistic_id"
                    account.utility_account_id.replace("-", "_").lower(),
                )
            )
            cost_statistic_id = f"{DOMAIN}:{id_prefix}_energy_cost"
            consumption_statistic_id = f"{DOMAIN}:{id_prefix}_energy_consumption"
            _LOGGER.debug(
                "Updating Statistics for %s and %s",
                cost_statistic_id,
                consumption_statistic_id,
            )

            last_stat = await get_instance(self.hass).async_add_executor_job(
                get_last_statistics, self.hass, 1, consumption_statistic_id, True, set()
            )
            if not last_stat:
                _LOGGER.debug("Updating statistic for the first time")
                cost_reads = await self._async_get_cost_reads(
                    account, self.api.utility.timezone()
                )
                cost_sum = 0.0
                consumption_sum = 0.0
                last_stats_time = None
            else:
                cost_reads = await self._async_get_cost_reads(
                    account,
                    self.api.utility.timezone(),
                    last_stat[consumption_statistic_id][0]["start"],
                )
                if not cost_reads:
                    _LOGGER.debug("No recent usage/cost data. Skipping update")
                    continue
                start = cost_reads[0].start_time
                _LOGGER.debug("Getting statistics at: %s", start)
                # In the common case there should be a previous statistic at start time
                # so we only need to fetch one statistic. If there isn't any, fetch all.
                for end in (start + timedelta(seconds=1), None):
                    stats = await get_instance(self.hass).async_add_executor_job(
                        statistics_during_period,
                        self.hass,
                        start,
                        end,
                        {cost_statistic_id, consumption_statistic_id},
                        "hour",
                        None,
                        {"sum"},
                    )
                    if stats:
                        break
                    if end:
                        _LOGGER.debug(
                            "Not found. Trying to find the oldest statistic after %s",
                            start,
                        )
                # We are in this code path only if get_last_statistics found a stat
                # so statistics_during_period should also have found at least one.
                assert stats
                cost_sum = cast(float, stats[cost_statistic_id][0]["sum"])
                consumption_sum = cast(float, stats[consumption_statistic_id][0]["sum"])
                last_stats_time = stats[consumption_statistic_id][0]["start"]

            cost_statistics = []
            consumption_statistics = []

            for cost_read in cost_reads:
                start = cost_read.start_time
                # Skip reads already covered by existing statistics.
                if last_stats_time is not None and start.timestamp() <= last_stats_time:
                    continue
                cost_sum += cost_read.provided_cost
                consumption_sum += cost_read.consumption

                cost_statistics.append(
                    StatisticData(
                        start=start, state=cost_read.provided_cost, sum=cost_sum
                    )
                )
                consumption_statistics.append(
                    StatisticData(
                        start=start, state=cost_read.consumption, sum=consumption_sum
                    )
                )

            name_prefix = (
                f"Opower {self.api.utility.subdomain()} "
                f"{account.meter_type.name.lower()} {account.utility_account_id}"
            )
            cost_metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,
                name=f"{name_prefix} cost",
                source=DOMAIN,
                statistic_id=cost_statistic_id,
                unit_of_measurement=None,
            )
            consumption_metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,
                name=f"{name_prefix} consumption",
                source=DOMAIN,
                statistic_id=consumption_statistic_id,
                # Electric meters report kWh; gas meters report CCF.
                unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR
                if account.meter_type == MeterType.ELEC
                else UnitOfVolume.CENTUM_CUBIC_FEET,
            )

            _LOGGER.debug(
                "Adding %s statistics for %s",
                len(cost_statistics),
                cost_statistic_id,
            )
            async_add_external_statistics(self.hass, cost_metadata, cost_statistics)
            _LOGGER.debug(
                "Adding %s statistics for %s",
                len(consumption_statistics),
                consumption_statistic_id,
            )
            async_add_external_statistics(
                self.hass, consumption_metadata, consumption_statistics
            )

    async def _async_get_cost_reads(
        self, account: Account, time_zone_str: str, start_time: float | None = None
    ) -> list[CostRead]:
        """Get cost reads.

        If start_time is None, get cost reads since account activation,
        otherwise since start_time - 30 days to allow corrections in data from utilities

        We read at different resolutions depending on age:
        - month resolution for all years (since account activation)
        - day resolution for past 3 years (if account's read resolution supports it)
        - hour resolution for past 2 months (if account's read resolution supports it)
        """

        def _update_with_finer_cost_reads(
            cost_reads: list[CostRead], finer_cost_reads: list[CostRead]
        ) -> None:
            # Splice finer-resolution reads over the overlapping tail of the
            # coarser list, aligning on start/end boundaries.
            for i, cost_read in enumerate(cost_reads):
                for j, finer_cost_read in enumerate(finer_cost_reads):
                    if cost_read.start_time == finer_cost_read.start_time:
                        cost_reads[i:] = finer_cost_reads[j:]
                        return
                    if cost_read.end_time == finer_cost_read.start_time:
                        cost_reads[i + 1 :] = finer_cost_reads[j:]
                        return
                    if cost_read.end_time < finer_cost_read.start_time:
                        break
            # No overlap found: finer reads start after the coarse ones end.
            cost_reads += finer_cost_reads

        tz = await dt_util.async_get_time_zone(time_zone_str)
        if start_time is None:
            start = None
        else:
            # Re-fetch the last 30 days to pick up corrections from utilities.
            start = datetime.fromtimestamp(start_time, tz=tz) - timedelta(days=30)
        end = dt_util.now(tz)
        _LOGGER.debug("Getting monthly cost reads: %s - %s", start, end)
        cost_reads = await self.api.async_get_cost_reads(
            account, AggregateType.BILL, start, end
        )
        _LOGGER.debug("Got %s monthly cost reads", len(cost_reads))
        if account.read_resolution == ReadResolution.BILLING:
            return cost_reads

        if start_time is None:
            start = end - timedelta(days=3 * 365)
        else:
            if cost_reads:
                start = cost_reads[0].start_time
            assert start
            start = max(start, end - timedelta(days=3 * 365))
        _LOGGER.debug("Getting daily cost reads: %s - %s", start, end)
        daily_cost_reads = await self.api.async_get_cost_reads(
            account, AggregateType.DAY, start, end
        )
        _LOGGER.debug("Got %s daily cost reads", len(daily_cost_reads))
        _update_with_finer_cost_reads(cost_reads, daily_cost_reads)
        if account.read_resolution == ReadResolution.DAY:
            return cost_reads

        if start_time is None:
            start = end - timedelta(days=2 * 30)
        else:
            assert start
            start = max(start, end - timedelta(days=2 * 30))
        _LOGGER.debug("Getting hourly cost reads: %s - %s", start, end)
        hourly_cost_reads = await self.api.async_get_cost_reads(
            account, AggregateType.HOUR, start, end
        )
        _LOGGER.debug("Got %s hourly cost reads", len(hourly_cost_reads))
        _update_with_finer_cost_reads(cost_reads, hourly_cost_reads)
        _LOGGER.debug("Got %s cost reads", len(cost_reads))
        return cost_reads
list[CostRead] _async_get_cost_reads(self, Account account, str time_zone_str, float|None start_time=None)
Definition: coordinator.py:221
None __init__(self, HomeAssistant hass, MappingProxyType[str, Any] entry_data)
Definition: coordinator.py:45
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2318
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74