1 """Coordinator to handle Opower connections."""
3 from datetime
import datetime, timedelta
5 from types
import MappingProxyType
6 from typing
import Any, cast
22 async_add_external_statistics,
24 statistics_during_period,
33 from .const
import CONF_TOTP_SECRET, CONF_UTILITY, DOMAIN
35 _LOGGER = logging.getLogger(__name__)
39 """Handle fetching Opower data, updating sensors and inserting statistics."""
44 entry_data: MappingProxyType[str, Any],
46 """Initialize the data handler."""
56 aiohttp_client.async_get_clientsession(hass),
57 entry_data[CONF_UTILITY],
58 entry_data[CONF_USERNAME],
59 entry_data[CONF_PASSWORD],
60 entry_data.get(CONF_TOTP_SECRET),
64 def _dummy_listener() -> None:
71 self.async_add_listener(_dummy_listener)
75 ) -> dict[str, Forecast]:
76 """Fetch data from API endpoint."""
81 await self.
apiapi.async_login()
82 except InvalidAuth
as err:
83 raise ConfigEntryAuthFailed
from err
84 forecasts: list[Forecast] = await self.
apiapi.async_get_forecast()
85 _LOGGER.debug(
"Updating sensor data with: %s", forecasts)
89 return {forecast.account.utility_account_id: forecast
for forecast
in forecasts}
92 """Insert Opower statistics."""
93 for account
in await self.
apiapi.async_get_accounts():
96 self.
apiapi.utility.subdomain(),
97 account.meter_type.name.lower(),
100 account.utility_account_id.replace(
"-",
"_").lower(),
103 cost_statistic_id = f
"{DOMAIN}:{id_prefix}_energy_cost"
104 consumption_statistic_id = f
"{DOMAIN}:{id_prefix}_energy_consumption"
106 "Updating Statistics for %s and %s",
108 consumption_statistic_id,
112 get_last_statistics, self.
hasshass, 1, consumption_statistic_id,
True, set()
115 _LOGGER.debug(
"Updating statistic for the first time")
117 account, self.
apiapi.utility.timezone()
120 consumption_sum = 0.0
121 last_stats_time =
None
125 self.
apiapi.utility.timezone(),
126 last_stat[consumption_statistic_id][0][
"start"],
129 _LOGGER.debug(
"No recent usage/cost data. Skipping update")
131 start = cost_reads[0].start_time
132 _LOGGER.debug(
"Getting statistics at: %s", start)
135 for end
in (start +
timedelta(seconds=1),
None):
137 statistics_during_period,
141 {cost_statistic_id, consumption_statistic_id},
150 "Not found. Trying to find the oldest statistic after %s",
156 cost_sum = cast(float, stats[cost_statistic_id][0][
"sum"])
157 consumption_sum = cast(float, stats[consumption_statistic_id][0][
"sum"])
158 last_stats_time = stats[consumption_statistic_id][0][
"start"]
161 consumption_statistics = []
163 for cost_read
in cost_reads:
164 start = cost_read.start_time
165 if last_stats_time
is not None and start.timestamp() <= last_stats_time:
167 cost_sum += cost_read.provided_cost
168 consumption_sum += cost_read.consumption
170 cost_statistics.append(
172 start=start, state=cost_read.provided_cost, sum=cost_sum
175 consumption_statistics.append(
177 start=start, state=cost_read.consumption, sum=consumption_sum
182 f
"Opower {self.api.utility.subdomain()} "
183 f
"{account.meter_type.name.lower()} {account.utility_account_id}"
185 cost_metadata = StatisticMetaData(
188 name=f
"{name_prefix} cost",
190 statistic_id=cost_statistic_id,
191 unit_of_measurement=
None,
193 consumption_metadata = StatisticMetaData(
196 name=f
"{name_prefix} consumption",
198 statistic_id=consumption_statistic_id,
199 unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR
200 if account.meter_type == MeterType.ELEC
201 else UnitOfVolume.CENTUM_CUBIC_FEET,
205 "Adding %s statistics for %s",
206 len(cost_statistics),
211 "Adding %s statistics for %s",
212 len(consumption_statistics),
213 consumption_statistic_id,
216 self.
hasshass, consumption_metadata, consumption_statistics
220 self, account: Account, time_zone_str: str, start_time: float |
None =
None
224 If start_time is None, get cost reads since account activation,
225 otherwise since start_time - 30 days to allow corrections in data from utilities
227 We read at different resolutions depending on age:
228 - month resolution for all years (since account activation)
229 - day resolution for past 3 years (if account's read resolution supports it)
230 - hour resolution for past 2 months (if account's read resolution supports it)
233 def _update_with_finer_cost_reads(
234 cost_reads: list[CostRead], finer_cost_reads: list[CostRead]
236 for i, cost_read
in enumerate(cost_reads):
237 for j, finer_cost_read
in enumerate(finer_cost_reads):
238 if cost_read.start_time == finer_cost_read.start_time:
239 cost_reads[i:] = finer_cost_reads[j:]
241 if cost_read.end_time == finer_cost_read.start_time:
242 cost_reads[i + 1 :] = finer_cost_reads[j:]
244 if cost_read.end_time < finer_cost_read.start_time:
246 cost_reads += finer_cost_reads
248 tz = await dt_util.async_get_time_zone(time_zone_str)
249 if start_time
is None:
252 start = datetime.fromtimestamp(start_time, tz=tz) -
timedelta(days=30)
253 end = dt_util.now(tz)
254 _LOGGER.debug(
"Getting monthly cost reads: %s - %s", start, end)
255 cost_reads = await self.
apiapi.async_get_cost_reads(
256 account, AggregateType.BILL, start, end
258 _LOGGER.debug(
"Got %s monthly cost reads", len(cost_reads))
259 if account.read_resolution == ReadResolution.BILLING:
262 if start_time
is None:
266 start = cost_reads[0].start_time
269 _LOGGER.debug(
"Getting daily cost reads: %s - %s", start, end)
270 daily_cost_reads = await self.
apiapi.async_get_cost_reads(
271 account, AggregateType.DAY, start, end
273 _LOGGER.debug(
"Got %s daily cost reads", len(daily_cost_reads))
274 _update_with_finer_cost_reads(cost_reads, daily_cost_reads)
275 if account.read_resolution == ReadResolution.DAY:
278 if start_time
is None:
283 _LOGGER.debug(
"Getting hourly cost reads: %s - %s", start, end)
284 hourly_cost_reads = await self.
apiapi.async_get_cost_reads(
285 account, AggregateType.HOUR, start, end
287 _LOGGER.debug(
"Got %s hourly cost reads", len(hourly_cost_reads))
288 _update_with_finer_cost_reads(cost_reads, hourly_cost_reads)
289 _LOGGER.debug(
"Got %s cost reads", len(cost_reads))
list[CostRead] _async_get_cost_reads(self, Account account, str time_zone_str, float|None start_time=None)
None __init__(self, HomeAssistant hass, MappingProxyType[str, Any] entry_data)
None _insert_statistics(self)
dict[str, Forecast] _async_update_data(self)
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Recorder get_instance(HomeAssistant hass)