Home Assistant Unofficial Reference 2024.12.1
recorder_runs.py
Go to the documentation of this file.
1 """Track recorder run history."""
2 
3 from __future__ import annotations
4 
5 import bisect
6 from dataclasses import dataclass
7 from datetime import datetime
8 
9 from sqlalchemy.orm.session import Session
10 
11 import homeassistant.util.dt as dt_util
12 
13 from ..db_schema import RecorderRuns
14 from ..models import process_timestamp
15 
16 
18  run_history: _RecorderRunsHistory, start: datetime
19 ) -> RecorderRuns | None:
20  """Find the recorder run for a start time in _RecorderRunsHistory."""
21  run_timestamps = run_history.run_timestamps
22  runs_by_timestamp = run_history.runs_by_timestamp
23 
24  # bisect_left tells us were we would insert
25  # a value in the list of runs after the start timestamp.
26  #
27  # The run before that (idx-1) is when the run started
28  #
29  # If idx is 0, history never ran before the start timestamp
30  #
31  if idx := bisect.bisect_left(run_timestamps, start.timestamp()):
32  return runs_by_timestamp[run_timestamps[idx - 1]]
33  return None
34 
35 
36 @dataclass(frozen=True)
38  """Bisectable history of RecorderRuns."""
39 
40  run_timestamps: list[int]
41  runs_by_timestamp: dict[int, RecorderRuns]
42 
43 
class RecorderRunsManager:
    """Track recorder run history."""

    def __init__(self) -> None:
        """Track recorder run history."""
        # Time this manager started (or was last reset); used as the start
        # of the current run.
        self._recording_start = dt_util.utcnow()
        # The run created by start(); None until start() is called and
        # again after reset()/clear().
        self._current_run_info: RecorderRuns | None = None
        # Snapshot of past runs, replaced wholesale by load_from_db().
        self._run_history = _RecorderRunsHistory([], {})

    @property
    def recording_start(self) -> datetime:
        """Return the time the recorder started recording states."""
        return self._recording_start

    @property
    def first(self) -> RecorderRuns:
        """Get the first run.

        Returns the first entry of the loaded history, or the current
        run when no history has been loaded.
        """
        if runs_by_timestamp := self._run_history.runs_by_timestamp:
            return next(iter(runs_by_timestamp.values()))
        return self.current

    @property
    def current(self) -> RecorderRuns:
        """Get the current run."""
        # If start has not been called yet because the recorder is
        # still starting up we want history to use the current time
        # as the created time to ensure we can still return results
        # and we do not try to pull data from the previous run.
        return self._current_run_info or RecorderRuns(
            start=self.recording_start, created=dt_util.utcnow()
        )

    @property
    def active(self) -> bool:
        """Return if a run is active."""
        return self._current_run_info is not None

    def get(self, start: datetime) -> RecorderRuns | None:
        """Return the recorder run that started before or at start.

        If the first run started after the start, return None
        """
        if start >= self.recording_start:
            return self.current
        return _find_recorder_run_for_start_time(self._run_history, start)

    def start(self, session: Session) -> None:
        """Start a new run.

        Must run in the recorder thread.
        """
        self._current_run_info = RecorderRuns(
            start=self.recording_start, created=dt_util.utcnow()
        )
        session.add(self._current_run_info)
        session.flush()
        # Detach the run from the session before the history is reloaded.
        session.expunge(self._current_run_info)
        self.load_from_db(session)

    def reset(self) -> None:
        """Reset the run when the database is changed or fails.

        Must run in the recorder thread.
        """
        self._recording_start = dt_util.utcnow()
        self._current_run_info = None

    def end(self, session: Session) -> None:
        """End the current run.

        Must run in the recorder thread.
        """
        assert self._current_run_info is not None
        self._current_run_info.end = dt_util.utcnow()
        session.add(self._current_run_info)

    def load_from_db(self, session: Session) -> None:
        """Update the run cache.

        Must run in the recorder thread.
        """
        run_timestamps: list[int] = []
        runs_by_timestamp: dict[int, RecorderRuns] = {}

        query = session.query(RecorderRuns).order_by(RecorderRuns.start.asc())
        for run in query.all():
            session.expunge(run)
            if run_dt := process_timestamp(run.start):
                # Not sure if this is correct or runs_by_timestamp annotation should be changed
                timestamp = int(run_dt.timestamp())
                run_timestamps.append(timestamp)
                runs_by_timestamp[timestamp] = run

        #
        # self._run_history is accessed in get()
        # which is allowed to be called from any thread
        #
        # We use a dataclass to ensure that when we update
        # run_timestamps and runs_by_timestamp
        # are never out of sync with each other.
        #
        self._run_history = _RecorderRunsHistory(run_timestamps, runs_by_timestamp)

    def clear(self) -> None:
        """Clear the current run after ending it.

        Must run in the recorder thread.
        """
        if self._current_run_info:
            self._current_run_info = None
RecorderRuns|None _find_recorder_run_for_start_time(_RecorderRunsHistory run_history, datetime start)