1 """Statistics helper."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from collections.abc
import Callable, Iterable, Sequence
8 from datetime
import datetime, timedelta
9 from functools
import lru_cache, partial
10 from itertools
import chain, groupby
12 from operator
import itemgetter
14 from typing
import TYPE_CHECKING, Any, Literal, TypedDict, cast
16 from sqlalchemy
import Select, and_, bindparam, func, lambda_stmt, select, text
17 from sqlalchemy.engine.row
import Row
18 from sqlalchemy.exc
import SQLAlchemyError
19 from sqlalchemy.orm.session
import Session
20 from sqlalchemy.sql.lambdas
import StatementLambdaElement
21 import voluptuous
as vol
32 BloodGlucoseConcentrationConverter,
33 ConductivityConverter,
37 ElectricCurrentConverter,
38 ElectricPotentialConverter,
46 UnitlessRatioConverter,
48 VolumeFlowRateConverter,
53 EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
54 EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
55 INTEGRATION_PLATFORM_COMPILE_STATISTICS,
56 INTEGRATION_PLATFORM_LIST_STATISTIC_IDS,
57 INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES,
58 INTEGRATION_PLATFORM_VALIDATE_STATISTICS,
61 from .db_schema
import (
70 StatisticDataTimestamp,
73 datetime_to_timestamp_or_none,
78 execute_stmt_lambda_element,
79 filter_unique_constraint_integrity_error,
81 retryable_database_job,
86 from .
import Recorder
89 Statistics.metadata_id,
94 Statistics.last_reset_ts,
99 QUERY_STATISTICS_SHORT_TERM = (
100 StatisticsShortTerm.metadata_id,
101 StatisticsShortTerm.start_ts,
102 StatisticsShortTerm.mean,
103 StatisticsShortTerm.min,
104 StatisticsShortTerm.max,
105 StatisticsShortTerm.last_reset_ts,
106 StatisticsShortTerm.state,
107 StatisticsShortTerm.sum,
110 QUERY_STATISTICS_SUMMARY_MEAN = (
111 StatisticsShortTerm.metadata_id,
112 func.avg(StatisticsShortTerm.mean),
113 func.min(StatisticsShortTerm.min),
114 func.max(StatisticsShortTerm.max),
117 QUERY_STATISTICS_SUMMARY_SUM = (
118 StatisticsShortTerm.metadata_id,
119 StatisticsShortTerm.start_ts,
120 StatisticsShortTerm.last_reset_ts,
121 StatisticsShortTerm.state,
122 StatisticsShortTerm.sum,
125 partition_by=StatisticsShortTerm.metadata_id,
126 order_by=StatisticsShortTerm.start_ts.desc(),
132 STATISTIC_UNIT_TO_UNIT_CONVERTER: dict[str |
None, type[BaseUnitConverter]] = {
133 **{unit: AreaConverter
for unit
in AreaConverter.VALID_UNITS},
135 unit: BloodGlucoseConcentrationConverter
136 for unit
in BloodGlucoseConcentrationConverter.VALID_UNITS
138 **{unit: ConductivityConverter
for unit
in ConductivityConverter.VALID_UNITS},
139 **{unit: DataRateConverter
for unit
in DataRateConverter.VALID_UNITS},
140 **{unit: DistanceConverter
for unit
in DistanceConverter.VALID_UNITS},
141 **{unit: DurationConverter
for unit
in DurationConverter.VALID_UNITS},
142 **{unit: ElectricCurrentConverter
for unit
in ElectricCurrentConverter.VALID_UNITS},
144 unit: ElectricPotentialConverter
145 for unit
in ElectricPotentialConverter.VALID_UNITS
147 **{unit: EnergyConverter
for unit
in EnergyConverter.VALID_UNITS},
148 **{unit: InformationConverter
for unit
in InformationConverter.VALID_UNITS},
149 **{unit: MassConverter
for unit
in MassConverter.VALID_UNITS},
150 **{unit: PowerConverter
for unit
in PowerConverter.VALID_UNITS},
151 **{unit: PressureConverter
for unit
in PressureConverter.VALID_UNITS},
152 **{unit: SpeedConverter
for unit
in SpeedConverter.VALID_UNITS},
153 **{unit: TemperatureConverter
for unit
in TemperatureConverter.VALID_UNITS},
154 **{unit: UnitlessRatioConverter
for unit
in UnitlessRatioConverter.VALID_UNITS},
155 **{unit: VolumeConverter
for unit
in VolumeConverter.VALID_UNITS},
156 **{unit: VolumeFlowRateConverter
for unit
in VolumeFlowRateConverter.VALID_UNITS},
161 unit: converter.UNIT_CLASS
162 for unit, converter
in STATISTIC_UNIT_TO_UNIT_CONVERTER.items()
165 DATA_SHORT_TERM_STATISTICS_RUN_CACHE =
"recorder_short_term_statistics_run_cache"
168 def mean(values: list[float]) -> float |
None:
169 """Return the mean of the values.
171 This is a very simple version that only works
172 with a non-empty list of floats. The built-in
173 statistics.mean is more robust but is almost
174 an order of magnitude slower.
176 return sum(values) / len(values)
179 _LOGGER = logging.getLogger(__name__)
182 @dataclasses.dataclass(slots=True)
184 """Cache for short term statistics runs."""
188 _latest_id_by_metadata_id: dict[int, int] = dataclasses.field(default_factory=dict)
191 """Return the latest short term statistics ids for the metadata_ids."""
194 for metadata_id, id_
in self._latest_id_by_metadata_id.items()
195 if metadata_id
in metadata_ids
199 """Cache the latest id for the metadata_id."""
200 self._latest_id_by_metadata_id[metadata_id] = id_
203 self, metadata_id_to_id: dict[int, int]
205 """Cache the latest id for the each metadata_id."""
206 self._latest_id_by_metadata_id.
update(metadata_id_to_id)
210 """A processed row of statistic data."""
216 """A processed row of statistic data."""
219 last_reset: float |
None
231 statistic_unit: str |
None,
233 """Return the unit which the statistic will be displayed in."""
235 if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit))
is None:
236 return statistic_unit
238 state_unit: str |
None = statistic_unit
239 if state := hass.states.get(statistic_id):
240 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
242 if state_unit == statistic_unit
or state_unit
not in converter.VALID_UNITS:
244 return statistic_unit
250 statistic_unit: str |
None,
251 state_unit: str |
None,
252 requested_units: dict[str, str] |
None,
253 allow_none: bool =
True,
254 ) -> Callable[[float |
None], float |
None] | Callable[[float], float] |
None:
255 """Prepare a converter from the statistics unit to display unit."""
256 if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit))
is None:
259 display_unit: str |
None
260 unit_class = converter.UNIT_CLASS
261 if requested_units
and unit_class
in requested_units:
262 display_unit = requested_units[unit_class]
264 display_unit = state_unit
266 if display_unit
not in converter.VALID_UNITS:
270 if display_unit == statistic_unit:
274 return converter.converter_factory_allow_none(
275 from_unit=statistic_unit, to_unit=display_unit
277 return converter.converter_factory(from_unit=statistic_unit, to_unit=display_unit)
281 display_unit: str |
None,
282 statistic_unit: str |
None,
283 ) -> Callable[[float], float] |
None:
284 """Prepare a converter from the display unit to the statistics unit."""
286 display_unit == statistic_unit
287 or (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit))
is None
290 return converter.converter_factory(from_unit=display_unit, to_unit=statistic_unit)
294 from_unit: str, to_unit: str
295 ) -> Callable[[float |
None], float |
None] |
None:
296 """Prepare a converter from a unit to another unit."""
297 for conv
in STATISTIC_UNIT_TO_UNIT_CONVERTER.values():
298 if from_unit
in conv.VALID_UNITS
and to_unit
in conv.VALID_UNITS:
299 if from_unit == to_unit:
301 return conv.converter_factory_allow_none(
302 from_unit=from_unit, to_unit=to_unit
304 raise HomeAssistantError
308 """Return True if it's possible to convert from from_unit to to_unit."""
309 for converter
in STATISTIC_UNIT_TO_UNIT_CONVERTER.values():
310 if from_unit
in converter.VALID_UNITS
and to_unit
in converter.VALID_UNITS:
315 @dataclasses.dataclass
317 """Compiled Statistics from a platform."""
319 platform_stats: list[StatisticResult]
320 current_metadata: dict[str, tuple[int, StatisticMetaData]]
324 """Split a state entity ID into domain and object ID."""
325 return entity_id.split(
":", 1)
328 VALID_STATISTIC_ID = re.compile(
r"^(?!.+__)(?!_)[\da-z_]+(?<!_):(?!_)[\da-z_]+(?<!_)$")
332 """Test if a statistic ID is a valid format.
334 Format: <domain>:<statistic> where both are slugs.
336 return VALID_STATISTIC_ID.match(statistic_id)
is not None
340 """Validate statistic ID."""
344 raise vol.Invalid(f
"Statistics ID {value} is an invalid statistic ID")
347 @dataclasses.dataclass
349 """Error or warning message."""
352 data: dict[str, str |
None] |
None =
None
355 """Return dictionary version."""
356 return dataclasses.asdict(self)
360 """Return start time."""
361 now = dt_util.utcnow()
362 current_period_minutes = now.minute - now.minute % 5
363 current_period = now.replace(minute=current_period_minutes, second=0, microsecond=0)
364 return current_period -
timedelta(minutes=5)
368 start_time_ts: float, end_time_ts: float
369 ) -> StatementLambdaElement:
370 """Generate the summary mean statement for hourly statistics."""
372 lambda: select(*QUERY_STATISTICS_SUMMARY_MEAN)
373 .filter(StatisticsShortTerm.start_ts >= start_time_ts)
374 .filter(StatisticsShortTerm.start_ts < end_time_ts)
375 .group_by(StatisticsShortTerm.metadata_id)
376 .order_by(StatisticsShortTerm.metadata_id)
381 start_time_ts: float, end_time_ts: float
382 ) -> StatementLambdaElement:
383 """Generate the summary mean statement for hourly statistics."""
387 select(*QUERY_STATISTICS_SUMMARY_SUM)
388 .filter(StatisticsShortTerm.start_ts >= start_time_ts)
389 .filter(StatisticsShortTerm.start_ts < end_time_ts)
393 .filter(subquery.c.rownum == 1)
394 .order_by(subquery.c.metadata_id)
399 """Compile hourly statistics.
401 This will summarize 5-minute statistics for one hour:
402 - average, min max is computed by a database query
403 - sum is taken from the last 5-minute entry during the hour
405 start_time = start.replace(minute=0)
406 start_time_ts = start_time.timestamp()
407 end_time = start_time + Statistics.duration
408 end_time_ts = end_time.timestamp()
411 summary: dict[int, StatisticDataTimestamp] = {}
417 metadata_id, _mean, _min, _max = stat
418 summary[metadata_id] = {
419 "start_ts": start_time_ts,
431 metadata_id, start, last_reset_ts, state, _sum, _ = stat
432 if metadata_id
in summary:
433 summary[metadata_id].
update(
435 "last_reset_ts": last_reset_ts,
441 summary[metadata_id] = {
442 "start_ts": start_time_ts,
443 "last_reset_ts": last_reset_ts,
450 Statistics.from_stats_ts(metadata_id, summary_item)
451 for metadata_id, summary_item
in summary.items()
455 @retryable_database_job("compile missing statistics")
457 """Compile missing statistics."""
458 now = dt_util.utcnow()
460 last_period_minutes = now.minute - now.minute % period_size
461 last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
462 start = now -
timedelta(days=instance.keep_days)
463 start = start.replace(minute=0, second=0, microsecond=0)
465 commit_interval = 60 / period_size * 12
468 session=instance.get_session(),
470 instance,
"statistic"
474 if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
479 periods_without_commit = 0
480 while start < last_period:
481 periods_without_commit += 1
482 end = start +
timedelta(minutes=period_size)
483 _LOGGER.debug(
"Compiling missing statistics for %s-%s", start, end)
485 instance, session, start, end >= last_period
487 if periods_without_commit == commit_interval
or modified_statistic_ids:
489 session.expunge_all()
490 periods_without_commit = 0
496 @retryable_database_job("compile statistics")
498 """Compile 5-minute statistics for all integrations with a recorder platform.
500 The actual calculation is delegated to the platforms.
506 modified_statistic_ids: set[str] |
None =
None
510 session=instance.get_session(),
512 instance,
"statistic"
516 instance, session, start, fire_events
519 if modified_statistic_ids:
524 with session_scope(session=instance.get_session(), read_only=
True)
as session:
525 instance.statistics_meta_manager.get_many(session, modified_statistic_ids)
531 """Return a statement that returns the first run_id at start."""
532 return lambda_stmt(
lambda: select(StatisticsRuns.run_id).filter_by(start=start))
536 instance: Recorder, session: Session, start: datetime, fire_events: bool
538 """Compile 5-minute statistics for all integrations with a recorder platform.
540 This is a helper function for compile_statistics and compile_missing_statistics
541 that does not retry on database errors since both callers already retry.
543 returns a set of modified statistic_ids if any were modified.
545 assert start.tzinfo == dt_util.UTC,
"start must be in UTC"
546 end = start + StatisticsShortTerm.duration
547 statistics_meta_manager = instance.statistics_meta_manager
548 modified_statistic_ids: set[str] = set()
552 _LOGGER.debug(
"Statistics already compiled for %s-%s", start, end)
553 return modified_statistic_ids
555 _LOGGER.debug(
"Compiling statistics for %s-%s", start, end)
556 platform_stats: list[StatisticResult] = []
557 current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
559 for domain, platform
in instance.hass.data[DOMAIN].recorder_platforms.items():
561 platform_compile_statistics := getattr(
562 platform, INTEGRATION_PLATFORM_COMPILE_STATISTICS,
None
566 compiled: PlatformCompiledStatistics = platform_compile_statistics(
567 instance.hass, session, start, end
570 "Statistics for %s during %s-%s: %s",
574 compiled.platform_stats,
576 platform_stats.extend(compiled.platform_stats)
577 current_metadata.update(compiled.current_metadata)
579 new_short_term_stats: list[StatisticsBase] = []
580 updated_metadata_ids: set[int] = set()
582 for stats
in platform_stats:
583 modified_statistic_id, metadata_id = statistics_meta_manager.update_or_add(
584 session, stats[
"meta"], current_metadata
586 if modified_statistic_id
is not None:
587 modified_statistic_ids.add(modified_statistic_id)
588 updated_metadata_ids.add(metadata_id)
595 new_short_term_stats.append(new_stat)
597 if start.minute == 50:
599 for platform
in instance.hass.data[DOMAIN].recorder_platforms.values():
601 platform_update_issues := getattr(
602 platform, INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES,
None
606 platform_update_issues(instance.hass, session)
608 if start.minute == 55:
615 instance.hass.bus.fire(EVENT_RECORDER_5MIN_STATISTICS_GENERATED)
616 if start.minute == 55:
617 instance.hass.bus.fire(EVENT_RECORDER_HOURLY_STATISTICS_GENERATED)
619 if updated_metadata_ids:
626 run_cache.set_latest_ids_for_metadata_ids(
630 new_stat.metadata_id: new_stat.id
631 for new_stat
in new_short_term_stats
636 return modified_statistic_ids
641 table: type[StatisticsBase],
643 start_time: datetime,
646 """Adjust statistics in the database."""
647 start_time_ts = start_time.timestamp()
649 session.query(table).filter_by(metadata_id=metadata_id).filter(
650 table.start_ts >= start_time_ts
653 table.sum: table.sum + adj,
655 synchronize_session=
False,
657 except SQLAlchemyError:
659 "Unexpected exception when updating statistics %s",
666 table: type[StatisticsBase],
668 statistic: StatisticData,
669 ) -> StatisticsBase |
None:
670 """Insert statistics in the database."""
672 stat = table.from_stats(metadata_id, statistic)
674 except SQLAlchemyError:
676 "Unexpected exception when inserting statistics %s:%s ",
686 table: type[StatisticsBase],
688 statistic: StatisticData,
690 """Insert statistics in the database."""
692 session.query(table).filter_by(id=stat_id).
update(
694 table.mean: statistic.get(
"mean"),
695 table.min: statistic.get(
"min"),
696 table.max: statistic.get(
"max"),
698 statistic.get(
"last_reset")
700 table.state: statistic.get(
"state"),
701 table.sum: statistic.get(
"sum"),
703 synchronize_session=
False,
705 except SQLAlchemyError:
707 "Unexpected exception when updating statistics %s:%s ",
717 statistic_ids: set[str] |
None =
None,
718 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
719 statistic_source: str |
None =
None,
720 ) -> dict[str, tuple[int, StatisticMetaData]]:
723 Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by statistic_id.
724 If statistic_ids is given, fetch metadata only for the listed statistics_ids.
725 If statistic_type is given, fetch metadata only for statistic_ids supporting it.
727 return instance.statistics_meta_manager.get_many(
729 statistic_ids=statistic_ids,
730 statistic_type=statistic_type,
731 statistic_source=statistic_source,
738 statistic_ids: set[str] |
None =
None,
739 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
740 statistic_source: str |
None =
None,
741 ) -> dict[str, tuple[int, StatisticMetaData]]:
742 """Return metadata for statistic_ids."""
747 statistic_ids=statistic_ids,
748 statistic_type=statistic_type,
749 statistic_source=statistic_source,
754 """Clear statistics for a list of statistic_ids."""
755 with session_scope(session=instance.get_session())
as session:
756 instance.statistics_meta_manager.delete(session, statistic_ids)
762 new_statistic_id: str |
None | UndefinedType,
763 new_unit_of_measurement: str |
None | UndefinedType,
765 """Update statistics metadata for a statistic_id."""
766 statistics_meta_manager = instance.statistics_meta_manager
767 if new_unit_of_measurement
is not UNDEFINED:
768 with session_scope(session=instance.get_session())
as session:
769 statistics_meta_manager.update_unit_of_measurement(
770 session, statistic_id, new_unit_of_measurement
772 if new_statistic_id
is not UNDEFINED
and new_statistic_id
is not None:
774 session=instance.get_session(),
776 instance,
"statistic"
779 statistics_meta_manager.update_statistic_id(
780 session, DOMAIN, statistic_id, new_statistic_id
786 statistic_ids: set[str] |
None =
None,
787 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
789 """Return all statistic_ids (or filtered one) and unit of measurement.
791 Queries the database for existing statistic_ids, as well as integrations with
792 a recorder platform for statistic_ids which will be added in the next statistics
797 if statistic_ids
is not None:
800 statistics_meta_manager = instance.statistics_meta_manager
801 metadata = statistics_meta_manager.get_from_cache_threadsafe(statistic_ids)
802 if not statistic_ids.difference(metadata):
806 return await instance.async_add_executor_job(
816 metadata: dict[str, tuple[int, StatisticMetaData]],
817 ) -> dict[str, dict[str, Any]]:
818 """Return a list of results for a given metadata dict."""
820 meta[
"statistic_id"]: {
822 hass, meta[
"statistic_id"], meta[
"unit_of_measurement"]
824 "has_mean": meta[
"has_mean"],
825 "has_sum": meta[
"has_sum"],
826 "name": meta[
"name"],
827 "source": meta[
"source"],
828 "unit_class": UNIT_CLASSES.get(meta[
"unit_of_measurement"]),
829 "unit_of_measurement": meta[
"unit_of_measurement"],
831 for _, meta
in metadata.values()
836 result: dict[str, dict[str, Any]],
838 """Return a flat dict of metadata."""
842 "display_unit_of_measurement": info[
"display_unit_of_measurement"],
843 "has_mean": info[
"has_mean"],
844 "has_sum": info[
"has_sum"],
845 "name": info.get(
"name"),
846 "source": info[
"source"],
847 "statistics_unit_of_measurement": info[
"unit_of_measurement"],
848 "unit_class": info[
"unit_class"],
850 for _id, info
in result.items()
856 statistic_ids: set[str] |
None =
None,
857 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
859 """Return all statistic_ids (or filtered one) and unit of measurement.
861 Queries the database for existing statistic_ids, as well as integrations with
862 a recorder platform for statistic_ids which will be added in the next statistics
867 statistics_meta_manager = instance.statistics_meta_manager
871 metadata = statistics_meta_manager.get_many(
872 session, statistic_type=statistic_type, statistic_ids=statistic_ids
876 if not statistic_ids
or statistic_ids.difference(result):
881 for platform
in hass.data[DOMAIN].recorder_platforms.values():
883 platform_list_statistic_ids := getattr(
884 platform, INTEGRATION_PLATFORM_LIST_STATISTIC_IDS,
None
888 platform_statistic_ids = platform_list_statistic_ids(
889 hass, statistic_ids=statistic_ids, statistic_type=statistic_type
892 for key, meta
in platform_statistic_ids.items():
897 "display_unit_of_measurement": meta[
"unit_of_measurement"],
898 "has_mean": meta[
"has_mean"],
899 "has_sum": meta[
"has_sum"],
900 "name": meta[
"name"],
901 "source": meta[
"source"],
902 "unit_class": UNIT_CLASSES.get(meta[
"unit_of_measurement"]),
903 "unit_of_measurement": meta[
"unit_of_measurement"],
911 stats: dict[str, list[StatisticsRow]],
912 same_period: Callable[[float, float], bool],
913 period_start_end: Callable[[float], tuple[float, float]],
915 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
916 ) -> dict[str, list[StatisticsRow]]:
917 """Reduce hourly statistics to daily or monthly statistics."""
918 result: dict[str, list[StatisticsRow]] = defaultdict(list)
919 period_seconds = period.total_seconds()
920 _want_mean =
"mean" in types
921 _want_min =
"min" in types
922 _want_max =
"max" in types
923 _want_last_reset =
"last_reset" in types
924 _want_state =
"state" in types
925 _want_sum =
"sum" in types
926 for statistic_id, stat_list
in stats.items():
927 max_values: list[float] = []
928 mean_values: list[float] = []
929 min_values: list[float] = []
930 prev_stat: StatisticsRow = stat_list[0]
931 fake_entry: StatisticsRow = {
"start": stat_list[-1][
"start"] + period_seconds}
934 for statistic
in chain(stat_list, (fake_entry,)):
935 if not same_period(prev_stat[
"start"], statistic[
"start"]):
936 start, end = period_start_end(prev_stat[
"start"])
938 row: StatisticsRow = {
943 row[
"mean"] =
mean(mean_values)
if mean_values
else None
946 row[
"min"] =
min(min_values)
if min_values
else None
949 row[
"max"] =
max(max_values)
if max_values
else None
952 row[
"last_reset"] = prev_stat.get(
"last_reset")
954 row[
"state"] = prev_stat.get(
"state")
956 row[
"sum"] = prev_stat[
"sum"]
957 result[statistic_id].append(row)
958 if _want_max
and (_max := statistic.get(
"max"))
is not None:
959 max_values.append(_max)
960 if _want_mean
and (_mean := statistic.get(
"mean"))
is not None:
961 mean_values.append(_mean)
962 if _want_min
and (_min := statistic.get(
"min"))
is not None:
963 min_values.append(_min)
964 prev_stat = statistic
971 Callable[[float, float], bool],
972 Callable[[float], tuple[float, float]],
975 """Return functions to match same day and day start end."""
976 _lower_bound: float = 0
977 _upper_bound: float = 0
980 _local_from_timestamp = partial(
981 datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
984 def _same_day_ts(time1: float, time2: float) -> bool:
985 """Return True if time1 and time2 are in the same date."""
986 nonlocal _lower_bound, _upper_bound
987 if not _lower_bound <= time1 < _upper_bound:
988 _lower_bound, _upper_bound = _day_start_end_ts_cached(time1)
989 return _lower_bound <= time2 < _upper_bound
991 def _day_start_end_ts(time: float) -> tuple[float, float]:
992 """Return the start and end of the period (day) time is within."""
993 start_local = _local_from_timestamp(time).replace(
994 hour=0, minute=0, second=0, microsecond=0
997 start_local.timestamp(),
998 (start_local +
timedelta(days=1)).timestamp(),
1002 _day_start_end_ts_cached = lru_cache(maxsize=6)(_day_start_end_ts)
1004 return _same_day_ts, _day_start_end_ts_cached
1008 stats: dict[str, list[StatisticsRow]],
1009 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1010 ) -> dict[str, list[StatisticsRow]]:
1011 """Reduce hourly statistics to daily statistics."""
1014 stats, _same_day_ts, _day_start_end_ts,
timedelta(days=1), types
1020 Callable[[float, float], bool],
1021 Callable[[float], tuple[float, float]],
1024 """Return functions to match same week and week start end."""
1025 _lower_bound: float = 0
1026 _upper_bound: float = 0
1029 _local_from_timestamp = partial(
1030 datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
1033 def _same_week_ts(time1: float, time2: float) -> bool:
1034 """Return True if time1 and time2 are in the same year and week."""
1035 nonlocal _lower_bound, _upper_bound
1036 if not _lower_bound <= time1 < _upper_bound:
1037 _lower_bound, _upper_bound = _week_start_end_ts_cached(time1)
1038 return _lower_bound <= time2 < _upper_bound
1040 def _week_start_end_ts(time: float) -> tuple[float, float]:
1041 """Return the start and end of the period (week) time is within."""
1042 time_local = _local_from_timestamp(time)
1043 start_local = time_local.replace(
1044 hour=0, minute=0, second=0, microsecond=0
1045 ) -
timedelta(days=time_local.weekday())
1047 start_local.timestamp(),
1048 (start_local +
timedelta(days=7)).timestamp(),
1052 _week_start_end_ts_cached = lru_cache(maxsize=6)(_week_start_end_ts)
1054 return _same_week_ts, _week_start_end_ts_cached
1058 stats: dict[str, list[StatisticsRow]],
1059 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1060 ) -> dict[str, list[StatisticsRow]]:
1061 """Reduce hourly statistics to weekly statistics."""
1064 stats, _same_week_ts, _week_start_end_ts,
timedelta(days=7), types
1069 """Return the end of the month (midnight at the first day of the next month)."""
1071 return (timestamp.replace(day=28) +
timedelta(days=4)).replace(
1072 day=1, hour=0, minute=0, second=0, microsecond=0
1078 Callable[[float, float], bool],
1079 Callable[[float], tuple[float, float]],
1082 """Return functions to match same month and month start end."""
1083 _lower_bound: float = 0
1084 _upper_bound: float = 0
1087 _local_from_timestamp = partial(
1088 datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
1091 def _same_month_ts(time1: float, time2: float) -> bool:
1092 """Return True if time1 and time2 are in the same year and month."""
1093 nonlocal _lower_bound, _upper_bound
1094 if not _lower_bound <= time1 < _upper_bound:
1095 _lower_bound, _upper_bound = _month_start_end_ts_cached(time1)
1096 return _lower_bound <= time2 < _upper_bound
1098 def _month_start_end_ts(time: float) -> tuple[float, float]:
1099 """Return the start and end of the period (month) time is within."""
1100 start_local = _local_from_timestamp(time).replace(
1101 day=1, hour=0, minute=0, second=0, microsecond=0
1104 return (start_local.timestamp(), end_local.timestamp())
1107 _month_start_end_ts_cached = lru_cache(maxsize=6)(_month_start_end_ts)
1109 return _same_month_ts, _month_start_end_ts_cached
1113 stats: dict[str, list[StatisticsRow]],
1114 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1115 ) -> dict[str, list[StatisticsRow]]:
1116 """Reduce hourly statistics to monthly statistics."""
1119 stats, _same_month_ts, _month_start_end_ts,
timedelta(days=31), types
1124 start_time: datetime,
1125 end_time: datetime |
None,
1126 metadata_ids: list[int] |
None,
1127 table: type[StatisticsBase],
1128 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1129 ) -> StatementLambdaElement:
1130 """Prepare a database query for statistics during a given period.
1132 This prepares a lambda_stmt query, so we don't insert the parameters yet.
1134 start_time_ts = start_time.timestamp()
1136 stmt +=
lambda q: q.filter(table.start_ts >= start_time_ts)
1137 if end_time
is not None:
1138 end_time_ts = end_time.timestamp()
1139 stmt +=
lambda q: q.filter(table.start_ts < end_time_ts)
1141 stmt +=
lambda q: q.filter(table.metadata_id.in_(metadata_ids))
1142 stmt +=
lambda q: q.order_by(table.metadata_id, table.start_ts)
1148 start_time: datetime |
None,
1149 end_time: datetime |
None,
1150 table: type[StatisticsBase],
1152 ) -> StatementLambdaElement:
1153 stmt = lambda_stmt(
lambda: columns.filter(table.metadata_id == metadata_id))
1154 if start_time
is not None:
1155 start_time_ts = start_time.timestamp()
1156 stmt +=
lambda q: q.filter(table.start_ts >= start_time_ts)
1157 if end_time
is not None:
1158 end_time_ts = end_time.timestamp()
1159 stmt +=
lambda q: q.filter(table.start_ts < end_time_ts)
1165 result: dict[str, float],
1166 start_time: datetime |
None,
1167 end_time: datetime |
None,
1168 table: type[StatisticsBase],
1169 types: set[Literal[
"max",
"mean",
"min",
"change"]],
1172 """Return max, mean and min during the period."""
1176 columns = columns.add_columns(func.max(table.max))
1178 columns = columns.add_columns(func.avg(table.mean))
1179 columns = columns.add_columns(func.count(table.mean))
1181 columns = columns.add_columns(func.min(table.min))
1183 columns, start_time, end_time, table, metadata_id
1188 if "max" in types
and (new_max := stats[0].max)
is not None:
1189 old_max = result.get(
"max")
1190 result[
"max"] =
max(new_max, old_max)
if old_max
is not None else new_max
1191 if "mean" in types
and stats[0].avg
is not None:
1193 duration = stats[0].count * table.duration.total_seconds()
1194 result[
"duration"] = result.get(
"duration", 0.0) + duration
1195 result[
"mean_acc"] = result.get(
"mean_acc", 0.0) + stats[0].avg * duration
1196 if "min" in types
and (new_min := stats[0].min)
is not None:
1197 old_min = result.get(
"min")
1198 result[
"min"] =
min(new_min, old_min)
if old_min
is not None else new_min
1203 head_start_time: datetime |
None,
1204 head_end_time: datetime |
None,
1205 main_start_time: datetime |
None,
1206 main_end_time: datetime |
None,
1207 tail_start_time: datetime |
None,
1208 tail_end_time: datetime |
None,
1211 types: set[Literal[
"max",
"mean",
"min",
"change"]],
1212 ) -> dict[str, float |
None]:
1213 """Return max, mean and min during the period.
1215 The mean is a time weighted average, combining hourly and 5-minute statistics if
1218 max_mean_min: dict[str, float] = {}
1219 result: dict[str, float |
None] = {}
1221 if tail_start_time
is not None:
1228 StatisticsShortTerm,
1244 if head_start_time
is not None:
1250 StatisticsShortTerm,
1256 result[
"max"] = max_mean_min.get(
"max")
1258 if "mean_acc" not in max_mean_min:
1259 result[
"mean"] =
None
1261 result[
"mean"] = max_mean_min[
"mean_acc"] / max_mean_min[
"duration"]
1263 result[
"min"] = max_mean_min.get(
"min")
1269 table: type[StatisticsBase],
1271 ) -> datetime |
None:
1272 """Return the date of the oldest statistic row for a given metadata id."""
1274 lambda: select(table.start_ts)
1275 .filter(table.metadata_id == metadata_id)
1276 .order_by(table.start_ts.asc())
1280 return dt_util.utc_from_timestamp(stats[0].start_ts)
1286 table: type[StatisticsBase],
1288 ) -> datetime |
None:
1289 """Return the date of the newest statistic row for a given metadata id."""
1291 lambda: select(table.start_ts)
1292 .filter(table.metadata_id == metadata_id)
1293 .order_by(table.start_ts.desc())
1297 return dt_util.utc_from_timestamp(stats[0].start_ts)
1303 head_start_time: datetime |
None,
1304 main_start_time: datetime |
None,
1305 tail_start_time: datetime |
None,
1306 oldest_stat: datetime |
None,
1307 oldest_5_min_stat: datetime |
None,
1311 """Return the oldest non-NULL sum during the period."""
1313 def _get_oldest_sum_statistic_in_sub_period(
1315 start_time: datetime |
None,
1316 table: type[StatisticsBase],
1319 """Return the oldest non-NULL sum during the period."""
1321 lambda: select(table.sum)
1322 .filter(table.metadata_id == metadata_id)
1323 .filter(table.sum.is_not(
None))
1324 .order_by(table.start_ts.asc())
1327 if start_time
is not None:
1328 start_time = start_time + table.duration - timedelta.resolution
1329 if table == StatisticsShortTerm:
1330 minutes = start_time.minute - start_time.minute % 5
1331 period = start_time.replace(minute=minutes, second=0, microsecond=0)
1333 period = start_time.replace(minute=0, second=0, microsecond=0)
1334 prev_period = period - table.duration
1335 prev_period_ts = prev_period.timestamp()
1336 stmt +=
lambda q: q.filter(table.start_ts >= prev_period_ts)
1338 return stats[0].sum
if stats
else None
1340 oldest_sum: float |
None =
None
1344 if not tail_only
and main_start_time
is not None and oldest_stat
is not None:
1345 period = main_start_time.replace(minute=0, second=0, microsecond=0)
1346 prev_period = period - Statistics.duration
1347 if prev_period < oldest_stat:
1351 head_start_time
is not None
1352 and oldest_5_min_stat
is not None
1357 (oldest_stat
is None)
1358 or (oldest_5_min_stat < oldest_stat)
1359 or (oldest_5_min_stat <= head_start_time)
1362 oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1363 session, head_start_time, StatisticsShortTerm, metadata_id
1372 oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1373 session, main_start_time, Statistics, metadata_id
1380 tail_start_time
is not None
1382 oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1383 session, tail_start_time, StatisticsShortTerm, metadata_id
1394 head_start_time: datetime |
None,
1395 head_end_time: datetime |
None,
1396 main_start_time: datetime |
None,
1397 main_end_time: datetime |
None,
1398 tail_start_time: datetime |
None,
1399 tail_end_time: datetime |
None,
1403 """Return the newest non-NULL sum during the period."""
1405 def _get_newest_sum_statistic_in_sub_period(
1407 start_time: datetime |
None,
1408 end_time: datetime |
None,
1409 table: type[StatisticsBase],
1412 """Return the newest non-NULL sum during the period."""
1417 .filter(table.metadata_id == metadata_id)
1418 .filter(table.sum.is_not(
None))
1419 .order_by(table.start_ts.desc())
1422 if start_time
is not None:
1423 start_time_ts = start_time.timestamp()
1424 stmt +=
lambda q: q.filter(table.start_ts >= start_time_ts)
1425 if end_time
is not None:
1426 end_time_ts = end_time.timestamp()
1427 stmt +=
lambda q: q.filter(table.start_ts < end_time_ts)
1430 return stats[0].sum
if stats
else None
1432 newest_sum: float |
None =
None
1434 if tail_start_time
is not None:
1435 newest_sum = _get_newest_sum_statistic_in_sub_period(
1436 session, tail_start_time, tail_end_time, StatisticsShortTerm, metadata_id
1438 if newest_sum
is not None:
1442 newest_sum = _get_newest_sum_statistic_in_sub_period(
1443 session, main_start_time, main_end_time, Statistics, metadata_id
1445 if newest_sum
is not None:
1448 if head_start_time
is not None:
1449 newest_sum = _get_newest_sum_statistic_in_sub_period(
1450 session, head_start_time, head_end_time, StatisticsShortTerm, metadata_id
1457 hass: HomeAssistant,
1458 start_time: datetime |
None,
1459 end_time: datetime |
None,
1461 types: set[Literal[
"max",
"mean",
"min",
"change"]] |
None,
1462 units: dict[str, str] |
None,
1463 ) -> dict[str, Any]:
1464 """Return a statistic data point for the UTC period start_time - end_time."""
1468 types = {
"max",
"mean",
"min",
"change"}
1470 result: dict[str, Any] = {}
1475 metadata :=
get_instance(hass).statistics_meta_manager.get(
1476 session, statistic_id
1481 metadata_id = metadata[0]
1484 oldest_5_min_stat =
None
1487 session, StatisticsShortTerm, metadata_id
1495 now = dt_util.utcnow()
1496 if end_time
is not None and end_time > now:
1500 start_time
is not None
1501 and end_time
is not None
1502 and end_time - start_time < Statistics.duration
1506 head_start_time: datetime |
None =
None
1507 head_end_time: datetime |
None =
None
1510 and oldest_stat
is not None
1511 and oldest_5_min_stat
is not None
1512 and oldest_5_min_stat - oldest_stat < Statistics.duration
1513 and (start_time
is None or start_time < oldest_5_min_stat)
1517 head_start_time = oldest_5_min_stat
1519 oldest_5_min_stat.replace(minute=0, second=0, microsecond=0)
1520 + Statistics.duration
1522 elif not tail_only
and start_time
is not None and start_time.minute:
1523 head_start_time = start_time
1525 start_time.replace(minute=0, second=0, microsecond=0)
1526 + Statistics.duration
1530 tail_start_time: datetime |
None =
None
1531 tail_end_time: datetime |
None =
None
1532 if end_time
is None:
1535 tail_start_time += Statistics.duration
1537 tail_start_time = now.replace(minute=0, second=0, microsecond=0)
1539 tail_start_time = start_time
1540 tail_end_time = end_time
1541 elif end_time.minute:
1542 tail_start_time = end_time.replace(minute=0, second=0, microsecond=0)
1543 tail_end_time = end_time
1546 main_start_time: datetime |
None =
None
1547 main_end_time: datetime |
None =
None
1549 main_start_time = start_time
if head_end_time
is None else head_end_time
1550 main_end_time = end_time
if tail_start_time
is None else tail_start_time
1552 if not types.isdisjoint({
"max",
"mean",
"min"}):
1566 if "change" in types:
1567 oldest_sum: float |
None
1568 if start_time
is None:
1593 if oldest_sum
is not None and newest_sum
is not None:
1594 result[
"change"] = newest_sum - oldest_sum
1596 result[
"change"] =
None
1598 state_unit = unit = metadata[1][
"unit_of_measurement"]
1599 if state := hass.states.get(statistic_id):
1600 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
1605 return {key: convert(value)
for key, value
in result.items()}
1608 _type_column_mapping = {
1609 "last_reset":
"last_reset_ts",
1619 table: type[StatisticsBase],
1620 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1621 ) -> StatementLambdaElement:
1622 columns = select(table.metadata_id, table.start_ts)
1623 track_on: list[str |
None] = [
1624 table.__tablename__,
1626 for key, column
in _type_column_mapping.items():
1628 columns = columns.add_columns(getattr(table, column))
1629 track_on.append(column)
1631 track_on.append(
None)
1632 return lambda_stmt(
lambda: columns, track_on=track_on)
1636 metadata: dict[str, tuple[int, StatisticMetaData]],
1637 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1639 """Extract metadata ids from metadata and discard impossible columns."""
1643 for metadata_id, stats_metadata
in metadata.values():
1644 metadata_ids.append(metadata_id)
1645 has_mean |= stats_metadata[
"has_mean"]
1646 has_sum |= stats_metadata[
"has_sum"]
1648 types.discard(
"mean")
1649 types.discard(
"min")
1650 types.discard(
"max")
1652 types.discard(
"sum")
1653 types.discard(
"state")
1658 hass: HomeAssistant,
1660 start_time: datetime,
1661 units: dict[str, str] |
None,
1662 _types: set[Literal[
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1663 table: type[Statistics | StatisticsShortTerm],
1664 metadata: dict[str, tuple[int, StatisticMetaData]],
1665 result: dict[str, list[StatisticsRow]],
1667 """Add change to the result."""
1668 drop_sum =
"sum" not in _types
1672 {metadata[statistic_id][0]
for statistic_id
in result},
1677 _metadata =
dict(metadata.values())
1679 metadata_by_id = _metadata[row.metadata_id]
1680 statistic_id = metadata_by_id[
"statistic_id"]
1682 state_unit = unit = metadata_by_id[
"unit_of_measurement"]
1683 if state := hass.states.get(statistic_id):
1684 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
1687 if convert
is not None:
1688 prev_sums[statistic_id] = convert(row.sum)
1690 prev_sums[statistic_id] = row.sum
1692 for statistic_id, rows
in result.items():
1693 prev_sum = prev_sums.get(statistic_id)
or 0
1694 for statistics_row
in rows:
1695 if "sum" not in statistics_row:
1698 _sum = statistics_row.pop(
"sum")
1700 _sum = statistics_row[
"sum"]
1702 statistics_row[
"change"] =
None
1704 statistics_row[
"change"] = _sum - prev_sum
1709 hass: HomeAssistant,
1711 start_time: datetime,
1712 end_time: datetime |
None,
1713 statistic_ids: set[str] |
None,
1714 period: Literal[
"5minute",
"day",
"hour",
"week",
"month"],
1715 units: dict[str, str] |
None,
1716 _types: set[Literal[
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1717 ) -> dict[str, list[StatisticsRow]]:
1718 """Return statistic data points during UTC period start_time - end_time.
1720 If end_time is omitted, returns statistics newer than or equal to start_time.
1721 If statistic_ids is omitted, returns statistics for all statistics ids.
1723 if statistic_ids
is not None and not isinstance(statistic_ids, set):
1726 statistic_ids = set(statistic_ids)
1728 metadata =
get_instance(hass).statistics_meta_manager.get_many(
1729 session, statistic_ids=statistic_ids
1734 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]] = set()
1735 for stat_type
in _types:
1736 if stat_type ==
"change":
1739 types.add(stat_type)
1742 if statistic_ids
is not None:
1747 start_time = dt_util.as_local(start_time).replace(
1748 hour=0, minute=0, second=0, microsecond=0
1750 start_time = start_time.replace()
1751 if end_time
is not None:
1752 end_local = dt_util.as_local(end_time)
1753 end_time = end_local.replace(
1754 hour=0, minute=0, second=0, microsecond=0
1756 elif period ==
"week":
1757 start_local = dt_util.as_local(start_time)
1758 start_time = start_local.replace(
1759 hour=0, minute=0, second=0, microsecond=0
1760 ) -
timedelta(days=start_local.weekday())
1761 if end_time
is not None:
1762 end_local = dt_util.as_local(end_time)
1764 end_local.replace(hour=0, minute=0, second=0, microsecond=0)
1768 elif period ==
"month":
1769 start_time = dt_util.as_local(start_time).replace(
1770 day=1, hour=0, minute=0, second=0, microsecond=0
1772 if end_time
is not None:
1775 table: type[Statistics | StatisticsShortTerm] = (
1776 Statistics
if period !=
"5minute" else StatisticsShortTerm
1779 start_time, end_time, metadata_ids, table, types
1802 if period ==
"week":
1805 if period ==
"month":
1808 if "change" in _types:
1810 hass, session, start_time, units, _types, table, metadata, result
1818 hass: HomeAssistant,
1819 start_time: datetime,
1820 end_time: datetime |
None,
1821 statistic_ids: set[str] |
None,
1822 period: Literal[
"5minute",
"day",
"hour",
"week",
"month"],
1823 units: dict[str, str] |
None,
1824 types: set[Literal[
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1825 ) -> dict[str, list[StatisticsRow]]:
1826 """Return statistic data points during UTC period start_time - end_time.
1828 If end_time is omitted, returns statistics newer than or equal to start_time.
1829 If statistic_ids is omitted, returns statistics for all statistics ids.
1846 number_of_stats: int,
1847 ) -> StatementLambdaElement:
1848 """Generate a statement for number_of_stats statistics for a given statistic_id."""
1850 lambda: select(*QUERY_STATISTICS)
1851 .filter_by(metadata_id=metadata_id)
1852 .order_by(Statistics.metadata_id, Statistics.start_ts.desc())
1853 .limit(number_of_stats)
1859 number_of_stats: int,
1860 ) -> StatementLambdaElement:
1861 """Generate a statement for number_of_stats short term statistics.
1863 For a given statistic_id.
1866 lambda: select(*QUERY_STATISTICS_SHORT_TERM)
1867 .filter_by(metadata_id=metadata_id)
1868 .order_by(StatisticsShortTerm.metadata_id, StatisticsShortTerm.start_ts.desc())
1869 .limit(number_of_stats)
1874 hass: HomeAssistant,
1875 number_of_stats: int,
1877 convert_units: bool,
1878 table: type[StatisticsBase],
1879 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1880 ) -> dict[str, list[StatisticsRow]]:
1881 """Return the last number_of_stats statistics for a given statistic_id."""
1882 statistic_ids = {statistic_id}
1885 metadata =
get_instance(hass).statistics_meta_manager.get_many(
1886 session, statistic_ids=statistic_ids
1891 metadata_id = metadata_ids[0]
1892 if table == Statistics:
1917 hass: HomeAssistant,
1918 number_of_stats: int,
1920 convert_units: bool,
1921 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1922 ) -> dict[str, list[StatisticsRow]]:
1923 """Return the last number_of_stats statistics for a statistic_id."""
1925 hass, number_of_stats, statistic_id, convert_units, Statistics, types
1930 hass: HomeAssistant,
1931 number_of_stats: int,
1933 convert_units: bool,
1934 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1935 ) -> dict[str, list[StatisticsRow]]:
1936 """Return the last number_of_stats short term statistics for a statistic_id."""
1938 hass, number_of_stats, statistic_id, convert_units, StatisticsShortTerm, types
1943 session: Session, ids: Iterable[int]
1945 """Return the latest short term statistics for a list of ids."""
1957 ) -> StatementLambdaElement:
1958 """Create the statement for finding the latest short term stat rows by id."""
1960 lambda: select(*QUERY_STATISTICS_SHORT_TERM).filter(
1961 StatisticsShortTerm.id.in_(ids)
1967 hass: HomeAssistant,
1969 statistic_ids: set[str],
1970 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
1971 metadata: dict[str, tuple[int, StatisticMetaData]] |
None =
None,
1972 ) -> dict[str, list[StatisticsRow]]:
1973 """Return the latest short term statistics for a list of statistic_ids with a session."""
1976 metadata =
get_instance(hass).statistics_meta_manager.get_many(
1977 session, statistic_ids=statistic_ids
1989 stats: list[Row] = []
1990 if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
1992 session, metadata_id_to_id.values()
1997 if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id))
and (
1998 found_latest_ids := {
2000 for metadata_id
in missing_metadata_ids
2023 StatisticsShortTerm,
2030 table: type[StatisticsBase],
2031 metadata_ids: set[int],
2032 start_time_ts: float,
2033 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
2034 ) -> StatementLambdaElement:
2035 """Create the statement for finding the statistics for a given time."""
2037 stmt +=
lambda q: q.join(
2039 most_recent_statistic_ids := (
2041 func.max(table.start_ts).label(
"max_start_ts"),
2042 table.metadata_id.label(
"max_metadata_id"),
2044 .filter(table.start_ts < start_time_ts)
2045 .filter(table.metadata_id.in_(metadata_ids))
2046 .group_by(table.metadata_id)
2051 table.start_ts == most_recent_statistic_ids.c.max_start_ts,
2052 table.metadata_id == most_recent_statistic_ids.c.max_metadata_id,
2060 metadata_ids: set[int],
2061 table: type[StatisticsBase],
2062 start_time: datetime,
2063 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
2064 ) -> Sequence[Row] |
None:
2065 """Return last known statistics, earlier than start_time, for the metadata_ids."""
2066 start_time_ts = start_time.timestamp()
2073 table_duration_seconds: float,
2076 convert: Callable[[float |
None], float |
None] | Callable[[float], float],
2077 ) -> list[StatisticsRow]:
2078 """Build a list of sum statistics."""
2081 "start": (start_ts := db_row[start_ts_idx]),
2082 "end": start_ts + table_duration_seconds,
2083 "sum":
None if (v := db_row[sum_idx])
is None else convert(v),
2085 for db_row
in db_rows
2091 table_duration_seconds: float,
2094 ) -> list[StatisticsRow]:
2095 """Build a list of sum statistics."""
2098 "start": (start_ts := db_row[start_ts_idx]),
2099 "end": start_ts + table_duration_seconds,
2100 "sum": db_row[sum_idx],
2102 for db_row
in db_rows
2108 table_duration_seconds: float,
2110 row_mapping: tuple[tuple[str, int], ...],
2111 ) -> list[StatisticsRow]:
2112 """Build a list of statistics without unit conversion."""
2115 "start": (start_ts := db_row[start_ts_idx]),
2116 "end": start_ts + table_duration_seconds,
2117 **{key: db_row[idx]
for key, idx
in row_mapping},
2119 for db_row
in db_rows
2125 table_duration_seconds: float,
2127 row_mapping: tuple[tuple[str, int], ...],
2128 convert: Callable[[float |
None], float |
None] | Callable[[float], float],
2129 ) -> list[StatisticsRow]:
2130 """Build a list of statistics with unit conversion."""
2133 "start": (start_ts := db_row[start_ts_idx]),
2134 "end": start_ts + table_duration_seconds,
2136 key:
None if (v := db_row[idx])
is None else convert(v)
2137 for key, idx
in row_mapping
2140 for db_row
in db_rows
2145 hass: HomeAssistant,
2146 stats: Sequence[Row[Any]],
2147 statistic_ids: set[str] |
None,
2148 _metadata: dict[str, tuple[int, StatisticMetaData]],
2149 convert_units: bool,
2150 table: type[StatisticsBase],
2151 units: dict[str, str] |
None,
2152 types: set[Literal[
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
2153 ) -> dict[str, list[StatisticsRow]]:
2154 """Convert SQL results into JSON friendly data structure."""
2155 assert stats,
"stats must not be empty"
2156 result: dict[str, list[StatisticsRow]] = defaultdict(list)
2157 metadata =
dict(_metadata.values())
2159 field_map: dict[str, int] = {key: idx
for idx, key
in enumerate(stats[0]._fields)}
2160 metadata_id_idx = field_map[
"metadata_id"]
2161 start_ts_idx = field_map[
"start_ts"]
2162 stats_by_meta_id: dict[int, list[Row]] = {}
2163 seen_statistic_ids: set[str] = set()
2164 key_func = itemgetter(metadata_id_idx)
2165 for meta_id, group
in groupby(stats, key_func):
2166 stats_by_meta_id[meta_id] =
list(group)
2167 seen_statistic_ids.add(metadata[meta_id][
"statistic_id"])
2170 if statistic_ids
is not None:
2171 for stat_id
in statistic_ids:
2175 if stat_id
in seen_statistic_ids:
2176 result[stat_id] = []
2181 if "last_reset_ts" in field_map:
2182 field_map[
"last_reset"] = field_map.pop(
"last_reset_ts")
2183 sum_idx = field_map[
"sum"]
if "sum" in types
else None
2184 sum_only = len(types) == 1
and sum_idx
is not None
2185 row_mapping =
tuple((key, field_map[key])
for key
in types
if key
in field_map)
2187 table_duration_seconds = table.duration.total_seconds()
2188 for meta_id, db_rows
in stats_by_meta_id.items():
2189 metadata_by_id = metadata[meta_id]
2190 statistic_id = metadata_by_id[
"statistic_id"]
2192 state_unit = unit = metadata_by_id[
"unit_of_measurement"]
2193 if state := hass.states.get(statistic_id):
2194 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
2196 unit, state_unit, units, allow_none=
False
2201 build_args = (db_rows, table_duration_seconds, start_ts_idx)
2208 assert sum_idx
is not None
2218 result[statistic_id] = _stats
2224 """Validate statistics."""
2225 platform_validation: dict[str, list[ValidationIssue]] = {}
2226 for platform
in hass.data[DOMAIN].recorder_platforms.values():
2227 if platform_validate_statistics := getattr(
2228 platform, INTEGRATION_PLATFORM_VALIDATE_STATISTICS,
None
2230 platform_validation.update(platform_validate_statistics(hass))
2231 return platform_validation
2235 """Update statistics issues."""
2237 for platform
in hass.data[DOMAIN].recorder_platforms.values():
2238 if platform_update_statistics_issues := getattr(
2239 platform, INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES,
None
2241 platform_update_statistics_issues(hass, session)
2246 table: type[StatisticsBase],
2250 """Return id if a statistics entry already exists."""
2251 start_ts = start.timestamp()
2253 session.query(table.id)
2254 .filter((table.metadata_id == metadata_id) & (table.start_ts == start_ts))
2257 return result.id
if result
else None
2262 hass: HomeAssistant,
2263 metadata: StatisticMetaData,
2264 statistics: Iterable[StatisticData],
2266 """Validate timestamps and insert an import_statistics job in the queue."""
2267 for statistic
in statistics:
2268 start = statistic[
"start"]
2269 if start.tzinfo
is None or start.tzinfo.utcoffset(start)
is None:
2271 "Naive timestamp: no or invalid timezone info provided"
2273 if start.minute != 0
or start.second != 0
or start.microsecond != 0:
2275 "Invalid timestamp: timestamps must be from the top of the hour (minutes and seconds = 0)"
2278 statistic[
"start"] = dt_util.as_utc(start)
2280 if "last_reset" in statistic
and statistic[
"last_reset"]
is not None:
2281 last_reset = statistic[
"last_reset"]
2283 last_reset.tzinfo
is None
2284 or last_reset.tzinfo.utcoffset(last_reset)
is None
2287 statistic[
"last_reset"] = dt_util.as_utc(last_reset)
2295 hass: HomeAssistant,
2296 metadata: StatisticMetaData,
2297 statistics: Iterable[StatisticData],
2299 """Import hourly statistics from an internal source.
2301 This inserts an import_statistics job in the recorder's queue.
2307 if not metadata[
"source"]
or metadata[
"source"] != DOMAIN:
2315 hass: HomeAssistant,
2316 metadata: StatisticMetaData,
2317 statistics: Iterable[StatisticData],
2319 """Add hourly statistics from an external source.
2321 This inserts an import_statistics job in the recorder's queue.
2329 if not metadata[
"source"]
or metadata[
"source"] != domain:
2338 metadata: StatisticMetaData,
2339 statistics: Iterable[StatisticData],
2340 table: type[StatisticsBase],
2342 """Import statistics to the database."""
2343 statistics_meta_manager = instance.statistics_meta_manager
2344 old_metadata_dict = statistics_meta_manager.get_many(
2345 session, statistic_ids={metadata[
"statistic_id"]}
2347 _, metadata_id = statistics_meta_manager.update_or_add(
2348 session, metadata, old_metadata_dict
2350 for stat
in statistics:
2356 if table != StatisticsShortTerm:
2363 run_cache, session, metadata_id
2369 @singleton(DATA_SHORT_TERM_STATISTICS_RUN_CACHE)
2371 hass: HomeAssistant,
2372 ) -> ShortTermStatisticsRunCache:
2373 """Get the short term statistics run cache."""
2378 run_cache: ShortTermStatisticsRunCache,
2382 """Cache the latest short term statistic for a given metadata_id.
2384 Returns the id of the latest short term statistic for the metadata_id
2385 that was added to the cache, or None if no latest short term statistic
2386 was found for the metadata_id.
2394 id_: int = latest[0].id
2395 run_cache.set_latest_id_for_metadata_id(metadata_id, id_)
2402 ) -> StatementLambdaElement:
2403 """Create a statement to find the latest short term statistics for a metadata_id."""
2419 StatisticsShortTerm.id,
2421 .where(StatisticsShortTerm.metadata_id == metadata_id)
2422 .order_by(StatisticsShortTerm.start_ts.desc())
2427 @retryable_database_job("statistics")
2430 metadata: StatisticMetaData,
2431 statistics: Iterable[StatisticData],
2432 table: type[StatisticsBase],
2434 """Process an import_statistics job."""
2437 session=instance.get_session(),
2439 instance,
"statistic"
2443 instance, session, metadata, statistics, table
2447 @retryable_database_job("adjust_statistics")
2451 start_time: datetime,
2452 sum_adjustment: float,
2453 adjustment_unit: str,
2455 """Process an add_statistics job."""
2457 with session_scope(session=instance.get_session())
as session:
2458 metadata = instance.statistics_meta_manager.get_many(
2459 session, statistic_ids={statistic_id}
2461 if statistic_id
not in metadata:
2464 statistic_unit = metadata[statistic_id][1][
"unit_of_measurement"]
2466 adjustment_unit, statistic_unit
2468 sum_adjustment = convert(sum_adjustment)
2472 StatisticsShortTerm,
2473 metadata[statistic_id][0],
2481 metadata[statistic_id][0],
2482 start_time.replace(minute=0),
2491 table: type[StatisticsBase],
2493 convert: Callable[[float |
None], float |
None],
2495 """Insert statistics in the database."""
2496 columns = (table.id, table.mean, table.min, table.max, table.state, table.sum)
2497 query = session.query(*columns).filter_by(metadata_id=bindparam(
"metadata_id"))
2498 rows =
execute(query.params(metadata_id=metadata_id))
2500 session.query(table).filter(table.id == row.id).
update(
2502 table.mean: convert(row.mean),
2503 table.min: convert(row.min),
2504 table.max: convert(row.max),
2505 table.state: convert(row.state),
2506 table.sum: convert(row.sum),
2508 synchronize_session=
False,
2518 """Change statistics unit for a statistic_id."""
2519 statistics_meta_manager = instance.statistics_meta_manager
2520 with session_scope(session=instance.get_session())
as session:
2521 metadata = statistics_meta_manager.get(session, statistic_id)
2527 or metadata[1][
"source"] != DOMAIN
2528 or metadata[1][
"unit_of_measurement"] != old_unit
2530 _LOGGER.warning(
"Could not change statistics unit for %s", statistic_id)
2533 metadata_id = metadata[0]
2537 "Statistics unit of measurement for %s is already %s",
2543 tables: tuple[type[StatisticsBase], ...] = (
2545 StatisticsShortTerm,
2547 for table
in tables:
2550 statistics_meta_manager.update_unit_of_measurement(
2551 session, statistic_id, new_unit
2557 hass: HomeAssistant,
2560 new_unit_of_measurement: str,
2561 old_unit_of_measurement: str,
2563 """Change statistics unit for a statistic_id."""
2566 f
"Can't convert {old_unit_of_measurement} to {new_unit_of_measurement}"
2571 new_unit_of_measurement=new_unit_of_measurement,
2572 old_unit_of_measurement=old_unit_of_measurement,
2577 """Clean up the statistics migration from timestamp to datetime.
2579 Returns False if there are more rows to update.
2580 Returns True if all rows have been updated.
2582 engine = instance.engine
2583 assert engine
is not None
2584 if engine.dialect.name == SupportedDialect.SQLITE:
2585 for table
in STATISTICS_TABLES:
2586 with session_scope(session=instance.get_session())
as session:
2589 f
"update {table} set start = NULL, created = NULL, last_reset = NULL;"
2592 elif engine.dialect.name == SupportedDialect.MYSQL:
2593 for table
in STATISTICS_TABLES:
2594 with session_scope(session=instance.get_session())
as session:
2596 session.connection()
2599 f
"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL where start is not NULL LIMIT 100000;"
2607 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
2608 for table
in STATISTICS_TABLES:
2609 with session_scope(session=instance.get_session())
as session:
2611 session.connection()
2614 f
"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL "
2615 f
"where id in (select id from {table} where start is not NULL LIMIT 100000)"
2624 from .migration
import _drop_index
2626 for table
in STATISTICS_TABLES:
2627 _drop_index(instance.get_session, table, f
"ix_{table}_start")
None set_latest_id_for_metadata_id(self, int metadata_id, int id_)
None set_latest_ids_for_metadata_ids(self, dict[int, int] metadata_id_to_id)
dict[int, int] get_latest_ids(self, set[int] metadata_ids)
IssData update(pyiss.ISS iss)
def execute(hass, filename, source, data=None, return_response=False)
None _drop_index(Callable[[], Session] session_maker, str table_name, str index_name, bool|None quiet=None)
None process_timestamp(None ts)
float|None datetime_to_timestamp_or_none(datetime|None dt)
None change_statistics_unit(Recorder instance, str statistic_id, str new_unit, str old_unit)
StatementLambdaElement _get_last_statistics_stmt(int metadata_id, int number_of_stats)
None update_statistics_issues(HomeAssistant hass)
StatementLambdaElement _get_last_statistics_short_term_stmt(int metadata_id, int number_of_stats)
( tuple[ Callable[[float, float], bool], Callable[[float], tuple[float, float]],]) reduce_week_ts_factory()
str validate_statistic_id(str value)
list[StatisticsRow] _build_sum_converted_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, int sum_idx, Callable[[float|None], float|None]|Callable[[float], float] convert)
dict[str, list[StatisticsRow]] statistics_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
StatementLambdaElement _generate_select_columns_for_types_stmt(type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
None _compile_hourly_statistics(Session session, datetime start)
datetime|None _last_statistic(Session session, type[StatisticsBase] table, int metadata_id)
dict[str, tuple[int, StatisticMetaData]] get_metadata(HomeAssistant hass, *set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
dict[str, list[ValidationIssue]] validate_statistics(HomeAssistant hass)
dict[str, list[StatisticsRow]] _reduce_statistics_per_week(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
list[dict] _flatten_list_statistic_ids_metadata_result(dict[str, dict[str, Any]] result)
datetime get_start_time()
str|None get_display_unit(HomeAssistant hass, str statistic_id, str|None statistic_unit)
None clear_statistics(Recorder instance, list[str] statistic_ids)
dict[str, Any] statistic_during_period(HomeAssistant hass, datetime|None start_time, datetime|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str]|None units)
bool _import_statistics_with_session(Recorder instance, Session session, StatisticMetaData metadata, Iterable[StatisticData] statistics, type[StatisticsBase] table)
bool compile_statistics(Recorder instance, datetime start, bool fire_events)
dict[str, list[StatisticsRow]] _reduce_statistics_per_day(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Callable[[float|None], float|None]|Callable[[float], float]|None _get_statistic_to_display_unit_converter(str|None statistic_unit, str|None state_unit, dict[str, str]|None requested_units, bool allow_none=True)
Sequence[Row]|None _statistics_at_time(Session session, set[int] metadata_ids, type[StatisticsBase] table, datetime start_time, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Callable[[float], float]|None _get_display_to_statistic_unit_converter(str|None display_unit, str|None statistic_unit)
StatementLambdaElement _find_latest_short_term_statistic_for_metadata_id_stmt(int metadata_id)
StatementLambdaElement _compile_hourly_statistics_last_sum_stmt(float start_time_ts, float end_time_ts)
None async_change_statistics_unit(HomeAssistant hass, str statistic_id, *str new_unit_of_measurement, str old_unit_of_measurement)
StatementLambdaElement _generate_statistics_during_period_stmt(datetime start_time, datetime|None end_time, list[int]|None metadata_ids, type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
dict[str, list[StatisticsRow]] get_last_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
dict[str, list[StatisticsRow]] _reduce_statistics(dict[str, list[StatisticsRow]] stats, Callable[[float, float], bool] same_period, Callable[[float], tuple[float, float]] period_start_end, timedelta period, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
float|None _get_oldest_sum_statistic(Session session, datetime|None head_start_time, datetime|None main_start_time, datetime|None tail_start_time, datetime|None oldest_stat, datetime|None oldest_5_min_stat, bool tail_only, int metadata_id)
StatementLambdaElement _latest_short_term_statistics_by_ids_stmt(Iterable[int] ids)
StatementLambdaElement _generate_max_mean_min_statistic_in_sub_period_stmt(Select columns, datetime|None start_time, datetime|None end_time, type[StatisticsBase] table, int metadata_id)
StatementLambdaElement _get_first_id_stmt(datetime start)
int|None cache_latest_short_term_statistic_id_for_metadata_id(ShortTermStatisticsRunCache run_cache, Session session, int metadata_id)
None update_statistics_metadata(Recorder instance, str statistic_id, str|None|UndefinedType new_statistic_id, str|None|UndefinedType new_unit_of_measurement)
datetime|None _first_statistic(Session session, type[StatisticsBase] table, int metadata_id)
Callable[[float|None], float|None]|None _get_unit_converter(str from_unit, str to_unit)
float|None mean(list[float] values)
StatisticsBase|None _insert_statistics(Session session, type[StatisticsBase] table, int metadata_id, StatisticData statistic)
None _async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
float|None _get_newest_sum_statistic(Session session, datetime|None head_start_time, datetime|None head_end_time, datetime|None main_start_time, datetime|None main_end_time, datetime|None tail_start_time, datetime|None tail_end_time, bool tail_only, int metadata_id)
bool compile_missing_statistics(Recorder instance)
dict[str, list[StatisticsRow]] _sorted_statistics_to_dict(HomeAssistant hass, Sequence[Row[Any]] stats, set[str]|None statistic_ids, dict[str, tuple[int, StatisticMetaData]] _metadata, bool convert_units, type[StatisticsBase] table, dict[str, str]|None units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
list[StatisticsRow] _build_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, tuple[tuple[str, int],...] row_mapping)
bool cleanup_statistics_timestamp_migration(Recorder instance)
None _update_statistics(Session session, type[StatisticsBase] table, int stat_id, StatisticData statistic)
dict[str, list[StatisticsRow]] get_last_short_term_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
( tuple[ Callable[[float, float], bool], Callable[[float], tuple[float, float]],]) reduce_day_ts_factory()
list[dict] list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
None _get_max_mean_min_statistic_in_sub_period(Session session, dict[str, float] result, datetime|None start_time, datetime|None end_time, type[StatisticsBase] table, set[Literal["max", "mean", "min", "change"]] types, int metadata_id)
dict[str, tuple[int, StatisticMetaData]] get_metadata_with_session(Recorder instance, Session session, *set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
None _augment_result_with_change(HomeAssistant hass, Session session, datetime start_time, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] _types, type[Statistics|StatisticsShortTerm] table, dict[str, tuple[int, StatisticMetaData]] metadata, dict[str, list[StatisticsRow]] result)
bool valid_statistic_id(str statistic_id)
list[StatisticsRow] _build_sum_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, int sum_idx)
dict[str, list[StatisticsRow]] _statistics_during_period_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] _types)
None async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
list[Row] get_latest_short_term_statistics_by_ids(Session session, Iterable[int] ids)
bool can_convert_units(str|None from_unit, str|None to_unit)
StatementLambdaElement _compile_hourly_statistics_summary_mean_stmt(float start_time_ts, float end_time_ts)
None _adjust_sum_statistics(Session session, type[StatisticsBase] table, int metadata_id, datetime start_time, float adj)
bool adjust_statistics(Recorder instance, str statistic_id, datetime start_time, float sum_adjustment, str adjustment_unit)
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
int|None _statistics_exists(Session session, type[StatisticsBase] table, int metadata_id, datetime start)
ShortTermStatisticsRunCache get_short_term_statistics_run_cache(HomeAssistant hass)
dict[str, list[StatisticsRow]] _get_last_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
StatementLambdaElement _generate_statistics_at_time_stmt(type[StatisticsBase] table, set[int] metadata_ids, float start_time_ts, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
datetime _find_month_end_time(datetime timestamp)
tuple[Callable[[float, float], bool], Callable[[float], tuple[float, float]]] reduce_month_ts_factory()
dict[str, list[StatisticsRow]] get_latest_short_term_statistics_with_session(HomeAssistant hass, Session session, set[str] statistic_ids, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types, dict[str, tuple[int, StatisticMetaData]]|None metadata=None)
set[str] _compile_statistics(Recorder instance, Session session, datetime start, bool fire_events)
list[dict] async_list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
dict[str, dict[str, Any]] _statistic_by_id_from_metadata(HomeAssistant hass, dict[str, tuple[int, StatisticMetaData]] metadata)
dict[str, list[StatisticsRow]] _reduce_statistics_per_month(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
list[int] _extract_metadata_and_discard_impossible_columns(dict[str, tuple[int, StatisticMetaData]] metadata, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
list[str] split_statistic_id(str entity_id)
dict[str, float|None] _get_max_mean_min_statistic(Session session, datetime|None head_start_time, datetime|None head_end_time, datetime|None main_start_time, datetime|None main_end_time, datetime|None tail_start_time, datetime|None tail_end_time, bool tail_only, int metadata_id, set[Literal["max", "mean", "min", "change"]] types)
list[StatisticsRow] _build_converted_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, tuple[tuple[str, int],...] row_mapping, Callable[[float|None], float|None]|Callable[[float], float] convert)
bool import_statistics(Recorder instance, StatisticMetaData metadata, Iterable[StatisticData] statistics, type[StatisticsBase] table)
None _change_statistics_unit_for_table(Session session, type[StatisticsBase] table, int metadata_id, Callable[[float|None], float|None] convert)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Callable[[Exception], bool] filter_unique_constraint_integrity_error(Recorder instance, str row_type)
bool valid_entity_id(str entity_id)
Recorder get_instance(HomeAssistant hass)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)