Home Assistant Unofficial Reference 2024.12.1
processor.py
Go to the documentation of this file.
1 """Event parser and human readable log generator."""
2 
from __future__ import annotations

from collections.abc import Callable, Generator, Sequence
from dataclasses import dataclass
from datetime import datetime as dt
import logging
import time
from typing import TYPE_CHECKING, Any

from sqlalchemy.engine import Result
from sqlalchemy.engine.row import Row

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import (
    bytes_to_uuid_hex_or_none,
    extract_event_type_ids,
    extract_metadata_ids,
    process_timestamp_to_utc_isoformat,
)
from homeassistant.components.recorder.util import (
    execute_stmt_lambda_element,
    session_scope,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.const import (
    ATTR_DOMAIN,
    ATTR_ENTITY_ID,
    ATTR_FRIENDLY_NAME,
    ATTR_NAME,
    ATTR_SERVICE,
    EVENT_CALL_SERVICE,
    EVENT_LOGBOOK_ENTRY,
)
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import entity_registry as er
import homeassistant.util.dt as dt_util
from homeassistant.util.event_type import EventType

from .const import (
    ATTR_MESSAGE,
    CONTEXT_DOMAIN,
    CONTEXT_ENTITY_ID,
    CONTEXT_ENTITY_ID_NAME,
    CONTEXT_EVENT_TYPE,
    CONTEXT_MESSAGE,
    CONTEXT_NAME,
    CONTEXT_SERVICE,
    CONTEXT_SOURCE,
    CONTEXT_STATE,
    CONTEXT_USER_ID,
    DOMAIN,
    LOGBOOK_ENTRY_DOMAIN,
    LOGBOOK_ENTRY_ENTITY_ID,
    LOGBOOK_ENTRY_ICON,
    LOGBOOK_ENTRY_MESSAGE,
    LOGBOOK_ENTRY_NAME,
    LOGBOOK_ENTRY_SOURCE,
    LOGBOOK_ENTRY_STATE,
    LOGBOOK_ENTRY_WHEN,
)
from .helpers import is_sensor_continuous
from .models import (
    CONTEXT_ID_BIN_POS,
    CONTEXT_ONLY_POS,
    CONTEXT_PARENT_ID_BIN_POS,
    CONTEXT_POS,
    CONTEXT_USER_ID_BIN_POS,
    ENTITY_ID_POS,
    EVENT_TYPE_POS,
    ICON_POS,
    ROW_ID_POS,
    STATE_POS,
    TIME_FIRED_TS_POS,
    EventAsRow,
    LazyEventPartialState,
    LogbookConfig,
    async_event_to_row,
)
from .queries import statement_for_request
from .queries.common import PSEUDO_EVENT_STATE_CHANGED
84 
85 _LOGGER = logging.getLogger(__name__)
86 
87 
88 @dataclass(slots=True)
89 class LogbookRun:
90  """A logbook run which may be a long running event stream or single request."""
91 
92  context_lookup: dict[bytes | None, Row | EventAsRow | None]
93  external_events: dict[
94  EventType[Any] | str,
95  tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]],
96  ]
97  event_cache: EventCache
98  entity_name_cache: EntityNameCache
99  include_entity_name: bool
100  timestamp: bool
101  memoize_new_contexts: bool = True
102 
class EventProcessor:
    """Stream into logbook format.

    Orchestrates one logbook request (or long-running event stream):
    builds the recorder query, executes it, and converts the resulting
    rows into human readable logbook entries.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        event_types: tuple[EventType[Any] | str, ...],
        entity_ids: list[str] | None = None,
        device_ids: list[str] | None = None,
        context_id: str | None = None,
        timestamp: bool = False,
        include_entity_name: bool = True,
    ) -> None:
        """Init the event stream."""
        # A context_id filter and an entity/device filter are mutually
        # exclusive query shapes; callers must pick one.
        assert not (
            context_id and (entity_ids or device_ids)
        ), "can't pass in both context_id and (entity_ids or device_ids)"
        self.hass = hass
        self.ent_reg = er.async_get(hass)
        self.event_types = event_types
        self.entity_ids = entity_ids
        self.device_ids = device_ids
        self.context_id = context_id
        logbook_config: LogbookConfig = hass.data[DOMAIN]
        self.filters: Filters | None = logbook_config.sqlalchemy_filter
        self.logbook_run = LogbookRun(
            context_lookup={None: None},
            external_events=logbook_config.external_events,
            event_cache=EventCache({}),
            entity_name_cache=EntityNameCache(self.hass),
            include_entity_name=include_entity_name,
            timestamp=timestamp,
        )
        self.context_augmenter = ContextAugmenter(self.logbook_run)

    @property
    def limited_select(self) -> bool:
        """Check if the stream is limited by entities context or device ids."""
        return bool(self.entity_ids or self.context_id or self.device_ids)

    def switch_to_live(self) -> None:
        """Switch to live stream.

        Clear caches so we can reduce memory pressure.
        """
        self.logbook_run.event_cache.clear()
        self.logbook_run.context_lookup.clear()
        # Stop memoizing contexts; a live stream would otherwise grow the
        # lookup table without bound.
        self.logbook_run.memoize_new_contexts = False

    def get_events(
        self,
        start_day: dt,
        end_day: dt,
    ) -> list[dict[str, Any]]:
        """Get events for a period of time."""
        with session_scope(hass=self.hass, read_only=True) as session:
            metadata_ids: list[int] | None = None
            instance = get_instance(self.hass)
            if self.entity_ids:
                # Resolve entity_ids to states_meta ids for the query.
                metadata_ids = extract_metadata_ids(
                    instance.states_meta_manager.get_many(
                        self.entity_ids, session, False
                    )
                )
            event_type_ids = tuple(
                extract_event_type_ids(
                    instance.event_type_manager.get_many(self.event_types, session)
                )
            )
            stmt = statement_for_request(
                start_day,
                end_day,
                event_type_ids,
                self.entity_ids,
                metadata_ids,
                self.device_ids,
                self.filters,
                self.context_id,
            )
            return self.humanify(
                execute_stmt_lambda_element(session, stmt, orm_rows=False)
            )

    def humanify(
        self, rows: Generator[EventAsRow] | Sequence[Row] | Result
    ) -> list[dict[str, str]]:
        """Humanify rows."""
        return list(
            _humanify(
                self.hass,
                rows,
                self.ent_reg,
                self.logbook_run,
                self.context_augmenter,
            )
        )
200 
201 
def _humanify(
    hass: HomeAssistant,
    rows: Generator[EventAsRow] | Sequence[Row] | Result,
    ent_reg: er.EntityRegistry,
    logbook_run: LogbookRun,
    context_augmenter: ContextAugmenter,
) -> Generator[dict[str, Any]]:
    """Generate a converted list of events into entries."""
    # Continuous sensors, will be excluded from the logbook
    continuous_sensors: dict[str, bool] = {}
    # Hoist attribute lookups out of the hot per-row loop.
    context_lookup = logbook_run.context_lookup
    external_events = logbook_run.external_events
    event_cache_get = logbook_run.event_cache.get
    entity_name_cache_get = logbook_run.entity_name_cache.get
    include_entity_name = logbook_run.include_entity_name
    timestamp = logbook_run.timestamp
    memoize_new_contexts = logbook_run.memoize_new_contexts
    get_context = context_augmenter.get_context
    context_id_bin: bytes
    data: dict[str, Any]

    # Process rows
    for row in rows:
        context_id_bin = row[CONTEXT_ID_BIN_POS]
        # Remember the first row seen for each context so later rows can be
        # augmented with "triggered by" information.
        if memoize_new_contexts and context_id_bin not in context_lookup:
            context_lookup[context_id_bin] = row
        if row[CONTEXT_ONLY_POS]:
            # Row only exists to supply context; it is not an entry itself.
            continue
        event_type = row[EVENT_TYPE_POS]
        if event_type == EVENT_CALL_SERVICE:
            continue

        if event_type is PSEUDO_EVENT_STATE_CHANGED:
            entity_id = row[ENTITY_ID_POS]
            if TYPE_CHECKING:
                assert entity_id is not None
            # Skip continuous sensors
            if (
                is_continuous := continuous_sensors.get(entity_id)
            ) is None and split_entity_id(entity_id)[0] == SENSOR_DOMAIN:
                is_continuous = is_sensor_continuous(hass, ent_reg, entity_id)
                continuous_sensors[entity_id] = is_continuous
            if is_continuous:
                continue

            data = {
                LOGBOOK_ENTRY_STATE: row[STATE_POS],
                LOGBOOK_ENTRY_ENTITY_ID: entity_id,
            }
            if include_entity_name:
                data[LOGBOOK_ENTRY_NAME] = entity_name_cache_get(entity_id)
            if icon := row[ICON_POS]:
                data[LOGBOOK_ENTRY_ICON] = icon

        elif event_type in external_events:
            # An integration-supplied describe callback renders this event;
            # guard against third-party code raising.
            domain, describe_event = external_events[event_type]
            try:
                data = describe_event(event_cache_get(row))
            except Exception:
                _LOGGER.exception(
                    "Error with %s describe event for %s", domain, event_type
                )
                continue
            data[LOGBOOK_ENTRY_DOMAIN] = domain

        elif event_type == EVENT_LOGBOOK_ENTRY:
            event = event_cache_get(row)
            if not (event_data := event.data):
                continue
            entry_domain = event_data.get(ATTR_DOMAIN)
            entry_entity_id = event_data.get(ATTR_ENTITY_ID)
            # Fall back to deriving the domain from the entity_id.
            if entry_domain is None and entry_entity_id is not None:
                entry_domain = split_entity_id(str(entry_entity_id))[0]
            data = {
                LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME),
                LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE),
                LOGBOOK_ENTRY_DOMAIN: entry_domain,
                LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id,
            }

        else:
            # Unknown event type; nothing to show.
            continue

        time_fired_ts = row[TIME_FIRED_TS_POS]
        if timestamp:
            when = time_fired_ts or time.time()
        else:
            when = process_timestamp_to_utc_isoformat(
                dt_util.utc_from_timestamp(time_fired_ts) or dt_util.utcnow()
            )
        data[LOGBOOK_ENTRY_WHEN] = when

        if context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS]:
            data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(context_user_id_bin)

        # Augment context if its available but not if the context is the same as the row
        # or if the context is the parent of the row
        if (context_row := get_context(context_id_bin, row)) and not (
            (row is context_row or _rows_ids_match(row, context_row))
            and (
                not (context_parent := row[CONTEXT_PARENT_ID_BIN_POS])
                or not (context_row := get_context(context_parent, context_row))
                or row is context_row
                or _rows_ids_match(row, context_row)
            )
        ):
            context_augmenter.augment(data, context_row)

        yield data
311 
class ContextAugmenter:
    """Augment data with context trace."""

    def __init__(self, logbook_run: LogbookRun) -> None:
        """Init the augmenter."""
        self.context_lookup = logbook_run.context_lookup
        self.entity_name_cache = logbook_run.entity_name_cache
        self.external_events = logbook_run.external_events
        self.event_cache = logbook_run.event_cache
        self.include_entity_name = logbook_run.include_entity_name

    def get_context(
        self, context_id_bin: bytes | None, row: Row | EventAsRow | None
    ) -> Row | EventAsRow | None:
        """Get the context row from the id or row context."""
        if context_id_bin is not None and (
            context_row := self.context_lookup.get(context_id_bin)
        ):
            return context_row
        # Fall back to the origin event attached to a live EventAsRow's
        # context when the id was never memoized.
        if (
            type(row) is EventAsRow
            and (context := row[CONTEXT_POS]) is not None
            and (origin_event := context.origin_event) is not None
        ):
            return async_event_to_row(origin_event)
        return None

    def augment(self, data: dict[str, Any], context_row: Row | EventAsRow) -> None:
        """Augment data from the row and cache."""
        event_type = context_row[EVENT_TYPE_POS]
        # State change
        if context_entity_id := context_row[ENTITY_ID_POS]:
            data[CONTEXT_STATE] = context_row[STATE_POS]
            data[CONTEXT_ENTITY_ID] = context_entity_id
            if self.include_entity_name:
                data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get(
                    context_entity_id
                )
            return

        # Call service
        if event_type == EVENT_CALL_SERVICE:
            event = self.event_cache.get(context_row)
            event_data = event.data
            data[CONTEXT_DOMAIN] = event_data.get(ATTR_DOMAIN)
            data[CONTEXT_SERVICE] = event_data.get(ATTR_SERVICE)
            data[CONTEXT_EVENT_TYPE] = event_type
            return

        if event_type not in self.external_events:
            return

        # External (integration-described) event.
        domain, describe_event = self.external_events[event_type]
        data[CONTEXT_EVENT_TYPE] = event_type
        data[CONTEXT_DOMAIN] = domain
        event = self.event_cache.get(context_row)
        try:
            described = describe_event(event)
        except Exception:
            _LOGGER.exception("Error with %s describe event for %s", domain, event_type)
            return
        if name := described.get(LOGBOOK_ENTRY_NAME):
            data[CONTEXT_NAME] = name
        if message := described.get(LOGBOOK_ENTRY_MESSAGE):
            data[CONTEXT_MESSAGE] = message
        # In 2022.12 and later drop `CONTEXT_MESSAGE` if `CONTEXT_SOURCE` is available
        if source := described.get(LOGBOOK_ENTRY_SOURCE):
            data[CONTEXT_SOURCE] = source
        if not (attr_entity_id := described.get(LOGBOOK_ENTRY_ENTITY_ID)):
            return
        data[CONTEXT_ENTITY_ID] = attr_entity_id
        if self.include_entity_name:
            data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get(attr_entity_id)
386 
387 
def _rows_ids_match(row: Row | EventAsRow, other_row: Row | EventAsRow) -> bool:
    """Check of rows match by using the same method as Events __hash__.

    A falsy row_id (None/0) never matches, mirroring Event hashing.
    """
    return bool((row_id := row[ROW_ID_POS]) and row_id == other_row[ROW_ID_POS])
391 
class EntityNameCache:
    """A cache to lookup the name for an entity.

    This class should not be used to lookup attributes
    that are expected to change state.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Init the cache."""
        self._hass = hass
        # entity_id -> friendly name; only populated once a friendly name
        # is seen, so later renames are intentionally not picked up.
        self._names: dict[str, str] = {}

    def get(self, entity_id: str) -> str:
        """Lookup an the friendly name."""
        if entity_id in self._names:
            return self._names[entity_id]
        if (current_state := self._hass.states.get(entity_id)) and (
            friendly_name := current_state.attributes.get(ATTR_FRIENDLY_NAME)
        ):
            self._names[entity_id] = friendly_name
        else:
            # No state or no friendly name: derive a readable name from the
            # object id without caching it.
            return split_entity_id(entity_id)[1].replace("_", " ")

        return self._names[entity_id]
417 
class EventCache:
    """Cache LazyEventPartialState by row."""

    def __init__(self, event_data_cache: dict[str, dict[str, Any]]) -> None:
        """Init the cache."""
        # Shared cache of decoded event_data JSON keyed by the raw string.
        self._event_data_cache = event_data_cache
        self.event_cache: dict[Row | EventAsRow, LazyEventPartialState] = {}

    def get(self, row: EventAsRow | Row) -> LazyEventPartialState:
        """Get the event from the row."""
        if type(row) is EventAsRow:  # - this is never subclassed
            # Live rows are transient; don't memoize them.
            return LazyEventPartialState(row, self._event_data_cache)
        if event := self.event_cache.get(row):
            return event
        self.event_cache[row] = lazy_event = LazyEventPartialState(
            row, self._event_data_cache
        )
        return lazy_event

    def clear(self) -> None:
        """Clear the event cache."""
        self._event_data_cache = {}
        self.event_cache = {}
None augment(self, dict[str, Any] data, Row|EventAsRow context_row)
Definition: processor.py:340
Row|EventAsRow|None get_context(self, bytes|None context_id_bin, Row|EventAsRow|None row)
Definition: processor.py:326
None __init__(self, dict[str, dict[str, Any]] event_data_cache)
Definition: processor.py:422
LazyEventPartialState get(self, EventAsRow|Row row)
Definition: processor.py:427
list[dict[str, str]] humanify(self, Generator[EventAsRow]|Sequence[Row]|Result rows)
Definition: processor.py:189
None __init__(self, HomeAssistant hass, tuple[EventType[Any]|str,...] event_types, list[str]|None entity_ids=None, list[str]|None device_ids=None, str|None context_id=None, bool timestamp=False, bool include_entity_name=True)
Definition: processor.py:116
list[dict[str, Any]] get_events(self, dt start_day, dt end_day)
Definition: processor.py:157
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool is_sensor_continuous(HomeAssistant hass, er.EntityRegistry ent_reg, str entity_id)
Definition: helpers.py:219
EventAsRow async_event_to_row(Event event)
Definition: models.py:141
Generator[dict[str, Any]] _humanify(HomeAssistant hass, Generator[EventAsRow]|Sequence[Row]|Result rows, er.EntityRegistry ent_reg, LogbookRun logbook_run, ContextAugmenter context_augmenter)
Definition: processor.py:208
bool _rows_ids_match(Row|EventAsRow row, Row|EventAsRow other_row)
Definition: processor.py:388
StatementLambdaElement statement_for_request(dt start_day_dt, dt end_day_dt, tuple[int,...] event_type_ids, list[str]|None entity_ids=None, Collection[int]|None states_metadata_ids=None, list[str]|None device_ids=None, Filters|None filters=None, str|None context_id=None)
Definition: __init__.py:29
str|None bytes_to_uuid_hex_or_none(bytes|None _bytes)
Definition: context.py:31
list[int] extract_event_type_ids(dict[EventType[Any]|str, int|None] event_type_to_event_type_id)
Definition: event.py:12
list[int] extract_metadata_ids(dict[str, int|None] entity_id_to_metadata_id)
Definition: state.py:30
None process_timestamp_to_utc_isoformat(None ts)
Definition: time.py:37
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
tuple[str, str] split_entity_id(str entity_id)
Definition: core.py:214
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86