1 """Support managing EventData."""
3 from __future__
import annotations
5 from collections.abc
import Collection, Iterable
7 from typing
import TYPE_CHECKING, cast
9 from sqlalchemy.orm.session
import Session
15 from ..db_schema
import EventData
16 from ..queries
import get_shared_event_datas
17 from ..util
import execute_stmt_lambda_element
18 from .
import BaseLRUTableManager
21 from ..core
import Recorder
26 _LOGGER = logging.getLogger(__name__)
30 """Manage the EventData table."""
def __init__(self, recorder: Recorder) -> None:
    """Initialize the event data manager.

    Delegates to the base LRU table manager with the module-level
    CACHE_SIZE bound for the shared_data -> data_id id map.
    """
    super().__init__(recorder, CACHE_SIZE)
37 """Serialize event data."""
39 return EventData.shared_data_bytes_from_event(
40 event, self.recorder.dialect_name
42 except JSON_ENCODE_EXCEPTIONS
as ex:
43 _LOGGER.warning(
"Event is not JSON serializable: %s: %s", event, ex)
def load(self, events: list[Event], session: Session) -> None:
    """Load the shared_datas to data_ids mapping into memory from events.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    # Build shared_data -> hash pairs for every serializable event, then
    # resolve them in one get_many call; events that fail serialization
    # are skipped (serialize_from_event returns None).
    self.get_many(
        {
            shared_event_bytes.decode("utf-8"): EventData.hash_shared_data_bytes(
                shared_event_bytes
            )
            for event in events
            if (shared_event_bytes := self.serialize_from_event(event))
        }.items(),
        session,
    )
def get(self, shared_data: str, data_hash: int, session: Session) -> int | None:
    """Resolve shared_data to the data_id.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    # Fixed garbled call name ("get_manyget_many" -> "get_many"):
    # delegate to the batch lookup with a single-entry iterable.
    return self.get_many(((shared_data, data_hash),), session)[shared_data]
def get_many(
    self, shared_data_data_hashs: Iterable[tuple[str, int]], session: Session
) -> dict[str, int | None]:
    """Resolve shared_datas to data_ids.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    results: dict[str, int | None] = {}
    missing_hashes: set[int] = set()
    for shared_data, data_hash in shared_data_data_hashs:
        # Cache hit -> record the id; cache miss -> remember the hash so
        # we can resolve it from the database in one batch below.
        if (data_id := self._id_map.get(shared_data)) is None:
            missing_hashes.add(data_hash)
        results[shared_data] = data_id

    if not missing_hashes:
        return results

    # Merge database results over the None placeholders for cache misses.
    return results | self._load_from_hashes(missing_hashes, session)
def _load_from_hashes(
    self, hashes: Collection[int], session: Session
) -> dict[str, int | None]:
    """Load the shared_datas to data_ids mapping into memory from a list of hashes.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    results: dict[str, int | None] = {}
    with session.no_autoflush:
        # Chunk the hashes so each query stays under the database's
        # bind-variable limit.
        for hashs_chunk in chunked_or_all(hashes, self.recorder.max_bind_vars):
            for data_id, shared_data in execute_stmt_lambda_element(
                session, get_shared_event_datas(hashs_chunk), orm_rows=False
            ):
                # Populate both the return value and the LRU cache.
                results[shared_data] = self._id_map[shared_data] = cast(
                    int, data_id
                )
    return results
109 """Add a pending EventData that will be committed at the next interval.
111 This call is not thread-safe and must be called from the
114 assert db_event_data.shared_data
is not None
115 shared_data: str = db_event_data.shared_data
116 self._pending[shared_data] = db_event_data
119 """Call after commit to load the data_ids of the new EventData into the LRU.
121 This call is not thread-safe and must be called from the
124 for shared_data, db_event_data
in self._pending.items():
125 self._id_map[shared_data] = db_event_data.data_id
126 self._pending.clear()
129 """Evict purged data_ids from the cache when they are no longer used.
131 This call is not thread-safe and must be called from the
134 id_map = self._id_map
135 event_data_ids_reversed = {
136 data_id: shared_data
for shared_data, data_id
in id_map.items()
139 for purged_data_id
in data_ids.intersection(event_data_ids_reversed):
140 id_map.pop(event_data_ids_reversed[purged_data_id],
None)
None add_pending(self, EventData db_event_data)
dict[str, int|None] _load_from_hashes(self, Collection[int] hashes, Session session)
dict[str, int|None] get_many(self, Iterable[tuple[str, int]] shared_data_data_hashs, Session session)
None post_commit_pending(self)
None evict_purged(self, set[int] data_ids)
None load(self, list[Event] events, Session session)
bytes|None serialize_from_event(self, Event event)
int|None get(self, str shared_data, int data_hash, Session session)
None __init__(self, Recorder recorder)
StatementLambdaElement get_shared_event_datas(list[int] hashes)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)