Home Assistant Unofficial Reference 2024.12.1
event_data.py
Go to the documentation of this file.
1 """Support managing EventData."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Collection, Iterable
6 import logging
7 from typing import TYPE_CHECKING, cast
8 
9 from sqlalchemy.orm.session import Session
10 
11 from homeassistant.core import Event
12 from homeassistant.util.collection import chunked_or_all
13 from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS
14 
15 from ..db_schema import EventData
16 from ..queries import get_shared_event_datas
17 from ..util import execute_stmt_lambda_element
18 from . import BaseLRUTableManager
19 
20 if TYPE_CHECKING:
21  from ..core import Recorder
22 
23 
24 CACHE_SIZE = 2048
25 
26 _LOGGER = logging.getLogger(__name__)
27 
28 
29 class EventDataManager(BaseLRUTableManager[EventData]):
30  """Manage the EventData table."""
31 
32  def __init__(self, recorder: Recorder) -> None:
33  """Initialize the event type manager."""
34  super().__init__(recorder, CACHE_SIZE)
35 
36  def serialize_from_event(self, event: Event) -> bytes | None:
37  """Serialize event data."""
38  try:
39  return EventData.shared_data_bytes_from_event(
40  event, self.recorder.dialect_name
41  )
42  except JSON_ENCODE_EXCEPTIONS as ex:
43  _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
44  return None
45 
46  def load(self, events: list[Event], session: Session) -> None:
47  """Load the shared_datas to data_ids mapping into memory from events.
48 
49  This call is not thread-safe and must be called from the
50  recorder thread.
51  """
52  if hashes := {
53  EventData.hash_shared_data_bytes(shared_event_bytes)
54  for event in events
55  if (shared_event_bytes := self.serialize_from_eventserialize_from_event(event))
56  }:
57  self._load_from_hashes_load_from_hashes(hashes, session)
58 
59  def get(self, shared_data: str, data_hash: int, session: Session) -> int | None:
60  """Resolve shared_datas to the data_id.
61 
62  This call is not thread-safe and must be called from the
63  recorder thread.
64  """
65  return self.get_manyget_many(((shared_data, data_hash),), session)[shared_data]
66 
67  def get_many(
68  self, shared_data_data_hashs: Iterable[tuple[str, int]], session: Session
69  ) -> dict[str, int | None]:
70  """Resolve shared_datas to data_ids.
71 
72  This call is not thread-safe and must be called from the
73  recorder thread.
74  """
75  results: dict[str, int | None] = {}
76  missing_hashes: set[int] = set()
77  for shared_data, data_hash in shared_data_data_hashs:
78  if (data_id := self._id_map.get(shared_data)) is None:
79  missing_hashes.add(data_hash)
80 
81  results[shared_data] = data_id
82 
83  if not missing_hashes:
84  return results
85 
86  return results | self._load_from_hashes_load_from_hashes(missing_hashes, session)
87 
89  self, hashes: Collection[int], session: Session
90  ) -> dict[str, int | None]:
91  """Load the shared_datas to data_ids mapping into memory from a list of hashes.
92 
93  This call is not thread-safe and must be called from the
94  recorder thread.
95  """
96  results: dict[str, int | None] = {}
97  with session.no_autoflush:
98  for hashs_chunk in chunked_or_all(hashes, self.recorder.max_bind_vars):
99  for data_id, shared_data in execute_stmt_lambda_element(
100  session, get_shared_event_datas(hashs_chunk), orm_rows=False
101  ):
102  results[shared_data] = self._id_map[shared_data] = cast(
103  int, data_id
104  )
105 
106  return results
107 
108  def add_pending(self, db_event_data: EventData) -> None:
109  """Add a pending EventData that will be committed at the next interval.
110 
111  This call is not thread-safe and must be called from the
112  recorder thread.
113  """
114  assert db_event_data.shared_data is not None
115  shared_data: str = db_event_data.shared_data
116  self._pending[shared_data] = db_event_data
117 
118  def post_commit_pending(self) -> None:
119  """Call after commit to load the data_ids of the new EventData into the LRU.
120 
121  This call is not thread-safe and must be called from the
122  recorder thread.
123  """
124  for shared_data, db_event_data in self._pending.items():
125  self._id_map[shared_data] = db_event_data.data_id
126  self._pending.clear()
127 
128  def evict_purged(self, data_ids: set[int]) -> None:
129  """Evict purged data_ids from the cache when they are no longer used.
130 
131  This call is not thread-safe and must be called from the
132  recorder thread.
133  """
134  id_map = self._id_map
135  event_data_ids_reversed = {
136  data_id: shared_data for shared_data, data_id in id_map.items()
137  }
138  # Evict any purged data from the cache
139  for purged_data_id in data_ids.intersection(event_data_ids_reversed):
140  id_map.pop(event_data_ids_reversed[purged_data_id], None)
dict[str, int|None] _load_from_hashes(self, Collection[int] hashes, Session session)
Definition: event_data.py:90
dict[str, int|None] get_many(self, Iterable[tuple[str, int]] shared_data_data_hashs, Session session)
Definition: event_data.py:69
int|None get(self, str shared_data, int data_hash, Session session)
Definition: event_data.py:59
StatementLambdaElement get_shared_event_datas(list[int] hashes)
Definition: queries.py:47
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)
Definition: collection.py:25