1 """Support managing States."""
3 from __future__
import annotations
5 from collections.abc
import Sequence
6 from typing
import Any, cast
8 from sqlalchemy.engine.row
import Row
9 from sqlalchemy.orm.session
import Session
11 from ..db_schema
import States
12 from ..queries
import find_oldest_state
13 from ..util
import execute_stmt_lambda_element
17 """Manage the states table."""
20 """Initialize the states manager for linking old_state_id."""
21 self._pending: dict[str, States] = {}
22 self._last_committed_id: dict[str, int] = {}
23 self._last_reported: dict[int, float] = {}
28 """Return the oldest timestamp."""
32 """Pop a pending state.
34 Pending states are states that are in the session but not yet committed.
36 This call is not thread-safe and must be called from the
39 return self._pending.pop(entity_id,
None)
42 """Pop a committed state.
44 Committed states are states that have already been committed to the
47 This call is not thread-safe and must be called from the
50 return self._last_committed_id.pop(entity_id,
None)
53 """Add a pending state.
55 Pending states are states that are in the session but not yet committed.
57 This call is not thread-safe and must be called from the
60 self._pending[entity_id] = state
65 self, state_id: int, last_reported_timestamp: float
67 """Update the last reported timestamp for a state."""
68 self._last_reported[state_id] = last_reported_timestamp
71 """Return the last reported timestamp for all pending states."""
72 return self._last_reported
75 """Call after commit to load the state_id of the new States into committed.
77 This call is not thread-safe and must be called from the
80 for entity_id, db_states
in self._pending.items():
81 self._last_committed_id[entity_id] = db_states.state_id
83 self._last_reported.clear()
86 """Reset after the database has been reset or changed.
88 This call is not thread-safe and must be called from the
91 self._last_committed_id.clear()
98 Must run in the recorder thread.
107 ts = result[0].last_updated_ts
111 """Evict purged states from the committed states.
113 When we purge states we need to make sure the next call to record a state
114 does not link the old_state_id to the purged state.
117 last_committed_ids = self._last_committed_id
118 last_committed_ids_reversed = {
119 state_id: entity_id
for entity_id, state_id
in last_committed_ids.items()
123 for purged_state_id
in purged_state_ids.intersection(
124 last_committed_ids_reversed
126 last_committed_ids.pop(last_committed_ids_reversed[purged_state_id],
None)
129 """Evict purged entity_ids from the committed states.
131 When we purge states we need to make sure the next call to record a state
132 does not link the old_state_id to the purged state.
134 last_committed_ids = self._last_committed_id
135 for entity_id
in purged_entity_ids:
136 last_committed_ids.pop(entity_id,
None)
None add_pending(self, str entity_id, States state)
None evict_purged_entity_ids(self, set[str] purged_entity_ids)
float|None oldest_ts(self)
int|None pop_committed(self, str entity_id)
None load_from_db(self, Session session)
dict[int, float] get_pending_last_reported_timestamp(self)
None post_commit_pending(self)
States|None pop_pending(self, str entity_id)
None update_pending_last_reported(self, int state_id, float last_reported_timestamp)
None evict_purged_state_ids(self, set[int] purged_state_ids)
StatementLambdaElement find_oldest_state()
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)