Home Assistant Unofficial Reference 2024.12.1
states.py
Go to the documentation of this file.
1 """Support managing States."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Sequence
6 from typing import Any, cast
7 
8 from sqlalchemy.engine.row import Row
9 from sqlalchemy.orm.session import Session
10 
11 from ..db_schema import States
12 from ..queries import find_oldest_state
13 from ..util import execute_stmt_lambda_element
14 
15 
17  """Manage the states table."""
18 
19  def __init__(self) -> None:
20  """Initialize the states manager for linking old_state_id."""
21  self._pending: dict[str, States] = {}
22  self._last_committed_id: dict[str, int] = {}
23  self._last_reported: dict[int, float] = {}
24  self._oldest_ts_oldest_ts: float | None = None
25 
26  @property
27  def oldest_ts(self) -> float | None:
28  """Return the oldest timestamp."""
29  return self._oldest_ts_oldest_ts
30 
31  def pop_pending(self, entity_id: str) -> States | None:
32  """Pop a pending state.
33 
34  Pending states are states that are in the session but not yet committed.
35 
36  This call is not thread-safe and must be called from the
37  recorder thread.
38  """
39  return self._pending.pop(entity_id, None)
40 
41  def pop_committed(self, entity_id: str) -> int | None:
42  """Pop a committed state.
43 
44  Committed states are states that have already been committed to the
45  database.
46 
47  This call is not thread-safe and must be called from the
48  recorder thread.
49  """
50  return self._last_committed_id.pop(entity_id, None)
51 
52  def add_pending(self, entity_id: str, state: States) -> None:
53  """Add a pending state.
54 
55  Pending states are states that are in the session but not yet committed.
56 
57  This call is not thread-safe and must be called from the
58  recorder thread.
59  """
60  self._pending[entity_id] = state
61  if self._oldest_ts_oldest_ts is None:
62  self._oldest_ts_oldest_ts = state.last_updated_ts
63 
65  self, state_id: int, last_reported_timestamp: float
66  ) -> None:
67  """Update the last reported timestamp for a state."""
68  self._last_reported[state_id] = last_reported_timestamp
69 
70  def get_pending_last_reported_timestamp(self) -> dict[int, float]:
71  """Return the last reported timestamp for all pending states."""
72  return self._last_reported
73 
74  def post_commit_pending(self) -> None:
75  """Call after commit to load the state_id of the new States into committed.
76 
77  This call is not thread-safe and must be called from the
78  recorder thread.
79  """
80  for entity_id, db_states in self._pending.items():
81  self._last_committed_id[entity_id] = db_states.state_id
82  self._pending.clear()
83  self._last_reported.clear()
84 
85  def reset(self) -> None:
86  """Reset after the database has been reset or changed.
87 
88  This call is not thread-safe and must be called from the
89  recorder thread.
90  """
91  self._last_committed_id.clear()
92  self._pending.clear()
93  self._oldest_ts_oldest_ts = None
94 
95  def load_from_db(self, session: Session) -> None:
96  """Update the cache.
97 
98  Must run in the recorder thread.
99  """
100  result = cast(
101  Sequence[Row[Any]],
103  )
104  if not result:
105  ts = None
106  else:
107  ts = result[0].last_updated_ts
108  self._oldest_ts_oldest_ts = ts
109 
110  def evict_purged_state_ids(self, purged_state_ids: set[int]) -> None:
111  """Evict purged states from the committed states.
112 
113  When we purge states we need to make sure the next call to record a state
114  does not link the old_state_id to the purged state.
115  """
116  # Make a map from the committed state_id to the entity_id
117  last_committed_ids = self._last_committed_id
118  last_committed_ids_reversed = {
119  state_id: entity_id for entity_id, state_id in last_committed_ids.items()
120  }
121 
122  # Evict any purged state from the old states cache
123  for purged_state_id in purged_state_ids.intersection(
124  last_committed_ids_reversed
125  ):
126  last_committed_ids.pop(last_committed_ids_reversed[purged_state_id], None)
127 
128  def evict_purged_entity_ids(self, purged_entity_ids: set[str]) -> None:
129  """Evict purged entity_ids from the committed states.
130 
131  When we purge states we need to make sure the next call to record a state
132  does not link the old_state_id to the purged state.
133  """
134  last_committed_ids = self._last_committed_id
135  for entity_id in purged_entity_ids:
136  last_committed_ids.pop(entity_id, None)
None add_pending(self, str entity_id, States state)
Definition: states.py:52
None evict_purged_entity_ids(self, set[str] purged_entity_ids)
Definition: states.py:128
None update_pending_last_reported(self, int state_id, float last_reported_timestamp)
Definition: states.py:66
None evict_purged_state_ids(self, set[int] purged_state_ids)
Definition: states.py:110
StatementLambdaElement find_oldest_state()
Definition: queries.py:640
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179