Home Assistant Unofficial Reference 2024.12.1
statistics.py
Go to the documentation of this file.
1 """Statistics helper."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from collections.abc import Callable, Iterable, Sequence
7 import dataclasses
8 from datetime import datetime, timedelta
9 from functools import lru_cache, partial
10 from itertools import chain, groupby
11 import logging
12 from operator import itemgetter
13 import re
14 from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
15 
16 from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text
17 from sqlalchemy.engine.row import Row
18 from sqlalchemy.exc import SQLAlchemyError
19 from sqlalchemy.orm.session import Session
20 from sqlalchemy.sql.lambdas import StatementLambdaElement
21 import voluptuous as vol
22 
23 from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT
24 from homeassistant.core import HomeAssistant, callback, valid_entity_id
25 from homeassistant.exceptions import HomeAssistantError
26 from homeassistant.helpers.singleton import singleton
27 from homeassistant.helpers.typing import UNDEFINED, UndefinedType
28 from homeassistant.util import dt as dt_util
30  AreaConverter,
31  BaseUnitConverter,
32  BloodGlucoseConcentrationConverter,
33  ConductivityConverter,
34  DataRateConverter,
35  DistanceConverter,
36  DurationConverter,
37  ElectricCurrentConverter,
38  ElectricPotentialConverter,
39  EnergyConverter,
40  InformationConverter,
41  MassConverter,
42  PowerConverter,
43  PressureConverter,
44  SpeedConverter,
45  TemperatureConverter,
46  UnitlessRatioConverter,
47  VolumeConverter,
48  VolumeFlowRateConverter,
49 )
50 
51 from .const import (
52  DOMAIN,
53  EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
54  EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
55  INTEGRATION_PLATFORM_COMPILE_STATISTICS,
56  INTEGRATION_PLATFORM_LIST_STATISTIC_IDS,
57  INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES,
58  INTEGRATION_PLATFORM_VALIDATE_STATISTICS,
59  SupportedDialect,
60 )
61 from .db_schema import (
62  STATISTICS_TABLES,
63  Statistics,
64  StatisticsBase,
65  StatisticsRuns,
66  StatisticsShortTerm,
67 )
68 from .models import (
69  StatisticData,
70  StatisticDataTimestamp,
71  StatisticMetaData,
72  StatisticResult,
73  datetime_to_timestamp_or_none,
74  process_timestamp,
75 )
76 from .util import (
77  execute,
78  execute_stmt_lambda_element,
79  filter_unique_constraint_integrity_error,
80  get_instance,
81  retryable_database_job,
82  session_scope,
83 )
84 
85 if TYPE_CHECKING:
86  from . import Recorder
87 
88 QUERY_STATISTICS = (
89  Statistics.metadata_id,
90  Statistics.start_ts,
91  Statistics.mean,
92  Statistics.min,
93  Statistics.max,
94  Statistics.last_reset_ts,
95  Statistics.state,
96  Statistics.sum,
97 )
98 
99 QUERY_STATISTICS_SHORT_TERM = (
100  StatisticsShortTerm.metadata_id,
101  StatisticsShortTerm.start_ts,
102  StatisticsShortTerm.mean,
103  StatisticsShortTerm.min,
104  StatisticsShortTerm.max,
105  StatisticsShortTerm.last_reset_ts,
106  StatisticsShortTerm.state,
107  StatisticsShortTerm.sum,
108 )
109 
110 QUERY_STATISTICS_SUMMARY_MEAN = (
111  StatisticsShortTerm.metadata_id,
112  func.avg(StatisticsShortTerm.mean),
113  func.min(StatisticsShortTerm.min),
114  func.max(StatisticsShortTerm.max),
115 )
116 
117 QUERY_STATISTICS_SUMMARY_SUM = (
118  StatisticsShortTerm.metadata_id,
119  StatisticsShortTerm.start_ts,
120  StatisticsShortTerm.last_reset_ts,
121  StatisticsShortTerm.state,
122  StatisticsShortTerm.sum,
123  func.row_number()
124  .over(
125  partition_by=StatisticsShortTerm.metadata_id,
126  order_by=StatisticsShortTerm.start_ts.desc(),
127  )
128  .label("rownum"),
129 )
130 
131 
132 STATISTIC_UNIT_TO_UNIT_CONVERTER: dict[str | None, type[BaseUnitConverter]] = {
133  **{unit: AreaConverter for unit in AreaConverter.VALID_UNITS},
134  **{
135  unit: BloodGlucoseConcentrationConverter
136  for unit in BloodGlucoseConcentrationConverter.VALID_UNITS
137  },
138  **{unit: ConductivityConverter for unit in ConductivityConverter.VALID_UNITS},
139  **{unit: DataRateConverter for unit in DataRateConverter.VALID_UNITS},
140  **{unit: DistanceConverter for unit in DistanceConverter.VALID_UNITS},
141  **{unit: DurationConverter for unit in DurationConverter.VALID_UNITS},
142  **{unit: ElectricCurrentConverter for unit in ElectricCurrentConverter.VALID_UNITS},
143  **{
144  unit: ElectricPotentialConverter
145  for unit in ElectricPotentialConverter.VALID_UNITS
146  },
147  **{unit: EnergyConverter for unit in EnergyConverter.VALID_UNITS},
148  **{unit: InformationConverter for unit in InformationConverter.VALID_UNITS},
149  **{unit: MassConverter for unit in MassConverter.VALID_UNITS},
150  **{unit: PowerConverter for unit in PowerConverter.VALID_UNITS},
151  **{unit: PressureConverter for unit in PressureConverter.VALID_UNITS},
152  **{unit: SpeedConverter for unit in SpeedConverter.VALID_UNITS},
153  **{unit: TemperatureConverter for unit in TemperatureConverter.VALID_UNITS},
154  **{unit: UnitlessRatioConverter for unit in UnitlessRatioConverter.VALID_UNITS},
155  **{unit: VolumeConverter for unit in VolumeConverter.VALID_UNITS},
156  **{unit: VolumeFlowRateConverter for unit in VolumeFlowRateConverter.VALID_UNITS},
157 }
158 
159 
160 UNIT_CLASSES = {
161  unit: converter.UNIT_CLASS
162  for unit, converter in STATISTIC_UNIT_TO_UNIT_CONVERTER.items()
163 }
164 
165 DATA_SHORT_TERM_STATISTICS_RUN_CACHE = "recorder_short_term_statistics_run_cache"
166 
167 
168 def mean(values: list[float]) -> float | None:
169  """Return the mean of the values.
170 
171  This is a very simple version that only works
172  with a non-empty list of floats. The built-in
173  statistics.mean is more robust but is almost
174  an order of magnitude slower.
175  """
176  return sum(values) / len(values)
177 
178 
179 _LOGGER = logging.getLogger(__name__)
180 
181 
182 @dataclasses.dataclass(slots=True)
184  """Cache for short term statistics runs."""
185 
186  # This is a mapping of metadata_id:id of the last short term
187  # statistics run for each metadata_id
188  _latest_id_by_metadata_id: dict[int, int] = dataclasses.field(default_factory=dict)
189 
190  def get_latest_ids(self, metadata_ids: set[int]) -> dict[int, int]:
191  """Return the latest short term statistics ids for the metadata_ids."""
192  return {
193  metadata_id: id_
194  for metadata_id, id_ in self._latest_id_by_metadata_id.items()
195  if metadata_id in metadata_ids
196  }
197 
    def set_latest_id_for_metadata_id(self, metadata_id: int, id_: int) -> None:
        """Cache the latest id for the metadata_id."""
        # Plain overwrite: callers only feed in the newest rows, so the
        # most recent call always wins.
        self._latest_id_by_metadata_id[metadata_id] = id_
201 
203  self, metadata_id_to_id: dict[int, int]
204  ) -> None:
205  """Cache the latest id for the each metadata_id."""
206  self._latest_id_by_metadata_id.update(metadata_id_to_id)
207 
208 
class BaseStatisticsRow(TypedDict, total=False):
    """A processed row of statistic data."""

    # Start of the statistics period as an epoch timestamp (seconds).
    start: float
213 
214 
216  """A processed row of statistic data."""
217 
218  end: float
219  last_reset: float | None
220  state: float | None
221  sum: float | None
222  min: float | None
223  max: float | None
224  mean: float | None
225  change: float | None
226 
227 
229  hass: HomeAssistant,
230  statistic_id: str,
231  statistic_unit: str | None,
232 ) -> str | None:
233  """Return the unit which the statistic will be displayed in."""
234 
235  if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None:
236  return statistic_unit
237 
238  state_unit: str | None = statistic_unit
239  if state := hass.states.get(statistic_id):
240  state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
241 
242  if state_unit == statistic_unit or state_unit not in converter.VALID_UNITS:
243  # Guard against invalid state unit in the DB
244  return statistic_unit
245 
246  return state_unit
247 
248 
250  statistic_unit: str | None,
251  state_unit: str | None,
252  requested_units: dict[str, str] | None,
253  allow_none: bool = True,
254 ) -> Callable[[float | None], float | None] | Callable[[float], float] | None:
255  """Prepare a converter from the statistics unit to display unit."""
256  if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None:
257  return None
258 
259  display_unit: str | None
260  unit_class = converter.UNIT_CLASS
261  if requested_units and unit_class in requested_units:
262  display_unit = requested_units[unit_class]
263  else:
264  display_unit = state_unit
265 
266  if display_unit not in converter.VALID_UNITS:
267  # Guard against invalid state unit in the DB
268  return None
269 
270  if display_unit == statistic_unit:
271  return None
272 
273  if allow_none:
274  return converter.converter_factory_allow_none(
275  from_unit=statistic_unit, to_unit=display_unit
276  )
277  return converter.converter_factory(from_unit=statistic_unit, to_unit=display_unit)
278 
279 
281  display_unit: str | None,
282  statistic_unit: str | None,
283 ) -> Callable[[float], float] | None:
284  """Prepare a converter from the display unit to the statistics unit."""
285  if (
286  display_unit == statistic_unit
287  or (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None
288  ):
289  return None
290  return converter.converter_factory(from_unit=display_unit, to_unit=statistic_unit)
291 
292 
294  from_unit: str, to_unit: str
295 ) -> Callable[[float | None], float | None] | None:
296  """Prepare a converter from a unit to another unit."""
297  for conv in STATISTIC_UNIT_TO_UNIT_CONVERTER.values():
298  if from_unit in conv.VALID_UNITS and to_unit in conv.VALID_UNITS:
299  if from_unit == to_unit:
300  return None
301  return conv.converter_factory_allow_none(
302  from_unit=from_unit, to_unit=to_unit
303  )
304  raise HomeAssistantError
305 
306 
def can_convert_units(from_unit: str | None, to_unit: str | None) -> bool:
    """Return True if it's possible to convert from from_unit to to_unit."""
    # Units are convertible when a single converter accepts both of them.
    return any(
        from_unit in converter.VALID_UNITS and to_unit in converter.VALID_UNITS
        for converter in STATISTIC_UNIT_TO_UNIT_CONVERTER.values()
    )
313 
314 
315 @dataclasses.dataclass
317  """Compiled Statistics from a platform."""
318 
319  platform_stats: list[StatisticResult]
320  current_metadata: dict[str, tuple[int, StatisticMetaData]]
321 
322 
def split_statistic_id(entity_id: str) -> list[str]:
    """Split a statistic ID into domain and object ID.

    Statistic IDs have the form <domain>:<object_id>; only the first
    colon is significant, so any further colons remain in the object ID.
    An ID without a colon yields a single-element list.
    """
    # NOTE: the previous docstring said "state entity ID" — a copy-paste
    # from split_entity_id; this function splits on ":" not ".".
    return entity_id.split(":", 1)
326 
327 
# Statistic IDs are "<domain>:<object_id>" where both halves are slugs of
# lowercase alphanumerics and underscores, with no leading or trailing
# underscore in either half and no double underscore anywhere in the ID.
VALID_STATISTIC_ID = re.compile(r"^(?!.+__)(?!_)[\da-z_]+(?<!_):(?!_)[\da-z_]+(?<!_)$")
329 
330 
def valid_statistic_id(statistic_id: str) -> bool:
    """Test if a statistic ID is a valid format.

    Format: <domain>:<statistic> where both are slugs.
    """
    # re.Match is truthy, so bool() is equivalent to `match is not None`.
    return bool(VALID_STATISTIC_ID.match(statistic_id))
337 
338 
def validate_statistic_id(value: str) -> str:
    """Validate statistic ID.

    Returns the value unchanged when it is a well-formed statistic ID,
    otherwise raises vol.Invalid so it can be used as a voluptuous
    validator.
    """
    if not valid_statistic_id(value):
        raise vol.Invalid(f"Statistics ID {value} is an invalid statistic ID")
    return value
345 
346 
347 @dataclasses.dataclass
349  """Error or warning message."""
350 
351  type: str
352  data: dict[str, str | None] | None = None
353 
    def as_dict(self) -> dict:
        """Return dictionary version."""
        # dataclasses.asdict deep-copies nested containers (the `data`
        # dict), so callers may mutate the result freely.
        return dataclasses.asdict(self)
357 
358 
def get_start_time() -> datetime:
    """Return start time."""
    now = dt_util.utcnow()
    # Floor "now" to the most recent 5-minute boundary...
    boundary_minute = (now.minute // 5) * 5
    current_period = now.replace(minute=boundary_minute, second=0, microsecond=0)
    # ...then step back one full period to get the start of the last
    # completed 5-minute window.
    return current_period - timedelta(minutes=5)
365 
366 
368  start_time_ts: float, end_time_ts: float
369 ) -> StatementLambdaElement:
370  """Generate the summary mean statement for hourly statistics."""
371  return lambda_stmt(
372  lambda: select(*QUERY_STATISTICS_SUMMARY_MEAN)
373  .filter(StatisticsShortTerm.start_ts >= start_time_ts)
374  .filter(StatisticsShortTerm.start_ts < end_time_ts)
375  .group_by(StatisticsShortTerm.metadata_id)
376  .order_by(StatisticsShortTerm.metadata_id)
377  )
378 
379 
381  start_time_ts: float, end_time_ts: float
382 ) -> StatementLambdaElement:
383  """Generate the summary mean statement for hourly statistics."""
384  return lambda_stmt(
385  lambda: select(
386  subquery := (
387  select(*QUERY_STATISTICS_SUMMARY_SUM)
388  .filter(StatisticsShortTerm.start_ts >= start_time_ts)
389  .filter(StatisticsShortTerm.start_ts < end_time_ts)
390  .subquery()
391  )
392  )
393  .filter(subquery.c.rownum == 1)
394  .order_by(subquery.c.metadata_id)
395  )
396 
397 
def _compile_hourly_statistics(session: Session, start: datetime) -> None:
    """Compile hourly statistics.

    This will summarize 5-minute statistics for one hour:
    - average, min max is computed by a database query
    - sum is taken from the last 5-minute entry during the hour

    The caller is responsible for committing the session.
    """
    # The hour bucket containing `start`; Statistics.duration is the
    # long-term period length (presumably one hour — defined in
    # db_schema, confirm there).
    start_time = start.replace(minute=0)
    start_time_ts = start_time.timestamp()
    end_time = start_time + Statistics.duration
    end_time_ts = end_time.timestamp()

    # Compute last hour's average, min, max
    summary: dict[int, StatisticDataTimestamp] = {}
    stmt = _compile_hourly_statistics_summary_mean_stmt(start_time_ts, end_time_ts)
    stats = execute_stmt_lambda_element(session, stmt)

    if stats:
        for stat in stats:
            metadata_id, _mean, _min, _max = stat
            summary[metadata_id] = {
                "start_ts": start_time_ts,
                "mean": _mean,
                "min": _min,
                "max": _max,
            }

    stmt = _compile_hourly_statistics_last_sum_stmt(start_time_ts, end_time_ts)
    # Get last hour's last sum
    stats = execute_stmt_lambda_element(session, stmt)

    if stats:
        for stat in stats:
            # NOTE: `start` here intentionally rebinds the parameter via
            # tuple unpacking; the original value is no longer needed at
            # this point (only start_time_ts is used below).
            metadata_id, start, last_reset_ts, state, _sum, _ = stat
            if metadata_id in summary:
                # Merge the sum columns into the mean/min/max row built above.
                summary[metadata_id].update(
                    {
                        "last_reset_ts": last_reset_ts,
                        "state": state,
                        "sum": _sum,
                    }
                )
            else:
                # Sum-only statistic: no mean/min/max row exists for it.
                summary[metadata_id] = {
                    "start_ts": start_time_ts,
                    "last_reset_ts": last_reset_ts,
                    "state": state,
                    "sum": _sum,
                }

    # Insert compiled hourly statistics in the database
    session.add_all(
        Statistics.from_stats_ts(metadata_id, summary_item)
        for metadata_id, summary_item in summary.items()
    )
453 
454 
455 @retryable_database_job("compile missing statistics")
456 def compile_missing_statistics(instance: Recorder) -> bool:
457  """Compile missing statistics."""
458  now = dt_util.utcnow()
459  period_size = 5
460  last_period_minutes = now.minute - now.minute % period_size
461  last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
462  start = now - timedelta(days=instance.keep_days)
463  start = start.replace(minute=0, second=0, microsecond=0)
464  # Commit every 12 hours of data
465  commit_interval = 60 / period_size * 12
466 
467  with session_scope(
468  session=instance.get_session(),
470  instance, "statistic"
471  ),
472  ) as session:
473  # Find the newest statistics run, if any
474  if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
475  start = max(
476  start, process_timestamp(last_run) + StatisticsShortTerm.duration
477  )
478 
479  periods_without_commit = 0
480  while start < last_period:
481  periods_without_commit += 1
482  end = start + timedelta(minutes=period_size)
483  _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
484  modified_statistic_ids = _compile_statistics(
485  instance, session, start, end >= last_period
486  )
487  if periods_without_commit == commit_interval or modified_statistic_ids:
488  session.commit()
489  session.expunge_all()
490  periods_without_commit = 0
491  start = end
492 
493  return True
494 
495 
496 @retryable_database_job("compile statistics")
497 def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -> bool:
498  """Compile 5-minute statistics for all integrations with a recorder platform.
499 
500  The actual calculation is delegated to the platforms.
501  """
502  # Define modified_statistic_ids outside of the "with" statement as
503  # _compile_statistics may raise and be trapped by
504  # filter_unique_constraint_integrity_error which would make
505  # modified_statistic_ids unbound.
506  modified_statistic_ids: set[str] | None = None
507 
508  # Return if we already have 5-minute statistics for the requested period
509  with session_scope(
510  session=instance.get_session(),
512  instance, "statistic"
513  ),
514  ) as session:
515  modified_statistic_ids = _compile_statistics(
516  instance, session, start, fire_events
517  )
518 
519  if modified_statistic_ids:
520  # In the rare case that we have modified statistic_ids, we reload the modified
521  # statistics meta data into the cache in a fresh session to ensure that the
522  # cache is up to date and future calls to get statistics meta data will
523  # not have to hit the database again.
524  with session_scope(session=instance.get_session(), read_only=True) as session:
525  instance.statistics_meta_manager.get_many(session, modified_statistic_ids)
526 
527  return True
528 
529 
def _get_first_id_stmt(start: datetime) -> StatementLambdaElement:
    """Return a statement that returns the first run_id at start."""
    # lambda_stmt caches the compiled SQL for the lambda's code; only the
    # bound `start` value varies between calls.
    return lambda_stmt(lambda: select(StatisticsRuns.run_id).filter_by(start=start))
533 
534 
536  instance: Recorder, session: Session, start: datetime, fire_events: bool
537 ) -> set[str]:
538  """Compile 5-minute statistics for all integrations with a recorder platform.
539 
540  This is a helper function for compile_statistics and compile_missing_statistics
541  that does not retry on database errors since both callers already retry.
542 
543  returns a set of modified statistic_ids if any were modified.
544  """
545  assert start.tzinfo == dt_util.UTC, "start must be in UTC"
546  end = start + StatisticsShortTerm.duration
547  statistics_meta_manager = instance.statistics_meta_manager
548  modified_statistic_ids: set[str] = set()
549 
550  # Return if we already have 5-minute statistics for the requested period
552  _LOGGER.debug("Statistics already compiled for %s-%s", start, end)
553  return modified_statistic_ids
554 
555  _LOGGER.debug("Compiling statistics for %s-%s", start, end)
556  platform_stats: list[StatisticResult] = []
557  current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
558  # Collect statistics from all platforms implementing support
559  for domain, platform in instance.hass.data[DOMAIN].recorder_platforms.items():
560  if not (
561  platform_compile_statistics := getattr(
562  platform, INTEGRATION_PLATFORM_COMPILE_STATISTICS, None
563  )
564  ):
565  continue
566  compiled: PlatformCompiledStatistics = platform_compile_statistics(
567  instance.hass, session, start, end
568  )
569  _LOGGER.debug(
570  "Statistics for %s during %s-%s: %s",
571  domain,
572  start,
573  end,
574  compiled.platform_stats,
575  )
576  platform_stats.extend(compiled.platform_stats)
577  current_metadata.update(compiled.current_metadata)
578 
579  new_short_term_stats: list[StatisticsBase] = []
580  updated_metadata_ids: set[int] = set()
581  # Insert collected statistics in the database
582  for stats in platform_stats:
583  modified_statistic_id, metadata_id = statistics_meta_manager.update_or_add(
584  session, stats["meta"], current_metadata
585  )
586  if modified_statistic_id is not None:
587  modified_statistic_ids.add(modified_statistic_id)
588  updated_metadata_ids.add(metadata_id)
589  if new_stat := _insert_statistics(
590  session,
591  StatisticsShortTerm,
592  metadata_id,
593  stats["stat"],
594  ):
595  new_short_term_stats.append(new_stat)
596 
597  if start.minute == 50:
598  # Once every hour, update issues
599  for platform in instance.hass.data[DOMAIN].recorder_platforms.values():
600  if not (
601  platform_update_issues := getattr(
602  platform, INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES, None
603  )
604  ):
605  continue
606  platform_update_issues(instance.hass, session)
607 
608  if start.minute == 55:
609  # A full hour is ready, summarize it
610  _compile_hourly_statistics(session, start)
611 
612  session.add(StatisticsRuns(start=start))
613 
614  if fire_events:
615  instance.hass.bus.fire(EVENT_RECORDER_5MIN_STATISTICS_GENERATED)
616  if start.minute == 55:
617  instance.hass.bus.fire(EVENT_RECORDER_HOURLY_STATISTICS_GENERATED)
618 
619  if updated_metadata_ids:
620  # These are always the newest statistics, so we can update
621  # the run cache without having to check the start_ts.
622  session.flush() # populate the ids of the new StatisticsShortTerm rows
623  run_cache = get_short_term_statistics_run_cache(instance.hass)
624  # metadata_id is typed to allow None, but we know it's not None here
625  # so we can safely cast it to int.
626  run_cache.set_latest_ids_for_metadata_ids(
627  cast(
628  dict[int, int],
629  {
630  new_stat.metadata_id: new_stat.id
631  for new_stat in new_short_term_stats
632  },
633  )
634  )
635 
636  return modified_statistic_ids
637 
638 
640  session: Session,
641  table: type[StatisticsBase],
642  metadata_id: int,
643  start_time: datetime,
644  adj: float,
645 ) -> None:
646  """Adjust statistics in the database."""
647  start_time_ts = start_time.timestamp()
648  try:
649  session.query(table).filter_by(metadata_id=metadata_id).filter(
650  table.start_ts >= start_time_ts
651  ).update(
652  {
653  table.sum: table.sum + adj,
654  },
655  synchronize_session=False,
656  )
657  except SQLAlchemyError:
658  _LOGGER.exception(
659  "Unexpected exception when updating statistics %s",
660  id,
661  )
662 
663 
665  session: Session,
666  table: type[StatisticsBase],
667  metadata_id: int,
668  statistic: StatisticData,
669 ) -> StatisticsBase | None:
670  """Insert statistics in the database."""
671  try:
672  stat = table.from_stats(metadata_id, statistic)
673  session.add(stat)
674  except SQLAlchemyError:
675  _LOGGER.exception(
676  "Unexpected exception when inserting statistics %s:%s ",
677  metadata_id,
678  statistic,
679  )
680  return None
681  return stat
682 
683 
685  session: Session,
686  table: type[StatisticsBase],
687  stat_id: int,
688  statistic: StatisticData,
689 ) -> None:
690  """Insert statistics in the database."""
691  try:
692  session.query(table).filter_by(id=stat_id).update(
693  {
694  table.mean: statistic.get("mean"),
695  table.min: statistic.get("min"),
696  table.max: statistic.get("max"),
697  table.last_reset_ts: datetime_to_timestamp_or_none(
698  statistic.get("last_reset")
699  ),
700  table.state: statistic.get("state"),
701  table.sum: statistic.get("sum"),
702  },
703  synchronize_session=False,
704  )
705  except SQLAlchemyError:
706  _LOGGER.exception(
707  "Unexpected exception when updating statistics %s:%s ",
708  stat_id,
709  statistic,
710  )
711 
712 
714  instance: Recorder,
715  session: Session,
716  *,
717  statistic_ids: set[str] | None = None,
718  statistic_type: Literal["mean", "sum"] | None = None,
719  statistic_source: str | None = None,
720 ) -> dict[str, tuple[int, StatisticMetaData]]:
721  """Fetch meta data.
722 
723  Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by statistic_id.
724  If statistic_ids is given, fetch metadata only for the listed statistics_ids.
725  If statistic_type is given, fetch metadata only for statistic_ids supporting it.
726  """
727  return instance.statistics_meta_manager.get_many(
728  session,
729  statistic_ids=statistic_ids,
730  statistic_type=statistic_type,
731  statistic_source=statistic_source,
732  )
733 
734 
736  hass: HomeAssistant,
737  *,
738  statistic_ids: set[str] | None = None,
739  statistic_type: Literal["mean", "sum"] | None = None,
740  statistic_source: str | None = None,
741 ) -> dict[str, tuple[int, StatisticMetaData]]:
742  """Return metadata for statistic_ids."""
743  with session_scope(hass=hass, read_only=True) as session:
745  get_instance(hass),
746  session,
747  statistic_ids=statistic_ids,
748  statistic_type=statistic_type,
749  statistic_source=statistic_source,
750  )
751 
752 
def clear_statistics(instance: Recorder, statistic_ids: list[str]) -> None:
    """Clear statistics for a list of statistic_ids.

    Runs in the recorder thread; the session commits on scope exit.
    """
    # Deleting the metadata presumably removes the associated statistics
    # rows as well — verify in StatisticsMetaManager.delete.
    with session_scope(session=instance.get_session()) as session:
        instance.statistics_meta_manager.delete(session, statistic_ids)
757 
758 
760  instance: Recorder,
761  statistic_id: str,
762  new_statistic_id: str | None | UndefinedType,
763  new_unit_of_measurement: str | None | UndefinedType,
764 ) -> None:
765  """Update statistics metadata for a statistic_id."""
766  statistics_meta_manager = instance.statistics_meta_manager
767  if new_unit_of_measurement is not UNDEFINED:
768  with session_scope(session=instance.get_session()) as session:
769  statistics_meta_manager.update_unit_of_measurement(
770  session, statistic_id, new_unit_of_measurement
771  )
772  if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
773  with session_scope(
774  session=instance.get_session(),
776  instance, "statistic"
777  ),
778  ) as session:
779  statistics_meta_manager.update_statistic_id(
780  session, DOMAIN, statistic_id, new_statistic_id
781  )
782 
783 
785  hass: HomeAssistant,
786  statistic_ids: set[str] | None = None,
787  statistic_type: Literal["mean", "sum"] | None = None,
788 ) -> list[dict]:
789  """Return all statistic_ids (or filtered one) and unit of measurement.
790 
791  Queries the database for existing statistic_ids, as well as integrations with
792  a recorder platform for statistic_ids which will be added in the next statistics
793  period.
794  """
795  instance = get_instance(hass)
796 
797  if statistic_ids is not None:
798  # Try to get the results from the cache since there is nearly
799  # always a cache hit.
800  statistics_meta_manager = instance.statistics_meta_manager
801  metadata = statistics_meta_manager.get_from_cache_threadsafe(statistic_ids)
802  if not statistic_ids.difference(metadata):
803  result = _statistic_by_id_from_metadata(hass, metadata)
805 
806  return await instance.async_add_executor_job(
807  list_statistic_ids,
808  hass,
809  statistic_ids,
810  statistic_type,
811  )
812 
813 
815  hass: HomeAssistant,
816  metadata: dict[str, tuple[int, StatisticMetaData]],
817 ) -> dict[str, dict[str, Any]]:
818  """Return a list of results for a given metadata dict."""
819  return {
820  meta["statistic_id"]: {
821  "display_unit_of_measurement": get_display_unit(
822  hass, meta["statistic_id"], meta["unit_of_measurement"]
823  ),
824  "has_mean": meta["has_mean"],
825  "has_sum": meta["has_sum"],
826  "name": meta["name"],
827  "source": meta["source"],
828  "unit_class": UNIT_CLASSES.get(meta["unit_of_measurement"]),
829  "unit_of_measurement": meta["unit_of_measurement"],
830  }
831  for _, meta in metadata.values()
832  }
833 
834 
836  result: dict[str, dict[str, Any]],
837 ) -> list[dict]:
838  """Return a flat dict of metadata."""
839  return [
840  {
841  "statistic_id": _id,
842  "display_unit_of_measurement": info["display_unit_of_measurement"],
843  "has_mean": info["has_mean"],
844  "has_sum": info["has_sum"],
845  "name": info.get("name"),
846  "source": info["source"],
847  "statistics_unit_of_measurement": info["unit_of_measurement"],
848  "unit_class": info["unit_class"],
849  }
850  for _id, info in result.items()
851  ]
852 
853 
855  hass: HomeAssistant,
856  statistic_ids: set[str] | None = None,
857  statistic_type: Literal["mean", "sum"] | None = None,
858 ) -> list[dict]:
859  """Return all statistic_ids (or filtered one) and unit of measurement.
860 
861  Queries the database for existing statistic_ids, as well as integrations with
862  a recorder platform for statistic_ids which will be added in the next statistics
863  period.
864  """
865  result = {}
866  instance = get_instance(hass)
867  statistics_meta_manager = instance.statistics_meta_manager
868 
869  # Query the database
870  with session_scope(hass=hass, read_only=True) as session:
871  metadata = statistics_meta_manager.get_many(
872  session, statistic_type=statistic_type, statistic_ids=statistic_ids
873  )
874  result = _statistic_by_id_from_metadata(hass, metadata)
875 
876  if not statistic_ids or statistic_ids.difference(result):
877  # If we want all statistic_ids, or some are missing, we need to query
878  # the integrations for the missing ones.
879  #
880  # Query all integrations with a registered recorder platform
881  for platform in hass.data[DOMAIN].recorder_platforms.values():
882  if not (
883  platform_list_statistic_ids := getattr(
884  platform, INTEGRATION_PLATFORM_LIST_STATISTIC_IDS, None
885  )
886  ):
887  continue
888  platform_statistic_ids = platform_list_statistic_ids(
889  hass, statistic_ids=statistic_ids, statistic_type=statistic_type
890  )
891 
892  for key, meta in platform_statistic_ids.items():
893  if key in result:
894  # The database has a higher priority than the integration
895  continue
896  result[key] = {
897  "display_unit_of_measurement": meta["unit_of_measurement"],
898  "has_mean": meta["has_mean"],
899  "has_sum": meta["has_sum"],
900  "name": meta["name"],
901  "source": meta["source"],
902  "unit_class": UNIT_CLASSES.get(meta["unit_of_measurement"]),
903  "unit_of_measurement": meta["unit_of_measurement"],
904  }
905 
906  # Return a list of statistic_id + metadata
908 
909 
911  stats: dict[str, list[StatisticsRow]],
912  same_period: Callable[[float, float], bool],
913  period_start_end: Callable[[float], tuple[float, float]],
914  period: timedelta,
915  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
916 ) -> dict[str, list[StatisticsRow]]:
917  """Reduce hourly statistics to daily or monthly statistics."""
918  result: dict[str, list[StatisticsRow]] = defaultdict(list)
919  period_seconds = period.total_seconds()
920  _want_mean = "mean" in types
921  _want_min = "min" in types
922  _want_max = "max" in types
923  _want_last_reset = "last_reset" in types
924  _want_state = "state" in types
925  _want_sum = "sum" in types
926  for statistic_id, stat_list in stats.items():
927  max_values: list[float] = []
928  mean_values: list[float] = []
929  min_values: list[float] = []
930  prev_stat: StatisticsRow = stat_list[0]
931  fake_entry: StatisticsRow = {"start": stat_list[-1]["start"] + period_seconds}
932 
933  # Loop over the hourly statistics + a fake entry to end the period
934  for statistic in chain(stat_list, (fake_entry,)):
935  if not same_period(prev_stat["start"], statistic["start"]):
936  start, end = period_start_end(prev_stat["start"])
937  # The previous statistic was the last entry of the period
938  row: StatisticsRow = {
939  "start": start,
940  "end": end,
941  }
942  if _want_mean:
943  row["mean"] = mean(mean_values) if mean_values else None
944  mean_values.clear()
945  if _want_min:
946  row["min"] = min(min_values) if min_values else None
947  min_values.clear()
948  if _want_max:
949  row["max"] = max(max_values) if max_values else None
950  max_values.clear()
951  if _want_last_reset:
952  row["last_reset"] = prev_stat.get("last_reset")
953  if _want_state:
954  row["state"] = prev_stat.get("state")
955  if _want_sum:
956  row["sum"] = prev_stat["sum"]
957  result[statistic_id].append(row)
958  if _want_max and (_max := statistic.get("max")) is not None:
959  max_values.append(_max)
960  if _want_mean and (_mean := statistic.get("mean")) is not None:
961  mean_values.append(_mean)
962  if _want_min and (_min := statistic.get("min")) is not None:
963  min_values.append(_min)
964  prev_stat = statistic
965 
966  return result
967 
968 
970  tuple[
971  Callable[[float, float], bool],
972  Callable[[float], tuple[float, float]],
973  ]
974 ):
975  """Return functions to match same day and day start end."""
976  _lower_bound: float = 0
977  _upper_bound: float = 0
978 
979  # We have to recreate _local_from_timestamp in the closure in case the timezone changes
980  _local_from_timestamp = partial(
981  datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
982  )
983 
    def _same_day_ts(time1: float, time2: float) -> bool:
        """Return True if time1 and time2 are in the same date.

        Both arguments are POSIX timestamps; "same date" is evaluated in the
        default (local) timezone captured by the enclosing factory.
        """
        nonlocal _lower_bound, _upper_bound
        # Keep the day boundaries of time1's day cached in the closure;
        # consecutive calls usually stay within the same day, so the bounds
        # only need to be recomputed when time1 leaves the cached window.
        if not _lower_bound <= time1 < _upper_bound:
            _lower_bound, _upper_bound = _day_start_end_ts_cached(time1)
        return _lower_bound <= time2 < _upper_bound
990 
991  def _day_start_end_ts(time: float) -> tuple[float, float]:
992  """Return the start and end of the period (day) time is within."""
993  start_local = _local_from_timestamp(time).replace(
994  hour=0, minute=0, second=0, microsecond=0
995  )
996  return (
997  start_local.timestamp(),
998  (start_local + timedelta(days=1)).timestamp(),
999  )
1000 
1001  # We create _day_start_end_ts_cached in the closure in case the timezone changes
1002  _day_start_end_ts_cached = lru_cache(maxsize=6)(_day_start_end_ts)
1003 
1004  return _same_day_ts, _day_start_end_ts_cached
1005 
1006 
1008  stats: dict[str, list[StatisticsRow]],
1009  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1010 ) -> dict[str, list[StatisticsRow]]:
1011  """Reduce hourly statistics to daily statistics."""
1012  _same_day_ts, _day_start_end_ts = reduce_day_ts_factory()
1013  return _reduce_statistics(
1014  stats, _same_day_ts, _day_start_end_ts, timedelta(days=1), types
1015  )
1016 
1017 
1019  tuple[
1020  Callable[[float, float], bool],
1021  Callable[[float], tuple[float, float]],
1022  ]
1023 ):
1024  """Return functions to match same week and week start end."""
1025  _lower_bound: float = 0
1026  _upper_bound: float = 0
1027 
1028  # We have to recreate _local_from_timestamp in the closure in case the timezone changes
1029  _local_from_timestamp = partial(
1030  datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
1031  )
1032 
    def _same_week_ts(time1: float, time2: float) -> bool:
        """Return True if time1 and time2 are in the same year and week.

        Both arguments are POSIX timestamps; week membership is evaluated in
        the default (local) timezone captured by the enclosing factory.
        """
        nonlocal _lower_bound, _upper_bound
        # Cache the week boundaries of time1's week in the closure; the
        # bounds are only recomputed when time1 falls outside the cached
        # window, which is rare for the sorted rows this is called with.
        if not _lower_bound <= time1 < _upper_bound:
            _lower_bound, _upper_bound = _week_start_end_ts_cached(time1)
        return _lower_bound <= time2 < _upper_bound
1039 
1040  def _week_start_end_ts(time: float) -> tuple[float, float]:
1041  """Return the start and end of the period (week) time is within."""
1042  time_local = _local_from_timestamp(time)
1043  start_local = time_local.replace(
1044  hour=0, minute=0, second=0, microsecond=0
1045  ) - timedelta(days=time_local.weekday())
1046  return (
1047  start_local.timestamp(),
1048  (start_local + timedelta(days=7)).timestamp(),
1049  )
1050 
1051  # We create _week_start_end_ts_cached in the closure in case the timezone changes
1052  _week_start_end_ts_cached = lru_cache(maxsize=6)(_week_start_end_ts)
1053 
1054  return _same_week_ts, _week_start_end_ts_cached
1055 
1056 
1058  stats: dict[str, list[StatisticsRow]],
1059  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1060 ) -> dict[str, list[StatisticsRow]]:
1061  """Reduce hourly statistics to weekly statistics."""
1062  _same_week_ts, _week_start_end_ts = reduce_week_ts_factory()
1063  return _reduce_statistics(
1064  stats, _same_week_ts, _week_start_end_ts, timedelta(days=7), types
1065  )
1066 
1067 
1068 def _find_month_end_time(timestamp: datetime) -> datetime:
1069  """Return the end of the month (midnight at the first day of the next month)."""
1070  # We add 4 days to the end to make sure we are in the next month
1071  return (timestamp.replace(day=28) + timedelta(days=4)).replace(
1072  day=1, hour=0, minute=0, second=0, microsecond=0
1073  )
1074 
1075 
1077  tuple[
1078  Callable[[float, float], bool],
1079  Callable[[float], tuple[float, float]],
1080  ]
1081 ):
1082  """Return functions to match same month and month start end."""
1083  _lower_bound: float = 0
1084  _upper_bound: float = 0
1085 
1086  # We have to recreate _local_from_timestamp in the closure in case the timezone changes
1087  _local_from_timestamp = partial(
1088  datetime.fromtimestamp, tz=dt_util.get_default_time_zone()
1089  )
1090 
    def _same_month_ts(time1: float, time2: float) -> bool:
        """Return True if time1 and time2 are in the same year and month.

        Both arguments are POSIX timestamps; month membership is evaluated in
        the default (local) timezone captured by the enclosing factory.
        """
        nonlocal _lower_bound, _upper_bound
        # Cache the month boundaries of time1's month in the closure so they
        # are only recomputed when time1 moves outside the cached window.
        if not _lower_bound <= time1 < _upper_bound:
            _lower_bound, _upper_bound = _month_start_end_ts_cached(time1)
        return _lower_bound <= time2 < _upper_bound
1097 
1098  def _month_start_end_ts(time: float) -> tuple[float, float]:
1099  """Return the start and end of the period (month) time is within."""
1100  start_local = _local_from_timestamp(time).replace(
1101  day=1, hour=0, minute=0, second=0, microsecond=0
1102  )
1103  end_local = _find_month_end_time(start_local)
1104  return (start_local.timestamp(), end_local.timestamp())
1105 
1106  # We create _month_start_end_ts_cached in the closure in case the timezone changes
1107  _month_start_end_ts_cached = lru_cache(maxsize=6)(_month_start_end_ts)
1108 
1109  return _same_month_ts, _month_start_end_ts_cached
1110 
1111 
1113  stats: dict[str, list[StatisticsRow]],
1114  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1115 ) -> dict[str, list[StatisticsRow]]:
1116  """Reduce hourly statistics to monthly statistics."""
1117  _same_month_ts, _month_start_end_ts = reduce_month_ts_factory()
1118  return _reduce_statistics(
1119  stats, _same_month_ts, _month_start_end_ts, timedelta(days=31), types
1120  )
1121 
1122 
1124  start_time: datetime,
1125  end_time: datetime | None,
1126  metadata_ids: list[int] | None,
1127  table: type[StatisticsBase],
1128  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1129 ) -> StatementLambdaElement:
1130  """Prepare a database query for statistics during a given period.
1131 
1132  This prepares a lambda_stmt query, so we don't insert the parameters yet.
1133  """
1134  start_time_ts = start_time.timestamp()
1135  stmt = _generate_select_columns_for_types_stmt(table, types)
1136  stmt += lambda q: q.filter(table.start_ts >= start_time_ts)
1137  if end_time is not None:
1138  end_time_ts = end_time.timestamp()
1139  stmt += lambda q: q.filter(table.start_ts < end_time_ts)
1140  if metadata_ids:
1141  stmt += lambda q: q.filter(table.metadata_id.in_(metadata_ids))
1142  stmt += lambda q: q.order_by(table.metadata_id, table.start_ts)
1143  return stmt
1144 
1145 
1147  columns: Select,
1148  start_time: datetime | None,
1149  end_time: datetime | None,
1150  table: type[StatisticsBase],
1151  metadata_id: int,
1152 ) -> StatementLambdaElement:
1153  stmt = lambda_stmt(lambda: columns.filter(table.metadata_id == metadata_id))
1154  if start_time is not None:
1155  start_time_ts = start_time.timestamp()
1156  stmt += lambda q: q.filter(table.start_ts >= start_time_ts)
1157  if end_time is not None:
1158  end_time_ts = end_time.timestamp()
1159  stmt += lambda q: q.filter(table.start_ts < end_time_ts)
1160  return stmt
1161 
1162 
1164  session: Session,
1165  result: dict[str, float],
1166  start_time: datetime | None,
1167  end_time: datetime | None,
1168  table: type[StatisticsBase],
1169  types: set[Literal["max", "mean", "min", "change"]],
1170  metadata_id: int,
1171 ) -> None:
1172  """Return max, mean and min during the period."""
1173  # Calculate max, mean, min
1174  columns = select()
1175  if "max" in types:
1176  columns = columns.add_columns(func.max(table.max))
1177  if "mean" in types:
1178  columns = columns.add_columns(func.avg(table.mean))
1179  columns = columns.add_columns(func.count(table.mean))
1180  if "min" in types:
1181  columns = columns.add_columns(func.min(table.min))
1183  columns, start_time, end_time, table, metadata_id
1184  )
1185  stats = cast(Sequence[Row[Any]], execute_stmt_lambda_element(session, stmt))
1186  if not stats:
1187  return
1188  if "max" in types and (new_max := stats[0].max) is not None:
1189  old_max = result.get("max")
1190  result["max"] = max(new_max, old_max) if old_max is not None else new_max
1191  if "mean" in types and stats[0].avg is not None:
1192  # https://github.com/sqlalchemy/sqlalchemy/issues/9127
1193  duration = stats[0].count * table.duration.total_seconds() # type: ignore[operator]
1194  result["duration"] = result.get("duration", 0.0) + duration
1195  result["mean_acc"] = result.get("mean_acc", 0.0) + stats[0].avg * duration
1196  if "min" in types and (new_min := stats[0].min) is not None:
1197  old_min = result.get("min")
1198  result["min"] = min(new_min, old_min) if old_min is not None else new_min
1199 
1200 
1202  session: Session,
1203  head_start_time: datetime | None,
1204  head_end_time: datetime | None,
1205  main_start_time: datetime | None,
1206  main_end_time: datetime | None,
1207  tail_start_time: datetime | None,
1208  tail_end_time: datetime | None,
1209  tail_only: bool,
1210  metadata_id: int,
1211  types: set[Literal["max", "mean", "min", "change"]],
1212 ) -> dict[str, float | None]:
1213  """Return max, mean and min during the period.
1214 
1215  The mean is a time weighted average, combining hourly and 5-minute statistics if
1216  necessary.
1217  """
1218  max_mean_min: dict[str, float] = {}
1219  result: dict[str, float | None] = {}
1220 
1221  if tail_start_time is not None:
1222  # Calculate max, mean, min
1224  session,
1225  max_mean_min,
1226  tail_start_time,
1227  tail_end_time,
1228  StatisticsShortTerm,
1229  types,
1230  metadata_id,
1231  )
1232 
1233  if not tail_only:
1235  session,
1236  max_mean_min,
1237  main_start_time,
1238  main_end_time,
1239  Statistics,
1240  types,
1241  metadata_id,
1242  )
1243 
1244  if head_start_time is not None:
1246  session,
1247  max_mean_min,
1248  head_start_time,
1249  head_end_time,
1250  StatisticsShortTerm,
1251  types,
1252  metadata_id,
1253  )
1254 
1255  if "max" in types:
1256  result["max"] = max_mean_min.get("max")
1257  if "mean" in types:
1258  if "mean_acc" not in max_mean_min:
1259  result["mean"] = None
1260  else:
1261  result["mean"] = max_mean_min["mean_acc"] / max_mean_min["duration"]
1262  if "min" in types:
1263  result["min"] = max_mean_min.get("min")
1264  return result
1265 
1266 
1268  session: Session,
1269  table: type[StatisticsBase],
1270  metadata_id: int,
1271 ) -> datetime | None:
1272  """Return the date of the oldest statistic row for a given metadata id."""
1273  stmt = lambda_stmt(
1274  lambda: select(table.start_ts)
1275  .filter(table.metadata_id == metadata_id)
1276  .order_by(table.start_ts.asc())
1277  .limit(1)
1278  )
1279  if stats := cast(Sequence[Row], execute_stmt_lambda_element(session, stmt)):
1280  return dt_util.utc_from_timestamp(stats[0].start_ts)
1281  return None
1282 
1283 
1285  session: Session,
1286  table: type[StatisticsBase],
1287  metadata_id: int,
1288 ) -> datetime | None:
1289  """Return the date of the newest statistic row for a given metadata id."""
1290  stmt = lambda_stmt(
1291  lambda: select(table.start_ts)
1292  .filter(table.metadata_id == metadata_id)
1293  .order_by(table.start_ts.desc())
1294  .limit(1)
1295  )
1296  if stats := cast(Sequence[Row], execute_stmt_lambda_element(session, stmt)):
1297  return dt_util.utc_from_timestamp(stats[0].start_ts)
1298  return None
1299 
1300 
1302  session: Session,
1303  head_start_time: datetime | None,
1304  main_start_time: datetime | None,
1305  tail_start_time: datetime | None,
1306  oldest_stat: datetime | None,
1307  oldest_5_min_stat: datetime | None,
1308  tail_only: bool,
1309  metadata_id: int,
1310 ) -> float | None:
1311  """Return the oldest non-NULL sum during the period."""
1312 
    def _get_oldest_sum_statistic_in_sub_period(
        session: Session,
        start_time: datetime | None,
        table: type[StatisticsBase],
        metadata_id: int,
    ) -> float | None:
        """Return the oldest non-NULL sum during the period.

        :param session: active recorder database session
        :param start_time: lower bound for the search, or None to search
            from the oldest row in the table
        :param table: statistics table to query (hourly or short term)
        :param metadata_id: id of the statistic's metadata row
        """
        # Oldest row with a non-NULL sum for this statistic, in ascending
        # start order, limited to one row.
        stmt = lambda_stmt(
            lambda: select(table.sum)
            .filter(table.metadata_id == metadata_id)
            .filter(table.sum.is_not(None))
            .order_by(table.start_ts.asc())
            .limit(1)
        )
        if start_time is not None:
            # Move start_time to just before the end of the period it falls
            # in, then align it to the table's period grid.
            start_time = start_time + table.duration - timedelta.resolution
            if table == StatisticsShortTerm:
                # Short term rows are on a 5-minute grid.
                minutes = start_time.minute - start_time.minute % 5
                period = start_time.replace(minute=minutes, second=0, microsecond=0)
            else:
                # Long term rows are on an hourly grid.
                period = start_time.replace(minute=0, second=0, microsecond=0)
            # Also accept the period immediately preceding start_time's
            # period by filtering from one period earlier.
            prev_period = period - table.duration
            prev_period_ts = prev_period.timestamp()
            stmt += lambda q: q.filter(table.start_ts >= prev_period_ts)
        stats = cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
        return stats[0].sum if stats else None
1339 
1340  oldest_sum: float | None = None
1341 
1342  # This function won't be called if tail_only is False and main_start_time is None
1343  # the extra checks are added to satisfy MyPy
1344  if not tail_only and main_start_time is not None and oldest_stat is not None:
1345  period = main_start_time.replace(minute=0, second=0, microsecond=0)
1346  prev_period = period - Statistics.duration
1347  if prev_period < oldest_stat:
1348  return 0
1349 
1350  if (
1351  head_start_time is not None
1352  and oldest_5_min_stat is not None
1353  and (
1354  # If we want stats older than the short term purge window, don't lookup
1355  # the oldest sum in the short term table, as it would be prioritized
1356  # over older LongTermStats.
1357  (oldest_stat is None)
1358  or (oldest_5_min_stat < oldest_stat)
1359  or (oldest_5_min_stat <= head_start_time)
1360  )
1361  and (
1362  oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1363  session, head_start_time, StatisticsShortTerm, metadata_id
1364  )
1365  )
1366  is not None
1367  ):
1368  return oldest_sum
1369 
1370  if not tail_only:
1371  if (
1372  oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1373  session, main_start_time, Statistics, metadata_id
1374  )
1375  ) is not None:
1376  return oldest_sum
1377  return 0
1378 
1379  if (
1380  tail_start_time is not None
1381  and (
1382  oldest_sum := _get_oldest_sum_statistic_in_sub_period(
1383  session, tail_start_time, StatisticsShortTerm, metadata_id
1384  )
1385  )
1386  ) is not None:
1387  return oldest_sum
1388 
1389  return 0
1390 
1391 
1393  session: Session,
1394  head_start_time: datetime | None,
1395  head_end_time: datetime | None,
1396  main_start_time: datetime | None,
1397  main_end_time: datetime | None,
1398  tail_start_time: datetime | None,
1399  tail_end_time: datetime | None,
1400  tail_only: bool,
1401  metadata_id: int,
1402 ) -> float | None:
1403  """Return the newest non-NULL sum during the period."""
1404 
    def _get_newest_sum_statistic_in_sub_period(
        session: Session,
        start_time: datetime | None,
        end_time: datetime | None,
        table: type[StatisticsBase],
        metadata_id: int,
    ) -> float | None:
        """Return the newest non-NULL sum during the period.

        :param session: active recorder database session
        :param start_time: inclusive lower bound, or None for no lower bound
        :param end_time: exclusive upper bound, or None for no upper bound
        :param table: statistics table to query (hourly or short term)
        :param metadata_id: id of the statistic's metadata row
        """
        # Newest row with a non-NULL sum for this statistic, in descending
        # start order, limited to one row.
        stmt = lambda_stmt(
            lambda: select(
                table.sum,
            )
            .filter(table.metadata_id == metadata_id)
            .filter(table.sum.is_not(None))
            .order_by(table.start_ts.desc())
            .limit(1)
        )
        # Constrain the query to the requested window; both bounds are
        # optional and applied independently.
        if start_time is not None:
            start_time_ts = start_time.timestamp()
            stmt += lambda q: q.filter(table.start_ts >= start_time_ts)
        if end_time is not None:
            end_time_ts = end_time.timestamp()
            stmt += lambda q: q.filter(table.start_ts < end_time_ts)
        stats = cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))

        return stats[0].sum if stats else None
1431 
1432  newest_sum: float | None = None
1433 
1434  if tail_start_time is not None:
1435  newest_sum = _get_newest_sum_statistic_in_sub_period(
1436  session, tail_start_time, tail_end_time, StatisticsShortTerm, metadata_id
1437  )
1438  if newest_sum is not None:
1439  return newest_sum
1440 
1441  if not tail_only:
1442  newest_sum = _get_newest_sum_statistic_in_sub_period(
1443  session, main_start_time, main_end_time, Statistics, metadata_id
1444  )
1445  if newest_sum is not None:
1446  return newest_sum
1447 
1448  if head_start_time is not None:
1449  newest_sum = _get_newest_sum_statistic_in_sub_period(
1450  session, head_start_time, head_end_time, StatisticsShortTerm, metadata_id
1451  )
1452 
1453  return newest_sum
1454 
1455 
1457  hass: HomeAssistant,
1458  start_time: datetime | None,
1459  end_time: datetime | None,
1460  statistic_id: str,
1461  types: set[Literal["max", "mean", "min", "change"]] | None,
1462  units: dict[str, str] | None,
1463 ) -> dict[str, Any]:
1464  """Return a statistic data point for the UTC period start_time - end_time."""
1465  metadata = None
1466 
1467  if not types:
1468  types = {"max", "mean", "min", "change"}
1469 
1470  result: dict[str, Any] = {}
1471 
1472  with session_scope(hass=hass, read_only=True) as session:
1473  # Fetch metadata for the given statistic_id
1474  if not (
1475  metadata := get_instance(hass).statistics_meta_manager.get(
1476  session, statistic_id
1477  )
1478  ):
1479  return result
1480 
1481  metadata_id = metadata[0]
1482 
1483  oldest_stat = _first_statistic(session, Statistics, metadata_id)
1484  oldest_5_min_stat = None
1485  if not valid_statistic_id(statistic_id):
1486  oldest_5_min_stat = _first_statistic(
1487  session, StatisticsShortTerm, metadata_id
1488  )
1489 
1490  # To calculate the summary, data from the statistics (hourly) and
1491  # short_term_statistics (5 minute) tables is combined
1492  # - The short term statistics table is used for the head and tail of the period,
1493  # if the period it doesn't start or end on a full hour
1494  # - The statistics table is used for the remainder of the time
1495  now = dt_util.utcnow()
1496  if end_time is not None and end_time > now:
1497  end_time = now
1498 
1499  tail_only = (
1500  start_time is not None
1501  and end_time is not None
1502  and end_time - start_time < Statistics.duration
1503  )
1504 
1505  # Calculate the head period
1506  head_start_time: datetime | None = None
1507  head_end_time: datetime | None = None
1508  if (
1509  not tail_only
1510  and oldest_stat is not None
1511  and oldest_5_min_stat is not None
1512  and oldest_5_min_stat - oldest_stat < Statistics.duration
1513  and (start_time is None or start_time < oldest_5_min_stat)
1514  ):
1515  # To improve accuracy of averaged for statistics which were added within
1516  # recorder's retention period.
1517  head_start_time = oldest_5_min_stat
1518  head_end_time = (
1519  oldest_5_min_stat.replace(minute=0, second=0, microsecond=0)
1520  + Statistics.duration
1521  )
1522  elif not tail_only and start_time is not None and start_time.minute:
1523  head_start_time = start_time
1524  head_end_time = (
1525  start_time.replace(minute=0, second=0, microsecond=0)
1526  + Statistics.duration
1527  )
1528 
1529  # Calculate the tail period
1530  tail_start_time: datetime | None = None
1531  tail_end_time: datetime | None = None
1532  if end_time is None:
1533  tail_start_time = _last_statistic(session, Statistics, metadata_id)
1534  if tail_start_time:
1535  tail_start_time += Statistics.duration
1536  else:
1537  tail_start_time = now.replace(minute=0, second=0, microsecond=0)
1538  elif tail_only:
1539  tail_start_time = start_time
1540  tail_end_time = end_time
1541  elif end_time.minute:
1542  tail_start_time = end_time.replace(minute=0, second=0, microsecond=0)
1543  tail_end_time = end_time
1544 
1545  # Calculate the main period
1546  main_start_time: datetime | None = None
1547  main_end_time: datetime | None = None
1548  if not tail_only:
1549  main_start_time = start_time if head_end_time is None else head_end_time
1550  main_end_time = end_time if tail_start_time is None else tail_start_time
1551 
1552  if not types.isdisjoint({"max", "mean", "min"}):
1553  result = _get_max_mean_min_statistic(
1554  session,
1555  head_start_time,
1556  head_end_time,
1557  main_start_time,
1558  main_end_time,
1559  tail_start_time,
1560  tail_end_time,
1561  tail_only,
1562  metadata_id,
1563  types,
1564  )
1565 
1566  if "change" in types:
1567  oldest_sum: float | None
1568  if start_time is None:
1569  oldest_sum = 0.0
1570  else:
1571  oldest_sum = _get_oldest_sum_statistic(
1572  session,
1573  head_start_time,
1574  main_start_time,
1575  tail_start_time,
1576  oldest_stat,
1577  oldest_5_min_stat,
1578  tail_only,
1579  metadata_id,
1580  )
1581  newest_sum = _get_newest_sum_statistic(
1582  session,
1583  head_start_time,
1584  head_end_time,
1585  main_start_time,
1586  main_end_time,
1587  tail_start_time,
1588  tail_end_time,
1589  tail_only,
1590  metadata_id,
1591  )
1592  # Calculate the difference between the oldest and newest sum
1593  if oldest_sum is not None and newest_sum is not None:
1594  result["change"] = newest_sum - oldest_sum
1595  else:
1596  result["change"] = None
1597 
1598  state_unit = unit = metadata[1]["unit_of_measurement"]
1599  if state := hass.states.get(statistic_id):
1600  state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
1601  convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
1602 
1603  if not convert:
1604  return result
1605  return {key: convert(value) for key, value in result.items()}
1606 
1607 
# Maps a requested statistic type to the column holding its value in the
# statistics tables; "last_reset" is stored as the timestamp column
# "last_reset_ts", the other types use identically named columns.
_type_column_mapping = {
    "last_reset": "last_reset_ts",
    "max": "max",
    "mean": "mean",
    "min": "min",
    "state": "state",
    "sum": "sum",
}
1616 
1617 
1619  table: type[StatisticsBase],
1620  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1621 ) -> StatementLambdaElement:
1622  columns = select(table.metadata_id, table.start_ts)
1623  track_on: list[str | None] = [
1624  table.__tablename__, # type: ignore[attr-defined]
1625  ]
1626  for key, column in _type_column_mapping.items():
1627  if key in types:
1628  columns = columns.add_columns(getattr(table, column))
1629  track_on.append(column)
1630  else:
1631  track_on.append(None)
1632  return lambda_stmt(lambda: columns, track_on=track_on)
1633 
1634 
1636  metadata: dict[str, tuple[int, StatisticMetaData]],
1637  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1638 ) -> list[int]:
1639  """Extract metadata ids from metadata and discard impossible columns."""
1640  metadata_ids = []
1641  has_mean = False
1642  has_sum = False
1643  for metadata_id, stats_metadata in metadata.values():
1644  metadata_ids.append(metadata_id)
1645  has_mean |= stats_metadata["has_mean"]
1646  has_sum |= stats_metadata["has_sum"]
1647  if not has_mean:
1648  types.discard("mean")
1649  types.discard("min")
1650  types.discard("max")
1651  if not has_sum:
1652  types.discard("sum")
1653  types.discard("state")
1654  return metadata_ids
1655 
1656 
1658  hass: HomeAssistant,
1659  session: Session,
1660  start_time: datetime,
1661  units: dict[str, str] | None,
1662  _types: set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]],
1663  table: type[Statistics | StatisticsShortTerm],
1664  metadata: dict[str, tuple[int, StatisticMetaData]],
1665  result: dict[str, list[StatisticsRow]],
1666 ) -> None:
1667  """Add change to the result."""
1668  drop_sum = "sum" not in _types
1669  prev_sums = {}
1670  if tmp := _statistics_at_time(
1671  session,
1672  {metadata[statistic_id][0] for statistic_id in result},
1673  table,
1674  start_time,
1675  {"sum"},
1676  ):
1677  _metadata = dict(metadata.values())
1678  for row in tmp:
1679  metadata_by_id = _metadata[row.metadata_id]
1680  statistic_id = metadata_by_id["statistic_id"]
1681 
1682  state_unit = unit = metadata_by_id["unit_of_measurement"]
1683  if state := hass.states.get(statistic_id):
1684  state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
1685  convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
1686 
1687  if convert is not None:
1688  prev_sums[statistic_id] = convert(row.sum)
1689  else:
1690  prev_sums[statistic_id] = row.sum
1691 
1692  for statistic_id, rows in result.items():
1693  prev_sum = prev_sums.get(statistic_id) or 0
1694  for statistics_row in rows:
1695  if "sum" not in statistics_row:
1696  continue
1697  if drop_sum:
1698  _sum = statistics_row.pop("sum")
1699  else:
1700  _sum = statistics_row["sum"]
1701  if _sum is None:
1702  statistics_row["change"] = None
1703  continue
1704  statistics_row["change"] = _sum - prev_sum
1705  prev_sum = _sum
1706 
1707 
1709  hass: HomeAssistant,
1710  session: Session,
1711  start_time: datetime,
1712  end_time: datetime | None,
1713  statistic_ids: set[str] | None,
1714  period: Literal["5minute", "day", "hour", "week", "month"],
1715  units: dict[str, str] | None,
1716  _types: set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]],
1717 ) -> dict[str, list[StatisticsRow]]:
1718  """Return statistic data points during UTC period start_time - end_time.
1719 
1720  If end_time is omitted, returns statistics newer than or equal to start_time.
1721  If statistic_ids is omitted, returns statistics for all statistics ids.
1722  """
1723  if statistic_ids is not None and not isinstance(statistic_ids, set):
1724  # This is for backwards compatibility to avoid a breaking change
1725  # for custom integrations that call this method.
1726  statistic_ids = set(statistic_ids) # type: ignore[unreachable]
1727  # Fetch metadata for the given (or all) statistic_ids
1728  metadata = get_instance(hass).statistics_meta_manager.get_many(
1729  session, statistic_ids=statistic_ids
1730  )
1731  if not metadata:
1732  return {}
1733 
1734  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] = set()
1735  for stat_type in _types:
1736  if stat_type == "change":
1737  types.add("sum")
1738  continue
1739  types.add(stat_type)
1740 
1741  metadata_ids = None
1742  if statistic_ids is not None:
1743  metadata_ids = _extract_metadata_and_discard_impossible_columns(metadata, types)
1744 
1745  # Align start_time and end_time with the period
1746  if period == "day":
1747  start_time = dt_util.as_local(start_time).replace(
1748  hour=0, minute=0, second=0, microsecond=0
1749  )
1750  start_time = start_time.replace()
1751  if end_time is not None:
1752  end_local = dt_util.as_local(end_time)
1753  end_time = end_local.replace(
1754  hour=0, minute=0, second=0, microsecond=0
1755  ) + timedelta(days=1)
1756  elif period == "week":
1757  start_local = dt_util.as_local(start_time)
1758  start_time = start_local.replace(
1759  hour=0, minute=0, second=0, microsecond=0
1760  ) - timedelta(days=start_local.weekday())
1761  if end_time is not None:
1762  end_local = dt_util.as_local(end_time)
1763  end_time = (
1764  end_local.replace(hour=0, minute=0, second=0, microsecond=0)
1765  - timedelta(days=end_local.weekday())
1766  + timedelta(days=7)
1767  )
1768  elif period == "month":
1769  start_time = dt_util.as_local(start_time).replace(
1770  day=1, hour=0, minute=0, second=0, microsecond=0
1771  )
1772  if end_time is not None:
1773  end_time = _find_month_end_time(dt_util.as_local(end_time))
1774 
1775  table: type[Statistics | StatisticsShortTerm] = (
1776  Statistics if period != "5minute" else StatisticsShortTerm
1777  )
1779  start_time, end_time, metadata_ids, table, types
1780  )
1781  stats = cast(
1782  Sequence[Row], execute_stmt_lambda_element(session, stmt, orm_rows=False)
1783  )
1784 
1785  if not stats:
1786  return {}
1787 
1788  result = _sorted_statistics_to_dict(
1789  hass,
1790  stats,
1791  statistic_ids,
1792  metadata,
1793  True,
1794  table,
1795  units,
1796  types,
1797  )
1798 
1799  if period == "day":
1800  result = _reduce_statistics_per_day(result, types)
1801 
1802  if period == "week":
1803  result = _reduce_statistics_per_week(result, types)
1804 
1805  if period == "month":
1806  result = _reduce_statistics_per_month(result, types)
1807 
1808  if "change" in _types:
1810  hass, session, start_time, units, _types, table, metadata, result
1811  )
1812 
1813  # Return statistics combined with metadata
1814  return result
1815 
1816 
1818  hass: HomeAssistant,
1819  start_time: datetime,
1820  end_time: datetime | None,
1821  statistic_ids: set[str] | None,
1822  period: Literal["5minute", "day", "hour", "week", "month"],
1823  units: dict[str, str] | None,
1824  types: set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]],
1825 ) -> dict[str, list[StatisticsRow]]:
1826  """Return statistic data points during UTC period start_time - end_time.
1827 
1828  If end_time is omitted, returns statistics newer than or equal to start_time.
1829  If statistic_ids is omitted, returns statistics for all statistics ids.
1830  """
1831  with session_scope(hass=hass, read_only=True) as session:
1833  hass,
1834  session,
1835  start_time,
1836  end_time,
1837  statistic_ids,
1838  period,
1839  units,
1840  types,
1841  )
1842 
1843 
1845  metadata_id: int,
1846  number_of_stats: int,
1847 ) -> StatementLambdaElement:
1848  """Generate a statement for number_of_stats statistics for a given statistic_id."""
1849  return lambda_stmt(
1850  lambda: select(*QUERY_STATISTICS)
1851  .filter_by(metadata_id=metadata_id)
1852  .order_by(Statistics.metadata_id, Statistics.start_ts.desc())
1853  .limit(number_of_stats)
1854  )
1855 
1856 
1858  metadata_id: int,
1859  number_of_stats: int,
1860 ) -> StatementLambdaElement:
1861  """Generate a statement for number_of_stats short term statistics.
1862 
1863  For a given statistic_id.
1864  """
1865  return lambda_stmt(
1866  lambda: select(*QUERY_STATISTICS_SHORT_TERM)
1867  .filter_by(metadata_id=metadata_id)
1868  .order_by(StatisticsShortTerm.metadata_id, StatisticsShortTerm.start_ts.desc())
1869  .limit(number_of_stats)
1870  )
1871 
1872 
1874  hass: HomeAssistant,
1875  number_of_stats: int,
1876  statistic_id: str,
1877  convert_units: bool,
1878  table: type[StatisticsBase],
1879  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1880 ) -> dict[str, list[StatisticsRow]]:
1881  """Return the last number_of_stats statistics for a given statistic_id."""
1882  statistic_ids = {statistic_id}
1883  with session_scope(hass=hass, read_only=True) as session:
1884  # Fetch metadata for the given statistic_id
1885  metadata = get_instance(hass).statistics_meta_manager.get_many(
1886  session, statistic_ids=statistic_ids
1887  )
1888  if not metadata:
1889  return {}
1890  metadata_ids = _extract_metadata_and_discard_impossible_columns(metadata, types)
1891  metadata_id = metadata_ids[0]
1892  if table == Statistics:
1893  stmt = _get_last_statistics_stmt(metadata_id, number_of_stats)
1894  else:
1895  stmt = _get_last_statistics_short_term_stmt(metadata_id, number_of_stats)
1896  stats = cast(
1897  Sequence[Row], execute_stmt_lambda_element(session, stmt, orm_rows=False)
1898  )
1899 
1900  if not stats:
1901  return {}
1902 
1903  # Return statistics combined with metadata
1905  hass,
1906  stats,
1907  statistic_ids,
1908  metadata,
1909  convert_units,
1910  table,
1911  None,
1912  types,
1913  )
1914 
1915 
1917  hass: HomeAssistant,
1918  number_of_stats: int,
1919  statistic_id: str,
1920  convert_units: bool,
1921  types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
1922 ) -> dict[str, list[StatisticsRow]]:
1923  """Return the last number_of_stats statistics for a statistic_id."""
1924  return _get_last_statistics(
1925  hass, number_of_stats, statistic_id, convert_units, Statistics, types
1926  )
1927 
1928 
def get_last_short_term_statistics(
    hass: HomeAssistant,
    number_of_stats: int,
    statistic_id: str,
    convert_units: bool,
    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> dict[str, list[StatisticsRow]]:
    """Return the last number_of_stats short term statistics for a statistic_id.

    Thin wrapper around _get_last_statistics selecting the 5-minute
    StatisticsShortTerm table.
    """
    return _get_last_statistics(
        hass, number_of_stats, statistic_id, convert_units, StatisticsShortTerm, types
    )
1940 
1941 
def get_latest_short_term_statistics_by_ids(
    session: Session, ids: Iterable[int]
) -> list[Row]:
    """Return the latest short term statistics rows for a list of primary ids."""
    stmt = _latest_short_term_statistics_by_ids_stmt(ids)
    # execute_stmt_lambda_element may return an iterable; materialize it so
    # callers can extend/iterate it more than once.
    return list(
        cast(
            Sequence[Row],
            execute_stmt_lambda_element(session, stmt),
        )
    )
1953 
1954 
def _latest_short_term_statistics_by_ids_stmt(
    ids: Iterable[int],
) -> StatementLambdaElement:
    """Create the statement for finding the latest short term stat rows by id."""
    return lambda_stmt(
        lambda: select(*QUERY_STATISTICS_SHORT_TERM).filter(
            StatisticsShortTerm.id.in_(ids)
        )
    )
1964 
1965 
def get_latest_short_term_statistics_with_session(
    hass: HomeAssistant,
    session: Session,
    statistic_ids: set[str],
    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
    metadata: dict[str, tuple[int, StatisticMetaData]] | None = None,
) -> dict[str, list[StatisticsRow]]:
    """Return the latest short term statistics for a list of statistic_ids with a session."""
    # Fetch metadata for the given statistic_ids unless the caller supplied it
    if not metadata:
        metadata = get_instance(hass).statistics_meta_manager.get_many(
            session, statistic_ids=statistic_ids
        )
    if not metadata:
        return {}
    metadata_ids = set(
        _extract_metadata_and_discard_impossible_columns(metadata, types)
    )
    run_cache = get_short_term_statistics_run_cache(hass)
    # Try to find the latest short term statistics ids for the metadata_ids
    # from the run cache first if we have it. If the run cache references
    # a non-existent id because of a purge, we will detect it missing in the
    # next step and run a query to re-populate the cache.
    stats: list[Row] = []
    if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
        stats = get_latest_short_term_statistics_by_ids(
            session, metadata_id_to_id.values()
        )
    # If we are missing some metadata_ids in the run cache, we need run a query
    # to populate the cache for each metadata_id, and then run another query
    # to get the latest short term statistics for the missing metadata_ids.
    if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
        found_latest_ids := {
            latest_id
            for metadata_id in missing_metadata_ids
            if (
                latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
                    run_cache,
                    session,
                    metadata_id,
                )
            )
            is not None
        }
    ):
        stats.extend(get_latest_short_term_statistics_by_ids(session, found_latest_ids))

    if not stats:
        return {}

    # Return statistics combined with metadata
    return _sorted_statistics_to_dict(
        hass,
        stats,
        statistic_ids,
        metadata,
        False,
        StatisticsShortTerm,
        None,
        types,
    )
2027 
2028 
def _generate_statistics_at_time_stmt(
    table: type[StatisticsBase],
    metadata_ids: set[int],
    start_time_ts: float,
    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> StatementLambdaElement:
    """Create the statement for finding the statistics for a given time.

    Joins against a subquery that picks, per metadata_id, the most recent
    row strictly before start_time_ts.
    """
    stmt = _generate_select_columns_for_types_stmt(table, types)
    stmt += lambda q: q.join(
        (
            most_recent_statistic_ids := (
                select(
                    func.max(table.start_ts).label("max_start_ts"),
                    table.metadata_id.label("max_metadata_id"),
                )
                .filter(table.start_ts < start_time_ts)
                .filter(table.metadata_id.in_(metadata_ids))
                .group_by(table.metadata_id)
                .subquery()
            )
        ),
        and_(
            table.start_ts == most_recent_statistic_ids.c.max_start_ts,
            table.metadata_id == most_recent_statistic_ids.c.max_metadata_id,
        ),
    )
    return stmt
2056 
2057 
def _statistics_at_time(
    session: Session,
    metadata_ids: set[int],
    table: type[StatisticsBase],
    start_time: datetime,
    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> Sequence[Row] | None:
    """Return last known statistics, earlier than start_time, for the metadata_ids."""
    start_time_ts = start_time.timestamp()
    stmt = _generate_statistics_at_time_stmt(table, metadata_ids, start_time_ts, types)
    return cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
2069 
2070 
2072  db_rows: list[Row],
2073  table_duration_seconds: float,
2074  start_ts_idx: int,
2075  sum_idx: int,
2076  convert: Callable[[float | None], float | None] | Callable[[float], float],
2077 ) -> list[StatisticsRow]:
2078  """Build a list of sum statistics."""
2079  return [
2080  {
2081  "start": (start_ts := db_row[start_ts_idx]),
2082  "end": start_ts + table_duration_seconds,
2083  "sum": None if (v := db_row[sum_idx]) is None else convert(v),
2084  }
2085  for db_row in db_rows
2086  ]
2087 
2088 
2090  db_rows: list[Row],
2091  table_duration_seconds: float,
2092  start_ts_idx: int,
2093  sum_idx: int,
2094 ) -> list[StatisticsRow]:
2095  """Build a list of sum statistics."""
2096  return [
2097  {
2098  "start": (start_ts := db_row[start_ts_idx]),
2099  "end": start_ts + table_duration_seconds,
2100  "sum": db_row[sum_idx],
2101  }
2102  for db_row in db_rows
2103  ]
2104 
2105 
2107  db_rows: list[Row],
2108  table_duration_seconds: float,
2109  start_ts_idx: int,
2110  row_mapping: tuple[tuple[str, int], ...],
2111 ) -> list[StatisticsRow]:
2112  """Build a list of statistics without unit conversion."""
2113  return [
2114  {
2115  "start": (start_ts := db_row[start_ts_idx]),
2116  "end": start_ts + table_duration_seconds,
2117  **{key: db_row[idx] for key, idx in row_mapping}, # type: ignore[typeddict-item]
2118  }
2119  for db_row in db_rows
2120  ]
2121 
2122 
2124  db_rows: list[Row],
2125  table_duration_seconds: float,
2126  start_ts_idx: int,
2127  row_mapping: tuple[tuple[str, int], ...],
2128  convert: Callable[[float | None], float | None] | Callable[[float], float],
2129 ) -> list[StatisticsRow]:
2130  """Build a list of statistics with unit conversion."""
2131  return [
2132  {
2133  "start": (start_ts := db_row[start_ts_idx]),
2134  "end": start_ts + table_duration_seconds,
2135  **{
2136  key: None if (v := db_row[idx]) is None else convert(v) # type: ignore[typeddict-item]
2137  for key, idx in row_mapping
2138  },
2139  }
2140  for db_row in db_rows
2141  ]
2142 
2143 
def _sorted_statistics_to_dict(
    hass: HomeAssistant,
    stats: Sequence[Row[Any]],
    statistic_ids: set[str] | None,
    _metadata: dict[str, tuple[int, StatisticMetaData]],
    convert_units: bool,
    table: type[StatisticsBase],
    units: dict[str, str] | None,
    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> dict[str, list[StatisticsRow]]:
    """Convert SQL results into JSON friendly data structure.

    stats must be sorted by metadata_id (groupby only merges adjacent keys).
    """
    assert stats, "stats must not be empty"  # Guard against implementation error
    result: dict[str, list[StatisticsRow]] = defaultdict(list)
    metadata = dict(_metadata.values())
    # Identify metadata IDs for which no data was available at the requested start time
    field_map: dict[str, int] = {key: idx for idx, key in enumerate(stats[0]._fields)}
    metadata_id_idx = field_map["metadata_id"]
    start_ts_idx = field_map["start_ts"]
    stats_by_meta_id: dict[int, list[Row]] = {}
    seen_statistic_ids: set[str] = set()
    key_func = itemgetter(metadata_id_idx)
    for meta_id, group in groupby(stats, key_func):
        stats_by_meta_id[meta_id] = list(group)
        seen_statistic_ids.add(metadata[meta_id]["statistic_id"])

    # Set all statistic IDs to empty lists in result set to maintain the order
    if statistic_ids is not None:
        for stat_id in statistic_ids:
            # Only set the statistic ID if it is in the data to
            # avoid having to do a second loop to remove the
            # statistic IDs that are not in the data at the end
            if stat_id in seen_statistic_ids:
                result[stat_id] = []

    # Figure out which fields we need to extract from the SQL result
    # and which indices they have in the result so we can avoid the overhead
    # of doing a dict lookup for each row
    if "last_reset_ts" in field_map:
        field_map["last_reset"] = field_map.pop("last_reset_ts")
    sum_idx = field_map["sum"] if "sum" in types else None
    sum_only = len(types) == 1 and sum_idx is not None
    row_mapping = tuple((key, field_map[key]) for key in types if key in field_map)
    # Append all statistic entries, and optionally do unit conversion
    table_duration_seconds = table.duration.total_seconds()
    for meta_id, db_rows in stats_by_meta_id.items():
        metadata_by_id = metadata[meta_id]
        statistic_id = metadata_by_id["statistic_id"]
        if convert_units:
            state_unit = unit = metadata_by_id["unit_of_measurement"]
            if state := hass.states.get(statistic_id):
                state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
            convert = _get_statistic_to_display_unit_converter(
                unit, state_unit, units, allow_none=False
            )
        else:
            convert = None

        build_args = (db_rows, table_duration_seconds, start_ts_idx)
        if sum_only:
            # This function is extremely flexible and can handle all types of
            # statistics, but in practice we only ever use a few combinations.
            #
            # For energy, we only need sum statistics, so we can optimize
            # this path to avoid the overhead of the more generic function.
            assert sum_idx is not None
            if convert:
                _stats = _build_sum_converted_stats(*build_args, sum_idx, convert)
            else:
                _stats = _build_sum_stats(*build_args, sum_idx)
        elif convert:
            _stats = _build_converted_stats(*build_args, row_mapping, convert)
        else:
            _stats = _build_stats(*build_args, row_mapping)

        result[statistic_id] = _stats

    return result
2221 
2222 
def validate_statistics(hass: HomeAssistant) -> dict[str, list[ValidationIssue]]:
    """Validate statistics."""
    issues: dict[str, list[ValidationIssue]] = {}
    platforms = hass.data[DOMAIN].recorder_platforms.values()
    for platform in platforms:
        # Platforms opt in by exposing a validate-statistics hook.
        validator = getattr(platform, INTEGRATION_PLATFORM_VALIDATE_STATISTICS, None)
        if validator:
            issues.update(validator(hass))
    return issues
2232 
2233 
def update_statistics_issues(hass: HomeAssistant) -> None:
    """Update statistics issues."""
    with session_scope(hass=hass, read_only=True) as session:
        platforms = hass.data[DOMAIN].recorder_platforms.values()
        for platform in platforms:
            # Platforms opt in by exposing an update-statistics-issues hook.
            updater = getattr(
                platform, INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES, None
            )
            if updater:
                updater(hass, session)
2242 
2243 
def _statistics_exists(
    session: Session,
    table: type[StatisticsBase],
    metadata_id: int,
    start: datetime,
) -> int | None:
    """Return id if a statistics entry already exists, else None."""
    start_ts = start.timestamp()
    result = (
        session.query(table.id)
        .filter((table.metadata_id == metadata_id) & (table.start_ts == start_ts))
        .first()
    )
    return result.id if result else None
2258 
2259 
@callback
def _async_import_statistics(
    hass: HomeAssistant,
    metadata: StatisticMetaData,
    statistics: Iterable[StatisticData],
) -> None:
    """Validate timestamps and insert an import_statistics job in the queue.

    Raises HomeAssistantError if any timestamp is naive or not aligned to the
    top of the hour.
    """
    # NOTE(review): statistics is iterated here and again by the recorder job;
    # a one-shot generator would arrive empty — callers appear to pass lists.
    for statistic in statistics:
        start = statistic["start"]
        if start.tzinfo is None or start.tzinfo.utcoffset(start) is None:
            raise HomeAssistantError(
                "Naive timestamp: no or invalid timezone info provided"
            )
        if start.minute != 0 or start.second != 0 or start.microsecond != 0:
            raise HomeAssistantError(
                "Invalid timestamp: timestamps must be from the top of the hour (minutes and seconds = 0)"
            )

        statistic["start"] = dt_util.as_utc(start)

        if "last_reset" in statistic and statistic["last_reset"] is not None:
            last_reset = statistic["last_reset"]
            if (
                last_reset.tzinfo is None
                or last_reset.tzinfo.utcoffset(last_reset) is None
            ):
                raise HomeAssistantError("Naive timestamp")
            statistic["last_reset"] = dt_util.as_utc(last_reset)

    # Insert job in recorder's queue
    get_instance(hass).async_import_statistics(metadata, statistics, Statistics)
2291 
2292 
@callback
def async_import_statistics(
    hass: HomeAssistant,
    metadata: StatisticMetaData,
    statistics: Iterable[StatisticData],
) -> None:
    """Import hourly statistics from an internal source.

    This inserts an import_statistics job in the recorder's queue.
    """
    if not valid_entity_id(metadata["statistic_id"]):
        raise HomeAssistantError("Invalid statistic_id")

    # The source must not be empty and must be aligned with the statistic_id
    if not metadata["source"] or metadata["source"] != DOMAIN:
        raise HomeAssistantError("Invalid source")

    _async_import_statistics(hass, metadata, statistics)
2311 
2312 
@callback
def async_add_external_statistics(
    hass: HomeAssistant,
    metadata: StatisticMetaData,
    statistics: Iterable[StatisticData],
) -> None:
    """Add hourly statistics from an external source.

    This inserts an import_statistics job in the recorder's queue.
    """
    # The statistic_id has same limitations as an entity_id, but with a ':' as separator
    if not valid_statistic_id(metadata["statistic_id"]):
        raise HomeAssistantError("Invalid statistic_id")

    # The source must not be empty and must be aligned with the statistic_id
    domain, _object_id = split_statistic_id(metadata["statistic_id"])
    if not metadata["source"] or metadata["source"] != domain:
        raise HomeAssistantError("Invalid source")

    _async_import_statistics(hass, metadata, statistics)
2333 
2334 
def _import_statistics_with_session(
    instance: Recorder,
    session: Session,
    metadata: StatisticMetaData,
    statistics: Iterable[StatisticData],
    table: type[StatisticsBase],
) -> bool:
    """Import statistics to the database, updating rows that already exist."""
    statistics_meta_manager = instance.statistics_meta_manager
    old_metadata_dict = statistics_meta_manager.get_many(
        session, statistic_ids={metadata["statistic_id"]}
    )
    _, metadata_id = statistics_meta_manager.update_or_add(
        session, metadata, old_metadata_dict
    )
    for stat in statistics:
        if stat_id := _statistics_exists(session, table, metadata_id, stat["start"]):
            _update_statistics(session, table, stat_id, stat)
        else:
            _insert_statistics(session, table, metadata_id, stat)

    if table != StatisticsShortTerm:
        return True

    # We just inserted new short term statistics, so we need to update the
    # ShortTermStatisticsRunCache with the latest id for the metadata_id
    run_cache = get_short_term_statistics_run_cache(instance.hass)
    cache_latest_short_term_statistic_id_for_metadata_id(
        run_cache, session, metadata_id
    )

    return True
2367 
2368 
@singleton(DATA_SHORT_TERM_STATISTICS_RUN_CACHE)
def get_short_term_statistics_run_cache(
    hass: HomeAssistant,
) -> ShortTermStatisticsRunCache:
    """Get the short term statistics run cache (one instance per hass)."""
    return ShortTermStatisticsRunCache()
2375 
2376 
def cache_latest_short_term_statistic_id_for_metadata_id(
    run_cache: ShortTermStatisticsRunCache,
    session: Session,
    metadata_id: int,
) -> int | None:
    """Cache the latest short term statistic for a given metadata_id.

    Returns the id of the latest short term statistic for the metadata_id
    that was added to the cache, or None if no latest short term statistic
    was found for the metadata_id.
    """
    if latest := cast(
        Sequence[Row],
        execute_stmt_lambda_element(
            session,
            _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id),
        ),
    ):
        id_: int = latest[0].id
        run_cache.set_latest_id_for_metadata_id(metadata_id, id_)
        return id_
    return None
2398 
2399 
def _find_latest_short_term_statistic_for_metadata_id_stmt(
    metadata_id: int,
) -> StatementLambdaElement:
    """Create a statement to find the latest short term statistics for a metadata_id."""
    #
    # This code only looks up one row, and should not be refactored to
    # lookup multiple using func.max
    # or similar, as that will cause the query to be significantly slower
    # for DBMs such as PostgreSQL that will have to do a full scan
    #
    # For PostgreSQL a combined query plan looks like:
    # (actual time=2.218..893.909 rows=170531 loops=1)
    #
    # For PostgreSQL a separate query plan looks like:
    # (actual time=0.301..0.301 rows=1 loops=1)
    #
    #
    return lambda_stmt(
        lambda: select(
            StatisticsShortTerm.id,
        )
        .where(StatisticsShortTerm.metadata_id == metadata_id)
        .order_by(StatisticsShortTerm.start_ts.desc())
        .limit(1)
    )
2425 
2426 
@retryable_database_job("statistics")
def import_statistics(
    instance: Recorder,
    metadata: StatisticMetaData,
    statistics: Iterable[StatisticData],
    table: type[StatisticsBase],
) -> bool:
    """Process an import_statistics job.

    Retries on transient database errors via retryable_database_job.
    """
    with session_scope(
        session=instance.get_session(),
        exception_filter=_filter_unique_constraint_integrity_error(
            instance, "statistic"
        ),
    ) as session:
        return _import_statistics_with_session(
            instance, session, metadata, statistics, table
        )
2445 
2446 
@retryable_database_job("adjust_statistics")
def adjust_statistics(
    instance: Recorder,
    statistic_id: str,
    start_time: datetime,
    sum_adjustment: float,
    adjustment_unit: str,
) -> bool:
    """Process an add_statistics job.

    Applies sum_adjustment (converted from adjustment_unit to the statistic's
    unit) to both the short term and long term tables from start_time on.
    """
    with session_scope(session=instance.get_session()) as session:
        metadata = instance.statistics_meta_manager.get_many(
            session, statistic_ids={statistic_id}
        )
        if statistic_id not in metadata:
            # Nothing to adjust; treat as done so the job is not retried.
            return True

        statistic_unit = metadata[statistic_id][1]["unit_of_measurement"]
        if convert := _get_display_to_statistic_unit_converter(
            adjustment_unit, statistic_unit
        ):
            sum_adjustment = convert(sum_adjustment)

        _adjust_sum_statistics(
            session,
            StatisticsShortTerm,
            metadata[statistic_id][0],
            start_time,
            sum_adjustment,
        )

        _adjust_sum_statistics(
            session,
            Statistics,
            metadata[statistic_id][0],
            # Long term statistics are hourly; snap to the top of the hour.
            start_time.replace(minute=0),
            sum_adjustment,
        )

        return True
2487 
2488 
def _change_statistics_unit_for_table(
    session: Session,
    table: type[StatisticsBase],
    metadata_id: int,
    convert: Callable[[float | None], float | None],
) -> None:
    """Convert all statistics rows of a table for a metadata_id to a new unit."""
    columns = (table.id, table.mean, table.min, table.max, table.state, table.sum)
    query = session.query(*columns).filter_by(metadata_id=bindparam("metadata_id"))
    rows = execute(query.params(metadata_id=metadata_id))
    for row in rows:
        session.query(table).filter(table.id == row.id).update(
            {
                table.mean: convert(row.mean),
                table.min: convert(row.min),
                table.max: convert(row.max),
                table.state: convert(row.state),
                table.sum: convert(row.sum),
            },
            # Bulk update; no ORM objects are loaded that would need syncing.
            synchronize_session=False,
        )
2510 
2511 
def change_statistics_unit(
    instance: Recorder,
    statistic_id: str,
    new_unit: str,
    old_unit: str,
) -> None:
    """Change statistics unit for a statistic_id."""
    statistics_meta_manager = instance.statistics_meta_manager
    with session_scope(session=instance.get_session()) as session:
        metadata = statistics_meta_manager.get(session, statistic_id)

        # Guard against the statistics being removed or updated before the
        # change_statistics_unit job executes
        if (
            metadata is None
            or metadata[1]["source"] != DOMAIN
            or metadata[1]["unit_of_measurement"] != old_unit
        ):
            _LOGGER.warning("Could not change statistics unit for %s", statistic_id)
            return

        metadata_id = metadata[0]

        if not (convert := _get_unit_converter(old_unit, new_unit)):
            _LOGGER.warning(
                "Statistics unit of measurement for %s is already %s",
                statistic_id,
                new_unit,
            )
            return

        tables: tuple[type[StatisticsBase], ...] = (
            Statistics,
            StatisticsShortTerm,
        )
        for table in tables:
            _change_statistics_unit_for_table(session, table, metadata_id, convert)

        statistics_meta_manager.update_unit_of_measurement(
            session, statistic_id, new_unit
        )
2553 
2554 
@callback
def async_change_statistics_unit(
    hass: HomeAssistant,
    statistic_id: str,
    *,
    new_unit_of_measurement: str,
    old_unit_of_measurement: str,
) -> None:
    """Change statistics unit for a statistic_id.

    Raises HomeAssistantError if no converter exists between the units.
    """
    if not can_convert_units(old_unit_of_measurement, new_unit_of_measurement):
        raise HomeAssistantError(
            f"Can't convert {old_unit_of_measurement} to {new_unit_of_measurement}"
        )

    get_instance(hass).async_change_statistics_unit(
        statistic_id,
        new_unit_of_measurement=new_unit_of_measurement,
        old_unit_of_measurement=old_unit_of_measurement,
    )
2574 
2575 
def cleanup_statistics_timestamp_migration(instance: Recorder) -> bool:
    """Clean up the statistics migration from timestamp to datetime.

    Returns False if there are more rows to update.
    Returns True if all rows have been updated.
    """
    engine = instance.engine
    assert engine is not None
    if engine.dialect.name == SupportedDialect.SQLITE:
        # SQLite: clear the legacy columns in a single statement per table.
        for table in STATISTICS_TABLES:
            with session_scope(session=instance.get_session()) as session:
                session.connection().execute(
                    text(
                        f"update {table} set start = NULL, created = NULL, last_reset = NULL;"  # noqa: S608
                    )
                )
    elif engine.dialect.name == SupportedDialect.MYSQL:
        # MySQL: clear in batches of 100000 rows; a non-zero rowcount means
        # there may be more rows left, so signal the caller to run again.
        for table in STATISTICS_TABLES:
            with session_scope(session=instance.get_session()) as session:
                if (
                    session.connection()
                    .execute(
                        text(
                            f"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL where start is not NULL LIMIT 100000;"  # noqa: S608
                        )
                    )
                    .rowcount
                ):
                    # We have more rows to update so return False
                    # to indicate we need to run again
                    return False
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
        # PostgreSQL: UPDATE has no LIMIT clause, so batch via an id subselect.
        for table in STATISTICS_TABLES:
            with session_scope(session=instance.get_session()) as session:
                if (
                    session.connection()
                    .execute(
                        text(
                            f"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL "  # noqa: S608
                            f"where id in (select id from {table} where start is not NULL LIMIT 100000)"
                        )
                    )
                    .rowcount
                ):
                    # We have more rows to update so return False
                    # to indicate we need to run again
                    return False

    # Deferred import to avoid a circular import at module load time.
    from .migration import _drop_index  # pylint: disable=import-outside-toplevel

    # The legacy start column is cleared everywhere; its index is now unused.
    for table in STATISTICS_TABLES:
        _drop_index(instance.get_session, table, f"ix_{table}_start")
    # We have no more rows to update so return True
    # to indicate we are done
    return True
None set_latest_ids_for_metadata_ids(self, dict[int, int] metadata_id_to_id)
Definition: statistics.py:204
dict[int, int] get_latest_ids(self, set[int] metadata_ids)
Definition: statistics.py:190
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
def execute(hass, filename, source, data=None, return_response=False)
Definition: __init__.py:194
None _drop_index(Callable[[], Session] session_maker, str table_name, str index_name, bool|None quiet=None)
Definition: migration.py:484
float|None datetime_to_timestamp_or_none(datetime|None dt)
Definition: time.py:55
None change_statistics_unit(Recorder instance, str statistic_id, str new_unit, str old_unit)
Definition: statistics.py:2517
StatementLambdaElement _get_last_statistics_stmt(int metadata_id, int number_of_stats)
Definition: statistics.py:1847
None update_statistics_issues(HomeAssistant hass)
Definition: statistics.py:2234
StatementLambdaElement _get_last_statistics_short_term_stmt(int metadata_id, int number_of_stats)
Definition: statistics.py:1860
( tuple[ Callable[[float, float], bool], Callable[[float], tuple[float, float]],]) reduce_week_ts_factory()
Definition: statistics.py:1023
list[StatisticsRow] _build_sum_converted_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, int sum_idx, Callable[[float|None], float|None]|Callable[[float], float] convert)
Definition: statistics.py:2077
dict[str, list[StatisticsRow]] statistics_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1825
StatementLambdaElement _generate_select_columns_for_types_stmt(type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1621
None _compile_hourly_statistics(Session session, datetime start)
Definition: statistics.py:398
datetime|None _last_statistic(Session session, type[StatisticsBase] table, int metadata_id)
Definition: statistics.py:1288
dict[str, tuple[int, StatisticMetaData]] get_metadata(HomeAssistant hass, *set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
Definition: statistics.py:741
dict[str, list[ValidationIssue]] validate_statistics(HomeAssistant hass)
Definition: statistics.py:2223
dict[str, list[StatisticsRow]] _reduce_statistics_per_week(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1060
list[dict] _flatten_list_statistic_ids_metadata_result(dict[str, dict[str, Any]] result)
Definition: statistics.py:837
str|None get_display_unit(HomeAssistant hass, str statistic_id, str|None statistic_unit)
Definition: statistics.py:232
None clear_statistics(Recorder instance, list[str] statistic_ids)
Definition: statistics.py:753
dict[str, Any] statistic_during_period(HomeAssistant hass, datetime|None start_time, datetime|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str]|None units)
Definition: statistics.py:1463
bool _import_statistics_with_session(Recorder instance, Session session, StatisticMetaData metadata, Iterable[StatisticData] statistics, type[StatisticsBase] table)
Definition: statistics.py:2341
bool compile_statistics(Recorder instance, datetime start, bool fire_events)
Definition: statistics.py:497
dict[str, list[StatisticsRow]] _reduce_statistics_per_day(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1010
Callable[[float|None], float|None]|Callable[[float], float]|None _get_statistic_to_display_unit_converter(str|None statistic_unit, str|None state_unit, dict[str, str]|None requested_units, bool allow_none=True)
Definition: statistics.py:254
Sequence[Row]|None _statistics_at_time(Session session, set[int] metadata_ids, type[StatisticsBase] table, datetime start_time, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:2064
Callable[[float], float]|None _get_display_to_statistic_unit_converter(str|None display_unit, str|None statistic_unit)
Definition: statistics.py:283
StatementLambdaElement _find_latest_short_term_statistic_for_metadata_id_stmt(int metadata_id)
Definition: statistics.py:2402
StatementLambdaElement _compile_hourly_statistics_last_sum_stmt(float start_time_ts, float end_time_ts)
Definition: statistics.py:382
None async_change_statistics_unit(HomeAssistant hass, str statistic_id, *str new_unit_of_measurement, str old_unit_of_measurement)
Definition: statistics.py:2562
StatementLambdaElement _generate_statistics_during_period_stmt(datetime start_time, datetime|None end_time, list[int]|None metadata_ids, type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1129
dict[str, list[StatisticsRow]] get_last_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1922
dict[str, list[StatisticsRow]] _reduce_statistics(dict[str, list[StatisticsRow]] stats, Callable[[float, float], bool] same_period, Callable[[float], tuple[float, float]] period_start_end, timedelta period, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:916
float|None _get_oldest_sum_statistic(Session session, datetime|None head_start_time, datetime|None main_start_time, datetime|None tail_start_time, datetime|None oldest_stat, datetime|None oldest_5_min_stat, bool tail_only, int metadata_id)
Definition: statistics.py:1310
StatementLambdaElement _latest_short_term_statistics_by_ids_stmt(Iterable[int] ids)
Definition: statistics.py:1957
StatementLambdaElement _generate_max_mean_min_statistic_in_sub_period_stmt(Select columns, datetime|None start_time, datetime|None end_time, type[StatisticsBase] table, int metadata_id)
Definition: statistics.py:1152
StatementLambdaElement _get_first_id_stmt(datetime start)
Definition: statistics.py:530
int|None cache_latest_short_term_statistic_id_for_metadata_id(ShortTermStatisticsRunCache run_cache, Session session, int metadata_id)
Definition: statistics.py:2381
None update_statistics_metadata(Recorder instance, str statistic_id, str|None|UndefinedType new_statistic_id, str|None|UndefinedType new_unit_of_measurement)
Definition: statistics.py:764
datetime|None _first_statistic(Session session, type[StatisticsBase] table, int metadata_id)
Definition: statistics.py:1271
Callable[[float|None], float|None]|None _get_unit_converter(str from_unit, str to_unit)
Definition: statistics.py:295
float|None mean(list[float] values)
Definition: statistics.py:168
StatisticsBase|None _insert_statistics(Session session, type[StatisticsBase] table, int metadata_id, StatisticData statistic)
Definition: statistics.py:669
None _async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2265
float|None _get_newest_sum_statistic(Session session, datetime|None head_start_time, datetime|None head_end_time, datetime|None main_start_time, datetime|None main_end_time, datetime|None tail_start_time, datetime|None tail_end_time, bool tail_only, int metadata_id)
Definition: statistics.py:1402
bool compile_missing_statistics(Recorder instance)
Definition: statistics.py:456
dict[str, list[StatisticsRow]] _sorted_statistics_to_dict(HomeAssistant hass, Sequence[Row[Any]] stats, set[str]|None statistic_ids, dict[str, tuple[int, StatisticMetaData]] _metadata, bool convert_units, type[StatisticsBase] table, dict[str, str]|None units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:2153
list[StatisticsRow] _build_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, tuple[tuple[str, int],...] row_mapping)
Definition: statistics.py:2111
bool cleanup_statistics_timestamp_migration(Recorder instance)
Definition: statistics.py:2576
None _update_statistics(Session session, type[StatisticsBase] table, int stat_id, StatisticData statistic)
Definition: statistics.py:689
dict[str, list[StatisticsRow]] get_last_short_term_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1935
( tuple[ Callable[[float, float], bool], Callable[[float], tuple[float, float]],]) reduce_day_ts_factory()
Definition: statistics.py:974
list[dict] list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
Definition: statistics.py:858
None _get_max_mean_min_statistic_in_sub_period(Session session, dict[str, float] result, datetime|None start_time, datetime|None end_time, type[StatisticsBase] table, set[Literal["max", "mean", "min", "change"]] types, int metadata_id)
Definition: statistics.py:1171
dict[str, tuple[int, StatisticMetaData]] get_metadata_with_session(Recorder instance, Session session, *set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
Definition: statistics.py:720
None _augment_result_with_change(HomeAssistant hass, Session session, datetime start_time, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] _types, type[Statistics|StatisticsShortTerm] table, dict[str, tuple[int, StatisticMetaData]] metadata, dict[str, list[StatisticsRow]] result)
Definition: statistics.py:1666
list[StatisticsRow] _build_sum_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, int sum_idx)
Definition: statistics.py:2094
dict[str, list[StatisticsRow]] _statistics_during_period_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] _types)
Definition: statistics.py:1717
None async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2298
list[Row] get_latest_short_term_statistics_by_ids(Session session, Iterable[int] ids)
Definition: statistics.py:1944
bool can_convert_units(str|None from_unit, str|None to_unit)
Definition: statistics.py:307
StatementLambdaElement _compile_hourly_statistics_summary_mean_stmt(float start_time_ts, float end_time_ts)
Definition: statistics.py:369
None _adjust_sum_statistics(Session session, type[StatisticsBase] table, int metadata_id, datetime start_time, float adj)
Definition: statistics.py:645
bool adjust_statistics(Recorder instance, str statistic_id, datetime start_time, float sum_adjustment, str adjustment_unit)
Definition: statistics.py:2454
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2318
int|None _statistics_exists(Session session, type[StatisticsBase] table, int metadata_id, datetime start)
Definition: statistics.py:2249
ShortTermStatisticsRunCache get_short_term_statistics_run_cache(HomeAssistant hass)
Definition: statistics.py:2372
dict[str, list[StatisticsRow]] _get_last_statistics(HomeAssistant hass, int number_of_stats, str statistic_id, bool convert_units, type[StatisticsBase] table, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1880
StatementLambdaElement _generate_statistics_at_time_stmt(type[StatisticsBase] table, set[int] metadata_ids, float start_time_ts, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:2034
datetime _find_month_end_time(datetime timestamp)
Definition: statistics.py:1068
( tuple[ Callable[[float, float], bool], Callable[[float], tuple[float, float]],]) reduce_month_ts_factory()
Definition: statistics.py:1081
dict[str, list[StatisticsRow]] get_latest_short_term_statistics_with_session(HomeAssistant hass, Session session, set[str] statistic_ids, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types, dict[str, tuple[int, StatisticMetaData]]|None metadata=None)
Definition: statistics.py:1972
set[str] _compile_statistics(Recorder instance, Session session, datetime start, bool fire_events)
Definition: statistics.py:537
list[dict] async_list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
Definition: statistics.py:788
dict[str, dict[str, Any]] _statistic_by_id_from_metadata(HomeAssistant hass, dict[str, tuple[int, StatisticMetaData]] metadata)
Definition: statistics.py:817
dict[str, list[StatisticsRow]] _reduce_statistics_per_month(dict[str, list[StatisticsRow]] stats, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1115
list[int] _extract_metadata_and_discard_impossible_columns(dict[str, tuple[int, StatisticMetaData]] metadata, set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1638
list[str] split_statistic_id(str entity_id)
Definition: statistics.py:323
dict[str, float|None] _get_max_mean_min_statistic(Session session, datetime|None head_start_time, datetime|None head_end_time, datetime|None main_start_time, datetime|None main_end_time, datetime|None tail_start_time, datetime|None tail_end_time, bool tail_only, int metadata_id, set[Literal["max", "mean", "min", "change"]] types)
Definition: statistics.py:1212
list[StatisticsRow] _build_converted_stats(list[Row] db_rows, float table_duration_seconds, int start_ts_idx, tuple[tuple[str, int],...] row_mapping, Callable[[float|None], float|None]|Callable[[float], float] convert)
Definition: statistics.py:2129
bool import_statistics(Recorder instance, StatisticMetaData metadata, Iterable[StatisticData] statistics, type[StatisticsBase] table)
Definition: statistics.py:2433
None _change_statistics_unit_for_table(Session session, type[StatisticsBase] table, int metadata_id, Callable[[float|None], float|None] convert)
Definition: statistics.py:2494
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
Callable[[Exception], bool] filter_unique_constraint_integrity_error(Recorder instance, str row_type)
Definition: util.py:948
bool valid_entity_id(str entity_id)
Definition: core.py:235
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86