1 """Utility functions for Ista EcoTrend integration."""
3 from __future__
import annotations
6 from enum
import StrEnum
13 """Types of consumptions from ista."""
16 HOT_WATER =
"warmwater"
21 """Values type Costs or energy."""
28 data: dict[str, Any], value_type: IstaValueType |
None =
None
29 ) -> list[dict[str, Any]]:
30 """Get consumption readings and sort in ascending order by date."""
32 if consumptions := data.get(
33 "costs" if value_type == IstaValueType.COSTS
else "consumptions", []
37 "readings": readings.get(
"costsByEnergyType")
38 if value_type == IstaValueType.COSTS
39 else readings.get(
"readings"),
42 for readings
in consumptions
44 result.sort(key=
lambda d: d[
"date"])
def get_values_by_type(
    consumptions: dict[str, Any], consumption_type: IstaConsumptionType
) -> dict[str, Any]:
    """Get the readings of a certain type.

    Entries are looked up under ``"readings"`` and, when that key is
    missing or empty, under ``"costsByEnergyType"``.

    Args:
        consumptions: Mapping that may carry a list of per-type entries
            under one of the two keys above.
        consumption_type: Type whose ``value`` is compared with the
            ``"type"`` field of each entry.

    Returns:
        The first entry whose ``"type"`` matches, or an empty dict when
        no entry matches.
    """
    # The trailing ``or []`` keeps the iteration safe when a key is present
    # but holds ``None`` instead of a list.
    readings: list = (
        consumptions.get("readings")
        or consumptions.get("costsByEnergyType")
        or []
    )

    # NOTE(review): the ``next(..., {})`` wrapper is reconstructed from the
    # ``dict[str, Any]`` return type and from callers that invoke ``.get()``
    # on the result -- confirm against the complete file.
    return next(
        (
            values
            for values in readings
            if values.get("type") == consumption_type.value
        ),
        {},
    )
def as_number(value: str | float | None) -> float | int | None:
    """Convert readings to float or int.

    Readings in the json response are returned as strings,
    float values have comma as decimal separator.

    Args:
        value: Raw reading. Strings are parsed; any other value is
            returned unchanged.

    Returns:
        An ``int`` when the string is a whole number, a ``float`` when it
        parses as a decimal once the comma is replaced by a dot, or the
        input itself when it is not a string.

    Raises:
        ValueError: If a string cannot be parsed as a number.
    """
    if not isinstance(value, str):
        return value

    # Try ``int`` first so whole numbers (including signed ones such as
    # "-7") stay integers; fall back to ``float`` for decimals.
    try:
        return int(value)
    except ValueError:
        return float(value.replace(",", "."))
76 """Get the last day of the month."""
78 return dt_util.as_local(
80 month=month + 1
if month < 12
else 1,
81 year=year
if month < 12
else year + 1,
85 + datetime.timedelta(days=-1)
def get_native_value(
    data,
    consumption_type: IstaConsumptionType,
    value_type: IstaValueType | None = None,
) -> int | float | None:
    """Determine the latest value for the sensor.

    Args:
        data: Payload forwarded unchanged to ``get_statistics``.
        consumption_type: Consumption type forwarded to ``get_statistics``.
        value_type: Optional value type forwarded to ``get_statistics``.

    Returns:
        The ``"value"`` field of the last entry returned by
        ``get_statistics``, or ``None`` when it returns nothing.
    """
    statistics = get_statistics(data, consumption_type, value_type)
    if not statistics:
        return None

    # The last entry is used as the latest one; presumably the list is in
    # ascending date order -- confirm against ``get_statistics``.
    return statistics[-1].get("value")
103 consumption_type: IstaConsumptionType,
104 value_type: IstaValueType |
None =
None,
105 ) -> list[dict[str, Any]] |
None:
106 """Determine the latest value for the sensor."""
113 consumptions=consumptions,
114 consumption_type=consumption_type,
117 if value_type == IstaValueType.ENERGY
121 "date": consumptions[
"date"],
123 for consumptions
in monthly_consumptions
125 consumptions=consumptions,
126 consumption_type=consumption_type,
127 ).
get(
"additionalValue" if value_type == IstaValueType.ENERGY
else "value")
web.Response get(self, web.Request request, str config_key)
list[dict[str, Any]]|None get_statistics(data, IstaConsumptionType consumption_type, IstaValueType|None value_type=None)
float|int|None as_number(str|float|None value)
dict[str, Any] get_values_by_type(dict[str, Any] consumptions, IstaConsumptionType consumption_type)
list[dict[str, Any]] get_consumptions(dict[str, Any] data, IstaValueType|None value_type=None)
int|float|None get_native_value(data, IstaConsumptionType consumption_type, IstaValueType|None value_type=None)
datetime.datetime last_day_of_month(int month, int year)