Home Assistant Unofficial Reference 2024.12.1
event_types.py
Go to the documentation of this file.
1 """Support managing EventTypes."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Iterable
6 from typing import TYPE_CHECKING, Any, cast
7 
8 from lru import LRU
9 from sqlalchemy.orm.session import Session
10 
11 from homeassistant.core import Event
12 from homeassistant.util.collection import chunked_or_all
13 from homeassistant.util.event_type import EventType
14 
15 from ..db_schema import EventTypes
16 from ..queries import find_event_type_ids
17 from ..tasks import RefreshEventTypesTask
18 from ..util import execute_stmt_lambda_element
19 from . import BaseLRUTableManager
20 
21 if TYPE_CHECKING:
22  from ..core import Recorder
23 
24 
# Size handed to the base LRU table manager and also used for the LRU that
# remembers event types known not to exist in the database.
CACHE_SIZE = 2048
26 
27 
class EventTypeManager(BaseLRUTableManager[EventTypes]):
    """Manage the EventTypes table.

    Resolves event types to their ``event_type_id`` using an LRU id map,
    falling back to a database lookup for cache misses. Event types that the
    database lookup did not return are remembered in a second LRU so they are
    not looked up again on every call.
    """

    # Class-level flag; it is neither read nor modified inside this class.
    active = False

    def __init__(self, recorder: Recorder) -> None:
        """Initialize the event type manager.

        ``recorder`` is passed to the base manager together with CACHE_SIZE.
        """
        super().__init__(recorder, CACHE_SIZE)
        # Keys are event types that a database lookup did not find; the
        # value is always None (the LRU is used as an ordered set).
        self._non_existent_event_types: LRU[EventType[Any] | str, None] = LRU(
            CACHE_SIZE
        )

    def load(self, events: list[Event], session: Session) -> None:
        """Load the event_type to event_type_ids mapping into memory.

        Collects the distinct, non-None event types of ``events`` and
        resolves them through ``get_many`` so the caches are populated.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        self.get_many(
            {event.event_type for event in events if event.event_type is not None},
            session,
            from_recorder=True,
        )

    def get(
        self,
        event_type: EventType[Any] | str,
        session: Session,
        from_recorder: bool = False,
    ) -> int | None:
        """Resolve event_type to the event_type_id.

        Returns the id, or None when the event type is not known.
        ``from_recorder`` is forwarded to ``get_many`` so that a caller
        running in the recorder thread gets the direct cache update rather
        than a queued refresh task.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        return self.get_many((event_type,), session, from_recorder)[event_type]

    def get_many(
        self,
        event_types: Iterable[EventType[Any] | str],
        session: Session,
        from_recorder: bool = False,
    ) -> dict[EventType[Any] | str, int | None]:
        """Resolve event_types to event_type_ids.

        Returns a dict with one entry per requested event type; the value is
        the id, or None when the event type was not found.

        When ``from_recorder`` is True, event types missing from the database
        are recorded in the non-existent cache directly; otherwise a
        RefreshEventTypesTask is queued on the recorder instead.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        results: dict[EventType[Any] | str, int | None] = {}
        missing: list[EventType[Any] | str] = []

        for event_type in event_types:
            event_type_id = self._id_map.get(event_type)
            # Only hit the database for types that are neither cached nor
            # already known to be absent.
            if (
                event_type_id is None
                and event_type not in self._non_existent_event_types
            ):
                missing.append(event_type)
            # Always record an entry (possibly None) so that every requested
            # type is present in the result and the post-query scan below
            # can index into it safely.
            results[event_type] = event_type_id

        if not missing:
            return results

        with session.no_autoflush:
            for missing_chunk in chunked_or_all(missing, self.recorder.max_bind_vars):
                for event_type_id, event_type in execute_stmt_lambda_element(
                    session, find_event_type_ids(missing_chunk), orm_rows=False
                ):
                    results[event_type] = self._id_map[event_type] = cast(
                        int, event_type_id
                    )

        # Anything still None after the query was not returned by the database.
        if non_existent := [
            event_type for event_type in missing if results[event_type] is None
        ]:
            if from_recorder:
                # We are already in the recorder thread so we can update the
                # non-existent event types directly.
                for event_type in non_existent:
                    self._non_existent_event_types[event_type] = None
            else:
                # Queue a task to refresh the event types since it's not
                # thread-safe to do it here since we are not in the recorder
                # thread.
                self.recorder.queue_task(RefreshEventTypesTask(non_existent))

        return results

    def add_pending(self, db_event_type: EventTypes) -> None:
        """Add a pending EventTypes that will be committed at the next interval.

        The row is stored in ``_pending`` keyed by its ``event_type``, which
        must not be None.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        assert db_event_type.event_type is not None
        event_type: str = db_event_type.event_type
        self._pending[event_type] = db_event_type

    def post_commit_pending(self) -> None:
        """Call after commit to load the event_type_ids of the new EventTypes into the LRU.

        Each pending row's id is copied into the id map, the event type is
        dropped from the non-existent cache, and the pending map is emptied.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        for event_type, db_event_types in self._pending.items():
            self._id_map[event_type] = db_event_types.event_type_id
            self.clear_non_existent(event_type)
        self._pending.clear()

    def clear_non_existent(self, event_type: EventType[Any] | str) -> None:
        """Clear a non-existent event type from the cache.

        Does nothing if the event type is not in the non-existent cache.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        self._non_existent_event_types.pop(event_type, None)

    def evict_purged(self, event_types: Iterable[str]) -> None:
        """Evict purged event_types from the cache when they are no longer used.

        Event types that are not in the id map are ignored.

        This call is not thread-safe and must be called from the
        recorder thread.
        """
        for event_type in event_types:
            self._id_map.pop(event_type, None)
dict[EventType[Any]|str, int|None] get_many(self, Iterable[EventType[Any]|str] event_types, Session session, bool from_recorder=False)
Definition: event_types.py:70
int|None get(self, EventType[Any]|str event_type, Session session, bool from_recorder=False)
Definition: event_types.py:57
StatementLambdaElement find_event_type_ids(Iterable[str] event_types)
Definition: queries.py:56
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)
Definition: collection.py:25