1 """Support managing StatesMeta."""
3 from __future__
import annotations
5 from collections.abc
import Iterable, Sequence
6 from typing
import TYPE_CHECKING, cast
8 from sqlalchemy.orm.session
import Session
13 from ..db_schema
import StatesMeta
14 from ..queries
import find_all_states_metadata_ids, find_states_metadata_ids
15 from ..util
import execute_stmt_lambda_element
16 from .
import BaseLRUTableManager
19 from ..core
import Recorder
25 """Manage the StatesMeta table."""
29 def __init__(self, recorder: Recorder) ->
None:
30 """Initialize the states meta manager."""
32 super().
__init__(recorder, CACHE_SIZE)
35 self, events: list[Event[EventStateChangedData]], session: Session
37 """Load the entity_id to metadata_id mapping into memory.
39 This call is not thread-safe and must be called from the
47 if (new_state := event.data[
"new_state"])
is not None
53 def get(self, entity_id: str, session: Session, from_recorder: bool) -> int |
None:
54 """Resolve entity_id to the metadata_id.
56 This call is not thread-safe after startup since
57 purge can remove all references to an entity_id.
59 When calling this method from the recorder thread, set
60 from_recorder to True to ensure any missing entity_ids
61 are added to the cache.
63 return self.
get_manyget_many((entity_id,), session, from_recorder)[entity_id]
66 """Resolve all entity_ids to metadata_ids.
68 This call is always thread-safe.
70 with session.no_autoflush:
73 Sequence[tuple[int, str]],
81 self, entity_ids: Iterable[str], session: Session, from_recorder: bool
82 ) -> dict[str, int |
None]:
83 """Resolve entity_id to metadata_id.
85 This call is not thread-safe after startup since
86 purge can remove all references to an entity_id.
88 When calling this method from the recorder thread, set
89 from_recorder to True to ensure any missing entity_ids
90 are added to the cache.
92 results: dict[str, int |
None] = {}
93 missing: list[str] = []
94 for entity_id
in entity_ids:
95 if (metadata_id := self._id_map.
get(entity_id))
is None:
96 missing.append(entity_id)
98 results[entity_id] = metadata_id
109 update_cache = from_recorder
or not self.
_did_first_load_did_first_load
111 with session.no_autoflush:
112 for missing_chunk
in chunked_or_all(missing, self.recorder.max_bind_vars):
116 metadata_id = cast(int, metadata_id)
117 results[entity_id] = metadata_id
120 self._id_map[entity_id] = metadata_id
125 """Add a pending StatesMeta that will be committed at the next interval.
127 This call is not thread-safe and must be called from the
130 assert db_states_meta.entity_id
is not None
131 entity_id: str = db_states_meta.entity_id
132 self._pending[entity_id] = db_states_meta
135 """Call after commit to load the metadata_ids of the new StatesMeta into the LRU.
137 This call is not thread-safe and must be called from the
140 for entity_id, db_states_meta
in self._pending.items():
141 self._id_map[entity_id] = db_states_meta.metadata_id
142 self._pending.clear()
145 """Evict purged event_types from the cache when they are no longer used.
147 This call is not thread-safe and must be called from the
150 for entity_id
in entity_ids:
151 self._id_map.pop(entity_id,
None)
159 """Update states metadata for an entity_id."""
160 if self.
getget(new_entity_id, session,
True)
is not None:
164 session.query(StatesMeta).filter(StatesMeta.entity_id == entity_id).
update(
165 {StatesMeta.entity_id: new_entity_id}
167 self._id_map.pop(entity_id,
None)
IssData update(pyiss.ISS iss)
StatementLambdaElement find_states_metadata_ids(Iterable[str] entity_ids)
StatementLambdaElement find_all_states_metadata_ids()
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)