Home Assistant Unofficial Reference 2024.12.1
states_meta.py
Go to the documentation of this file.
1 """Support managing StatesMeta."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Iterable, Sequence
6 from typing import TYPE_CHECKING, cast
7 
8 from sqlalchemy.orm.session import Session
9 
10 from homeassistant.core import Event, EventStateChangedData
11 from homeassistant.util.collection import chunked_or_all
12 
13 from ..db_schema import StatesMeta
14 from ..queries import find_all_states_metadata_ids, find_states_metadata_ids
15 from ..util import execute_stmt_lambda_element
16 from . import BaseLRUTableManager
17 
18 if TYPE_CHECKING:
19  from ..core import Recorder
20 
21 CACHE_SIZE = 8192
22 
23 
24 class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
25  """Manage the StatesMeta table."""
26 
27  active = False
28 
29  def __init__(self, recorder: Recorder) -> None:
30  """Initialize the states meta manager."""
31  self._did_first_load_did_first_load = False
32  super().__init__(recorder, CACHE_SIZE)
33 
34  def load(
35  self, events: list[Event[EventStateChangedData]], session: Session
36  ) -> None:
37  """Load the entity_id to metadata_id mapping into memory.
38 
39  This call is not thread-safe and must be called from the
40  recorder thread.
41  """
42  self._did_first_load_did_first_load = True
43  self.get_manyget_many(
44  {
45  new_state.entity_id
46  for event in events
47  if (new_state := event.data["new_state"]) is not None
48  },
49  session,
50  True,
51  )
52 
53  def get(self, entity_id: str, session: Session, from_recorder: bool) -> int | None:
54  """Resolve entity_id to the metadata_id.
55 
56  This call is not thread-safe after startup since
57  purge can remove all references to an entity_id.
58 
59  When calling this method from the recorder thread, set
60  from_recorder to True to ensure any missing entity_ids
61  are added to the cache.
62  """
63  return self.get_manyget_many((entity_id,), session, from_recorder)[entity_id]
64 
65  def get_metadata_id_to_entity_id(self, session: Session) -> dict[int, str]:
66  """Resolve all entity_ids to metadata_ids.
67 
68  This call is always thread-safe.
69  """
70  with session.no_autoflush:
71  return dict(
72  cast(
73  Sequence[tuple[int, str]],
75  session, find_all_states_metadata_ids(), orm_rows=False
76  ),
77  )
78  )
79 
80  def get_many(
81  self, entity_ids: Iterable[str], session: Session, from_recorder: bool
82  ) -> dict[str, int | None]:
83  """Resolve entity_id to metadata_id.
84 
85  This call is not thread-safe after startup since
86  purge can remove all references to an entity_id.
87 
88  When calling this method from the recorder thread, set
89  from_recorder to True to ensure any missing entity_ids
90  are added to the cache.
91  """
92  results: dict[str, int | None] = {}
93  missing: list[str] = []
94  for entity_id in entity_ids:
95  if (metadata_id := self._id_map.get(entity_id)) is None:
96  missing.append(entity_id)
97 
98  results[entity_id] = metadata_id
99 
100  if not missing:
101  return results
102 
103  # Only update the cache if we are in the recorder thread
104  # or the recorder event loop has not started yet since
105  # there is a chance that we could have just deleted all
106  # instances of an entity_id from the database via purge
107  # and we do not want to add it back to the cache from another
108  # thread (history query).
109  update_cache = from_recorder or not self._did_first_load_did_first_load
110 
111  with session.no_autoflush:
112  for missing_chunk in chunked_or_all(missing, self.recorder.max_bind_vars):
113  for metadata_id, entity_id in execute_stmt_lambda_element(
114  session, find_states_metadata_ids(missing_chunk)
115  ):
116  metadata_id = cast(int, metadata_id)
117  results[entity_id] = metadata_id
118 
119  if update_cache:
120  self._id_map[entity_id] = metadata_id
121 
122  return results
123 
124  def add_pending(self, db_states_meta: StatesMeta) -> None:
125  """Add a pending StatesMeta that will be committed at the next interval.
126 
127  This call is not thread-safe and must be called from the
128  recorder thread.
129  """
130  assert db_states_meta.entity_id is not None
131  entity_id: str = db_states_meta.entity_id
132  self._pending[entity_id] = db_states_meta
133 
134  def post_commit_pending(self) -> None:
135  """Call after commit to load the metadata_ids of the new StatesMeta into the LRU.
136 
137  This call is not thread-safe and must be called from the
138  recorder thread.
139  """
140  for entity_id, db_states_meta in self._pending.items():
141  self._id_map[entity_id] = db_states_meta.metadata_id
142  self._pending.clear()
143 
144  def evict_purged(self, entity_ids: Iterable[str]) -> None:
145  """Evict purged event_types from the cache when they are no longer used.
146 
147  This call is not thread-safe and must be called from the
148  recorder thread.
149  """
150  for entity_id in entity_ids:
151  self._id_map.pop(entity_id, None)
152 
154  self,
155  session: Session,
156  entity_id: str,
157  new_entity_id: str,
158  ) -> bool:
159  """Update states metadata for an entity_id."""
160  if self.getget(new_entity_id, session, True) is not None:
161  # If the new entity id already exists we have
162  # a collision and should not update.
163  return False
164  session.query(StatesMeta).filter(StatesMeta.entity_id == entity_id).update(
165  {StatesMeta.entity_id: new_entity_id}
166  )
167  self._id_map.pop(entity_id, None)
168  return True
int|None get(self, str entity_id, Session session, bool from_recorder)
Definition: states_meta.py:53
bool update_metadata(self, Session session, str entity_id, str new_entity_id)
Definition: states_meta.py:158
None load(self, list[Event[EventStateChangedData]] events, Session session)
Definition: states_meta.py:36
dict[str, int|None] get_many(self, Iterable[str] entity_ids, Session session, bool from_recorder)
Definition: states_meta.py:82
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
StatementLambdaElement find_states_metadata_ids(Iterable[str] entity_ids)
Definition: queries.py:70
StatementLambdaElement find_all_states_metadata_ids()
Definition: queries.py:65
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)
Definition: collection.py:25