1 """Support managing StateAttributes."""
3 from __future__
import annotations
5 from collections.abc
import Collection, Iterable
7 from typing
import TYPE_CHECKING, cast
9 from sqlalchemy.orm.session
import Session
15 from ..db_schema
import StateAttributes
16 from ..queries
import get_shared_attributes
17 from ..util
import execute_stmt_lambda_element
18 from .
import BaseLRUTableManager
21 from ..core
import Recorder
31 _LOGGER = logging.getLogger(__name__)
35 """Manage the StateAttributes table."""
def __init__(self, recorder: Recorder) -> None:
    """Initialize the state attributes manager.

    Forwards ``recorder`` together with ``CACHE_SIZE`` to the parent
    class initializer; no other state is set up here.
    """
    super().__init__(recorder, CACHE_SIZE)
42 """Serialize event data."""
44 return StateAttributes.shared_attrs_bytes_from_event(
45 event, self.recorder.dialect_name
47 except JSON_ENCODE_EXCEPTIONS
as ex:
49 "State is not JSON serializable: %s: %s",
50 event.data[
"new_state"],
56 self, events: list[Event[EventStateChangedData]], session: Session
58 """Load the shared_attrs to attributes_ids mapping into memory from events.
60 This call is not thread-safe and must be called from the
64 StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes)
def get(self, shared_attr: str, data_hash: int, session: Session) -> int | None:
    """Resolve shared_attrs to the attributes_id.

    Delegates to ``get_many`` with a single ``(shared_attr, data_hash)``
    pair and returns the entry stored under ``shared_attr`` in the
    mapping it produces.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    # Wrap the single pair in a one-element tuple so the bulk lookup
    # can be reused unchanged for the single-item case.
    return self.get_many(((shared_attr, data_hash),), session)[shared_attr]
79 self, shared_attrs_data_hashes: Iterable[tuple[str, int]], session: Session
80 ) -> dict[str, int |
None]:
81 """Resolve shared_attrs to attributes_ids.
83 This call is not thread-safe and must be called from the
86 results: dict[str, int |
None] = {}
87 missing_hashes: set[int] = set()
88 for shared_attrs, data_hash
in shared_attrs_data_hashes:
89 if (attributes_id := self._id_map.
get(shared_attrs))
is None:
90 missing_hashes.add(data_hash)
92 results[shared_attrs] = attributes_id
94 if not missing_hashes:
100 self, hashes: Collection[int], session: Session
101 ) -> dict[str, int |
None]:
102 """Load the shared_attrs to attributes_ids mapping into memory from a list of hashes.
104 This call is not thread-safe and must be called from the
107 results: dict[str, int |
None] = {}
108 with session.no_autoflush:
109 for hashs_chunk
in chunked_or_all(hashes, self.recorder.max_bind_vars):
113 results[shared_attrs] = self._id_map[shared_attrs] = cast(
def add_pending(self, db_state_attributes: StateAttributes) -> None:
    """Register a StateAttributes row that is waiting for the next commit.

    The row is stored in the pending map under its ``shared_attrs``
    value, replacing any row previously stored under the same key.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    pending_key = db_state_attributes.shared_attrs
    # A row without shared_attrs cannot be keyed in the pending map.
    assert pending_key is not None
    self._pending[pending_key] = db_state_attributes
130 """Call after commit to load the attributes_ids of the new StateAttributes into the LRU.
132 This call is not thread-safe and must be called from the
135 for shared_attrs, db_state_attributes
in self._pending.items():
136 self._id_map[shared_attrs] = db_state_attributes.attributes_id
137 self._pending.clear()
140 """Evict purged attributes_ids from the cache when they are no longer used.
142 This call is not thread-safe and must be called from the
145 id_map = self._id_map
146 state_attributes_ids_reversed = {
147 attributes_id: shared_attrs
148 for shared_attrs, attributes_id
in id_map.items()
151 for purged_attributes_id
in attributes_ids.intersection(
152 state_attributes_ids_reversed
154 id_map.pop(state_attributes_ids_reversed[purged_attributes_id],
None)
None __init__(self, Recorder recorder)
None add_pending(self, StateAttributes db_state_attributes)
bytes|None serialize_from_event(self, Event[EventStateChangedData] event)
None post_commit_pending(self)
None load(self, list[Event[EventStateChangedData]] events, Session session)
dict[str, int|None] get_many(self, Iterable[tuple[str, int]] shared_attrs_data_hashes, Session session)
dict[str, int|None] _load_from_hashes(self, Collection[int] hashes, Session session)
int|None get(self, str shared_attr, int data_hash, Session session)
None evict_purged(self, set[int] attributes_ids)
StatementLambdaElement get_shared_attributes(list[int] hashes)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)