1 """Importer for the Elvia integration."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
6 from typing
import TYPE_CHECKING, cast
8 from elvia
import Elvia, error
as ElviaError
12 async_add_external_statistics,
14 statistics_during_period,
20 from .const
import DOMAIN, LOGGER
23 from elvia.types.meter_value_types
import MeterValueTimeSeries
29 """Class to import data from Elvia."""
35 metering_point_id: str,
39 self.
clientclient = Elvia(meter_value_token=api_token).meter_value()
46 ) -> list[MeterValueTimeSeries]:
47 """Fetch hourly data."""
48 start_time = since.isoformat()
49 end_time = until.isoformat()
50 LOGGER.debug(
"Fetching hourly data %s - %s", start_time, end_time)
51 all_data = await self.
clientclient.get_meter_values(
52 start_time=start_time,
56 return all_data[
"meteringpoints"][0][
"metervalue"][
"timeSeries"]
59 """Import meter values."""
60 statistics: list[StatisticData] = []
61 statistic_id = f
"{DOMAIN}:{self.metering_point_id}_consumption"
73 hourly_data: list[MeterValueTimeSeries] = []
74 until = dt_util.utcnow()
75 for year
in (3, 2, 1):
79 until=until -
timedelta(days=365 * (year - 1)),
81 except ElviaError.ElviaException:
85 hourly_data.extend(year_hours)
87 if hourly_data
is None or len(hourly_data) == 0:
88 LOGGER.error(
"No data available for the metering point")
90 last_stats_time =
None
95 since=dt_util.utc_from_timestamp(
96 last_stats[statistic_id][0][
"end"]
98 until=dt_util.utcnow(),
100 except ElviaError.ElviaException
as err:
101 LOGGER.error(
"Error fetching data: %s", err)
106 or len(hourly_data) == 0
107 or not hourly_data[-1][
"verified"]
108 or (from_time := dt_util.parse_datetime(hourly_data[0][
"startTime"]))
114 statistics_during_period,
123 first_stat = curr_stat[statistic_id][0]
124 _sum = cast(float, first_stat[
"sum"])
125 last_stats_time = first_stat[
"start"]
127 last_stats_time_dt = (
128 dt_util.utc_from_timestamp(last_stats_time)
if last_stats_time
else None
131 for entry
in hourly_data:
132 from_time = dt_util.parse_datetime(entry[
"startTime"])
133 if from_time
is None or (
134 last_stats_time_dt
is not None and from_time <= last_stats_time_dt
138 _sum += entry[
"value"]
141 StatisticData(start=from_time, state=entry[
"value"], sum=_sum)
146 metadata=StatisticMetaData(
149 name=f
"{self.metering_point_id} Consumption",
151 statistic_id=statistic_id,
152 unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
154 statistics=statistics,
156 LOGGER.debug(
"Imported %s statistics", len(statistics))
None __init__(self, HomeAssistant hass, str api_token, str metering_point_id)
list[MeterValueTimeSeries] _fetch_hourly_data(self, datetime since, datetime until)
None import_meter_values(self)
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Recorder get_instance(HomeAssistant hass)