Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for Tibber sensors."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import cast
8 
9 import tibber
10 
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import (
    async_add_external_statistics,
    get_last_statistics,
    statistics_during_period,
)
18 from homeassistant.config_entries import ConfigEntry
19 from homeassistant.const import UnitOfEnergy
20 from homeassistant.core import HomeAssistant
21 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
22 from homeassistant.util import dt as dt_util
23 
24 from .const import DOMAIN as TIBBER_DOMAIN
25 
# Backfill window for the first statistics import, expressed in hours (~5 years).
FIVE_YEARS = 5 * 365 * 24

_LOGGER = logging.getLogger(__name__)
29 
30 
class TibberDataCoordinator(DataUpdateCoordinator[None]):
    """Handle Tibber data and insert statistics.

    Polls the Tibber API every 20 minutes for consumption/production data
    and pushes it into the recorder as external long-term statistics.
    """

    # Set by the config-entry setup; used to reload the entry on fatal errors.
    config_entry: ConfigEntry

    def __init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None:
        """Initialize the data handler.

        :param hass: Home Assistant instance.
        :param tibber_connection: Authenticated pyTibber connection.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=f"Tibber {tibber_connection.name}",
            update_interval=timedelta(minutes=20),
        )
        self._tibber_connection = tibber_connection

    async def _async_update_data(self) -> None:
        """Update data via API.

        Raises UpdateFailed on retryable HTTP errors; on fatal HTTP errors the
        config entry is reloaded so the correct error state is shown.
        """
        try:
            await self._tibber_connection.fetch_consumption_data_active_homes()
            await self._tibber_connection.fetch_production_data_active_homes()
            await self._insert_statistics()
        except tibber.RetryableHttpExceptionError as err:
            raise UpdateFailed(f"Error communicating with API ({err.status})") from err
        except tibber.FatalHttpExceptionError:
            # Fatal error. Reload config entry to show correct error.
            self.hass.async_create_task(
                self.hass.config_entries.async_reload(self.config_entry.entry_id)
            )

    async def _insert_statistics(self) -> None:
        """Insert Tibber statistics into the recorder.

        For each home, builds external statistics for consumption, total cost,
        production and profit (as available), resuming from the last stored
        statistic or backfilling up to FIVE_YEARS hours on first run.
        """
        for home in self._tibber_connection.get_homes():
            # (sensor key in hourly data, is production data, unit)
            sensors: list[tuple[str, bool, str]] = []
            if home.hourly_consumption_data:
                sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR))
                sensors.append(("totalCost", False, home.currency))
            if home.hourly_production_data:
                sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR))
                sensors.append(("profit", True, home.currency))

            for sensor_type, is_production, unit in sensors:
                # External statistic id, e.g. "tibber:energy_consumption_<homeid>".
                statistic_id = (
                    f"{TIBBER_DOMAIN}:energy_"
                    f"{sensor_type.lower()}_"
                    f"{home.home_id.replace('-', '')}"
                )

                last_stats = await get_instance(self.hass).async_add_executor_job(
                    get_last_statistics, self.hass, 1, statistic_id, True, set()
                )

                if not last_stats:
                    # First time we insert 5 years of data (if available).
                    hourly_data = await home.get_historic_data(
                        FIVE_YEARS, production=is_production
                    )

                    _sum = 0.0
                    last_stats_time = None
                else:
                    # hourly_consumption/production_data contains the last 30 days
                    # of consumption/production data.
                    # We update the statistics with the last 30 days
                    # of data to handle corrections in the data.
                    hourly_data = (
                        home.hourly_production_data
                        if is_production
                        else home.hourly_consumption_data
                    )

                    from_time = dt_util.parse_datetime(hourly_data[0]["from"])
                    if from_time is None:
                        continue
                    # Fetch the stored sum just before our 30-day window so new
                    # rows continue the running total instead of restarting it.
                    start = from_time - timedelta(hours=1)
                    stat = await get_instance(self.hass).async_add_executor_job(
                        statistics_during_period,
                        self.hass,
                        start,
                        None,
                        {statistic_id},
                        "hour",
                        None,
                        {"sum"},
                    )
                    if statistic_id in stat:
                        first_stat = stat[statistic_id][0]
                        _sum = cast(float, first_stat["sum"])
                        last_stats_time = first_stat["start"]
                    else:
                        # No overlapping stats found — fall back to a full backfill.
                        hourly_data = await home.get_historic_data(
                            FIVE_YEARS, production=is_production
                        )
                        _sum = 0.0
                        last_stats_time = None

                statistics = []

                last_stats_time_dt = (
                    dt_util.utc_from_timestamp(last_stats_time)
                    if last_stats_time
                    else None
                )

                for data in hourly_data:
                    if data.get(sensor_type) is None:
                        continue

                    from_time = dt_util.parse_datetime(data["from"])
                    # Skip unparsable timestamps and hours already recorded.
                    if from_time is None or (
                        last_stats_time_dt is not None
                        and from_time <= last_stats_time_dt
                    ):
                        continue

                    _sum += data[sensor_type]

                    statistics.append(
                        StatisticData(
                            start=from_time,
                            state=data[sensor_type],
                            sum=_sum,
                        )
                    )

                metadata = StatisticMetaData(
                    has_mean=False,
                    has_sum=True,
                    name=f"{home.name} {sensor_type}",
                    source=TIBBER_DOMAIN,
                    statistic_id=statistic_id,
                    unit_of_measurement=unit,
                )
                async_add_external_statistics(self.hass, metadata, statistics)
__init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None
Definition: coordinator.py:36
async_add_external_statistics(hass: HomeAssistant, metadata: StatisticMetaData, statistics: Iterable[StatisticData]) -> None
Definition: statistics.py:2318
get_instance(hass: HomeAssistant) -> Recorder
Definition: recorder.py:74