Home Assistant Unofficial Reference 2024.12.1
statistics_meta.py
Go to the documentation of this file.
1 """Support managing StatesMeta."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import threading
7 from typing import TYPE_CHECKING, Final, Literal
8 
9 from lru import LRU
10 from sqlalchemy import lambda_stmt, select
11 from sqlalchemy.orm.session import Session
12 from sqlalchemy.sql.expression import true
13 from sqlalchemy.sql.lambdas import StatementLambdaElement
14 
15 from ..db_schema import StatisticsMeta
16 from ..models import StatisticMetaData
17 from ..util import execute_stmt_lambda_element
18 
19 if TYPE_CHECKING:
20  from ..core import Recorder
21 
# Maximum number of statistic_id -> (metadata_id, metadata) entries kept in
# the LRU cache owned by StatisticsMetaManager.
CACHE_SIZE = 8192

_LOGGER = logging.getLogger(__name__)

# Columns selected by the metadata query; the INDEX_* constants below must
# stay in sync with the position of each column in this tuple.
QUERY_STATISTIC_META = (
    StatisticsMeta.id,
    StatisticsMeta.statistic_id,
    StatisticsMeta.source,
    StatisticsMeta.unit_of_measurement,
    StatisticsMeta.has_mean,
    StatisticsMeta.has_sum,
    StatisticsMeta.name,
)

# Indexes into rows produced by QUERY_STATISTIC_META.
INDEX_ID: Final = 0
INDEX_STATISTIC_ID: Final = 1
INDEX_SOURCE: Final = 2
INDEX_UNIT_OF_MEASUREMENT: Final = 3
INDEX_HAS_MEAN: Final = 4
INDEX_HAS_SUM: Final = 5
INDEX_NAME: Final = 6
43 
44 
def _generate_get_metadata_stmt(
    statistic_ids: set[str] | None = None,
    statistic_type: Literal["mean", "sum"] | None = None,
    statistic_source: str | None = None,
) -> StatementLambdaElement:
    """Generate a statement to fetch metadata.

    statistic_ids: when given and non-empty, restrict to these statistic_ids.
    statistic_type: when given, restrict to rows that support mean or sum.
    statistic_source: when given, restrict to rows recorded by this source.
    """
    stmt = lambda_stmt(lambda: select(*QUERY_STATISTIC_META))
    # Note: an empty set is treated the same as None (no id filter).
    if statistic_ids:
        stmt += lambda q: q.where(StatisticsMeta.statistic_id.in_(statistic_ids))
    if statistic_source is not None:
        stmt += lambda q: q.where(StatisticsMeta.source == statistic_source)
    if statistic_type == "mean":
        stmt += lambda q: q.where(StatisticsMeta.has_mean == true())
    elif statistic_type == "sum":
        stmt += lambda q: q.where(StatisticsMeta.has_sum == true())
    return stmt
61 
62 
64  """Manage the StatisticsMeta table."""
65 
def __init__(self, recorder: Recorder) -> None:
    """Initialize the statistics meta manager.

    recorder: owning Recorder instance; its thread_id is later compared
    against threading.get_ident() to decide when cache writes are safe.
    """
    self.recorder = recorder
    # Maps statistic_id -> (metadata_id, StatisticMetaData); bounded LRU
    # so the mapping cannot grow without limit.
    self._stat_id_to_id_meta: LRU[str, tuple[int, StatisticMetaData]] = LRU(
        CACHE_SIZE
    )
72 
73  def _clear_cache(self, statistic_ids: list[str]) -> None:
74  """Clear the cache."""
75  for statistic_id in statistic_ids:
76  self._stat_id_to_id_meta.pop(statistic_id, None)
77 
def _get_from_database(
    self,
    session: Session,
    statistic_ids: set[str] | None = None,
    statistic_type: Literal["mean", "sum"] | None = None,
    statistic_source: str | None = None,
) -> dict[str, tuple[int, StatisticMetaData]]:
    """Fetch meta data and process it into results and/or cache.

    Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by
    statistic_id for every matching row in the database.
    """
    # Only update the cache if we are in the recorder thread and there are no
    # new objects that are not yet committed to the database in the session.
    update_cache = (
        not session.new
        and not session.dirty
        and self.recorder.thread_id == threading.get_ident()
    )
    results: dict[str, tuple[int, StatisticMetaData]] = {}
    id_meta: tuple[int, StatisticMetaData]
    meta: StatisticMetaData
    statistic_id: str
    row_id: int
    with session.no_autoflush:
        stat_id_to_id_meta = self._stat_id_to_id_meta
        for row in execute_stmt_lambda_element(
            session,
            _generate_get_metadata_stmt(
                statistic_ids, statistic_type, statistic_source
            ),
            # Plain tuples are cheaper than ORM rows; we only read columns.
            orm_rows=False,
        ):
            statistic_id = row[INDEX_STATISTIC_ID]
            row_id = row[INDEX_ID]
            meta = {
                "has_mean": row[INDEX_HAS_MEAN],
                "has_sum": row[INDEX_HAS_SUM],
                "name": row[INDEX_NAME],
                "source": row[INDEX_SOURCE],
                "statistic_id": statistic_id,
                "unit_of_measurement": row[INDEX_UNIT_OF_MEASUREMENT],
            }
            id_meta = (row_id, meta)
            results[statistic_id] = id_meta
            if update_cache:
                stat_id_to_id_meta[statistic_id] = id_meta
    return results
122 
123  def _assert_in_recorder_thread(self) -> None:
124  """Assert that we are in the recorder thread."""
125  if self.recorderrecorder.thread_id != threading.get_ident():
126  raise RuntimeError("Detected unsafe call not in recorder thread")
127 
def _add_metadata(
    self, session: Session, statistic_id: str, new_metadata: StatisticMetaData
) -> int:
    """Add metadata to the database and return the new metadata id.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    self._assert_in_recorder_thread()
    meta = StatisticsMeta.from_meta(new_metadata)
    session.add(meta)
    # Flush so the database assigns the primary key before we return it.
    session.flush()
    _LOGGER.debug(
        "Added new statistics metadata for %s, new_metadata: %s",
        statistic_id,
        new_metadata,
    )
    return meta.id
147 
149  self,
150  session: Session,
151  statistic_id: str,
152  new_metadata: StatisticMetaData,
153  old_metadata_dict: dict[str, tuple[int, StatisticMetaData]],
154  ) -> tuple[str | None, int]:
155  """Update metadata in the database.
156 
157  This call is not thread-safe and must be called from the
158  recorder thread.
159  """
160  metadata_id, old_metadata = old_metadata_dict[statistic_id]
161  if not (
162  old_metadata["has_mean"] != new_metadata["has_mean"]
163  or old_metadata["has_sum"] != new_metadata["has_sum"]
164  or old_metadata["name"] != new_metadata["name"]
165  or old_metadata["unit_of_measurement"]
166  != new_metadata["unit_of_measurement"]
167  ):
168  return None, metadata_id
169 
170  self._assert_in_recorder_thread_assert_in_recorder_thread()
171  session.query(StatisticsMeta).filter_by(statistic_id=statistic_id).update(
172  {
173  StatisticsMeta.has_mean: new_metadata["has_mean"],
174  StatisticsMeta.has_sum: new_metadata["has_sum"],
175  StatisticsMeta.name: new_metadata["name"],
176  StatisticsMeta.unit_of_measurement: new_metadata["unit_of_measurement"],
177  },
178  synchronize_session=False,
179  )
180  self._clear_cache_clear_cache([statistic_id])
181  _LOGGER.debug(
182  "Updated statistics metadata for %s, old_metadata: %s, new_metadata: %s",
183  statistic_id,
184  old_metadata,
185  new_metadata,
186  )
187  return statistic_id, metadata_id
188 
def load(self, session: Session) -> None:
    """Load the statistic_id to metadata_id mapping into memory.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    # Fetching everything warms the LRU cache as a side effect.
    self.get_many(session)
196 
def get(
    self, session: Session, statistic_id: str
) -> tuple[int, StatisticMetaData] | None:
    """Resolve statistic_id to its (metadata_id, metadata) tuple.

    Returns None when the statistic_id is unknown.
    """
    return self.get_many(session, {statistic_id}).get(statistic_id)
202 
def get_many(
    self,
    session: Session,
    statistic_ids: set[str] | None = None,
    statistic_type: Literal["mean", "sum"] | None = None,
    statistic_source: str | None = None,
) -> dict[str, tuple[int, StatisticMetaData]]:
    """Fetch meta data.

    Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by statistic_id.

    If statistic_ids is given, fetch metadata only for the listed statistics_ids.
    If statistic_type is given, fetch metadata only for statistic_ids supporting it.

    Raises ValueError when statistic_ids is combined with statistic_type
    or statistic_source.
    """
    if statistic_ids is None:
        # Fetch metadata from the database
        return self._get_from_database(
            session,
            statistic_type=statistic_type,
            statistic_source=statistic_source,
        )

    if statistic_type is not None or statistic_source is not None:
        # This was originally implemented but we never used it
        # so the code was ripped out to reduce the maintenance
        # burden.
        raise ValueError(
            "Providing statistic_type and statistic_source is mutually exclusive of statistic_ids"
        )

    # Serve as much as possible from the cache, then fetch the rest.
    results = self.get_from_cache_threadsafe(statistic_ids)
    if not (missing_statistic_id := statistic_ids.difference(results)):
        return results

    # Fetch metadata from the database
    return results | self._get_from_database(
        session, statistic_ids=missing_statistic_id
    )
241 
def get_from_cache_threadsafe(
    self, statistic_ids: set[str]
) -> dict[str, tuple[int, StatisticMetaData]]:
    """Get metadata from cache; missing ids are simply omitted.

    This call is thread safe and can be run in the event loop,
    the database executor, or the recorder thread.
    """
    return {
        statistic_id: id_meta
        for statistic_id in statistic_ids
        # We must use a get call here and never iterate over the dict
        # because the dict can be modified by the recorder thread
        # while we are iterating over it.
        if (id_meta := self._stat_id_to_id_meta.get(statistic_id))
    }
258 
def update_or_add(
    self,
    session: Session,
    new_metadata: StatisticMetaData,
    old_metadata_dict: dict[str, tuple[int, StatisticMetaData]],
) -> tuple[str | None, int]:
    """Get metadata_id for a statistic_id.

    If the statistic_id is previously unknown, add it. If it's already known, update
    metadata if needed.

    Updating metadata source is not possible.

    Returns a tuple of (statistic_id | None, metadata_id).

    statistic_id is None if the metadata was not updated

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    statistic_id = new_metadata["statistic_id"]
    if statistic_id not in old_metadata_dict:
        return statistic_id, self._add_metadata(session, statistic_id, new_metadata)
    return self._update_metadata(
        session, statistic_id, new_metadata, old_metadata_dict
    )
285 
def update_unit_of_measurement(
    self, session: Session, statistic_id: str, new_unit: str | None
) -> None:
    """Update the unit of measurement for a statistic_id.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    self._assert_in_recorder_thread()
    session.query(StatisticsMeta).filter(
        StatisticsMeta.statistic_id == statistic_id
    ).update({StatisticsMeta.unit_of_measurement: new_unit})
    # Evict the stale cached entry so readers pick up the new unit.
    self._clear_cache([statistic_id])
299 
def update_statistic_id(
    self,
    session: Session,
    source: str,
    old_statistic_id: str,
    new_statistic_id: str,
) -> None:
    """Update the statistic_id for a statistic_id.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    self._assert_in_recorder_thread()
    # Renaming onto an existing statistic_id would violate uniqueness,
    # so log and bail out instead of corrupting the table.
    if self.get(session, new_statistic_id):
        _LOGGER.error(
            "Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
            old_statistic_id,
            new_statistic_id,
        )
        return
    session.query(StatisticsMeta).filter(
        (StatisticsMeta.statistic_id == old_statistic_id)
        & (StatisticsMeta.source == source)
    ).update({StatisticsMeta.statistic_id: new_statistic_id})
    # Only the old id can be cached; drop it so lookups miss and refetch.
    self._clear_cache([old_statistic_id])
325 
def delete(self, session: Session, statistic_ids: list[str]) -> None:
    """Clear statistics for a list of statistic_ids.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    self._assert_in_recorder_thread()
    session.query(StatisticsMeta).filter(
        StatisticsMeta.statistic_id.in_(statistic_ids)
    ).delete(synchronize_session=False)
    # Keep the cache consistent with the deleted rows.
    self._clear_cache(statistic_ids)
337 
def reset(self) -> None:
    """Wipe every cached statistic_id -> (id, metadata) entry."""
    cache = self._stat_id_to_id_meta
    cache.clear()
341 
def adjust_lru_size(self, new_size: int) -> None:
    """Grow the LRU cache to new_size if it is currently smaller.

    Shrinking is deliberately never performed.

    This call is not thread-safe and must be called from the
    recorder thread.
    """
    cache: LRU = self._stat_id_to_id_meta
    current_size = cache.get_size()
    if current_size < new_size:
        cache.set_size(new_size)
dict[str, tuple[int, StatisticMetaData]] get_from_cache_threadsafe(self, set[str] statistic_ids)
int _add_metadata(self, Session session, str statistic_id, StatisticMetaData new_metadata)
None update_statistic_id(self, Session session, str source, str old_statistic_id, str new_statistic_id)
tuple[str|None, int] update_or_add(self, Session session, StatisticMetaData new_metadata, dict[str, tuple[int, StatisticMetaData]] old_metadata_dict)
tuple[int, StatisticMetaData]|None get(self, Session session, str statistic_id)
None update_unit_of_measurement(self, Session session, str statistic_id, str|None new_unit)
dict[str, tuple[int, StatisticMetaData]] get_many(self, Session session, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
tuple[str|None, int] _update_metadata(self, Session session, str statistic_id, StatisticMetaData new_metadata, dict[str, tuple[int, StatisticMetaData]] old_metadata_dict)
dict[str, tuple[int, StatisticMetaData]] _get_from_database(self, Session session, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
StatementLambdaElement _generate_get_metadata_stmt(set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None, str|None statistic_source=None)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179