Home Assistant Unofficial Reference 2024.12.1
importer.py
Go to the documentation of this file.
1 """Importer for the Elvia integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 from typing import TYPE_CHECKING, cast
7 
8 from elvia import Elvia, error as ElviaError
9 
10 from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
12  async_add_external_statistics,
13  get_last_statistics,
14  statistics_during_period,
15 )
16 from homeassistant.components.recorder.util import get_instance
17 from homeassistant.const import UnitOfEnergy
18 from homeassistant.util import dt as dt_util
19 
20 from .const import DOMAIN, LOGGER
21 
22 if TYPE_CHECKING:
23  from elvia.types.meter_value_types import MeterValueTimeSeries
24 
25  from homeassistant.core import HomeAssistant
26 
27 
29  """Class to import data from Elvia."""
30 
31  def __init__(
32  self,
33  hass: HomeAssistant,
34  api_token: str,
35  metering_point_id: str,
36  ) -> None:
37  """Initialize."""
38  self.hasshass = hass
39  self.clientclient = Elvia(meter_value_token=api_token).meter_value()
40  self.metering_point_idmetering_point_id = metering_point_id
41 
42  async def _fetch_hourly_data(
43  self,
44  since: datetime,
45  until: datetime,
46  ) -> list[MeterValueTimeSeries]:
47  """Fetch hourly data."""
48  start_time = since.isoformat()
49  end_time = until.isoformat()
50  LOGGER.debug("Fetching hourly data %s - %s", start_time, end_time)
51  all_data = await self.clientclient.get_meter_values(
52  start_time=start_time,
53  end_time=end_time,
54  metering_point_ids=[self.metering_point_idmetering_point_id],
55  )
56  return all_data["meteringpoints"][0]["metervalue"]["timeSeries"]
57 
58  async def import_meter_values(self) -> None:
59  """Import meter values."""
60  statistics: list[StatisticData] = []
61  statistic_id = f"{DOMAIN}:{self.metering_point_id}_consumption"
62  last_stats = await get_instance(self.hasshass).async_add_executor_job(
63  get_last_statistics,
64  self.hasshass,
65  1,
66  statistic_id,
67  True,
68  {"sum"},
69  )
70 
71  if not last_stats:
72  # First time we insert 3 years of data (if available)
73  hourly_data: list[MeterValueTimeSeries] = []
74  until = dt_util.utcnow()
75  for year in (3, 2, 1):
76  try:
77  year_hours = await self._fetch_hourly_data_fetch_hourly_data(
78  since=until - timedelta(days=365 * year),
79  until=until - timedelta(days=365 * (year - 1)),
80  )
81  except ElviaError.ElviaException:
82  # This will raise if the contract have no data for the
83  # year, we can safely ignore this
84  continue
85  hourly_data.extend(year_hours)
86 
87  if hourly_data is None or len(hourly_data) == 0:
88  LOGGER.error("No data available for the metering point")
89  return
90  last_stats_time = None
91  _sum = 0.0
92  else:
93  try:
94  hourly_data = await self._fetch_hourly_data_fetch_hourly_data(
95  since=dt_util.utc_from_timestamp(
96  last_stats[statistic_id][0]["end"]
97  ),
98  until=dt_util.utcnow(),
99  )
100  except ElviaError.ElviaException as err:
101  LOGGER.error("Error fetching data: %s", err)
102  return
103 
104  if (
105  hourly_data is None
106  or len(hourly_data) == 0
107  or not hourly_data[-1]["verified"]
108  or (from_time := dt_util.parse_datetime(hourly_data[0]["startTime"]))
109  is None
110  ):
111  return
112 
113  curr_stat = await get_instance(self.hasshass).async_add_executor_job(
114  statistics_during_period,
115  self.hasshass,
116  from_time - timedelta(hours=1),
117  None,
118  {statistic_id},
119  "hour",
120  None,
121  {"sum"},
122  )
123  first_stat = curr_stat[statistic_id][0]
124  _sum = cast(float, first_stat["sum"])
125  last_stats_time = first_stat["start"]
126 
127  last_stats_time_dt = (
128  dt_util.utc_from_timestamp(last_stats_time) if last_stats_time else None
129  )
130 
131  for entry in hourly_data:
132  from_time = dt_util.parse_datetime(entry["startTime"])
133  if from_time is None or (
134  last_stats_time_dt is not None and from_time <= last_stats_time_dt
135  ):
136  continue
137 
138  _sum += entry["value"]
139 
140  statistics.append(
141  StatisticData(start=from_time, state=entry["value"], sum=_sum)
142  )
143 
145  hass=self.hasshass,
146  metadata=StatisticMetaData(
147  has_mean=False,
148  has_sum=True,
149  name=f"{self.metering_point_id} Consumption",
150  source=DOMAIN,
151  statistic_id=statistic_id,
152  unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
153  ),
154  statistics=statistics,
155  )
156  LOGGER.debug("Imported %s statistics", len(statistics))
None __init__(self, HomeAssistant hass, str api_token, str metering_point_id)
Definition: importer.py:36
list[MeterValueTimeSeries] _fetch_hourly_data(self, datetime since, datetime until)
Definition: importer.py:46
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2318
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74