1 """Event parser and human readable log generator."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Generator, Sequence
6 from dataclasses
import dataclass
7 from datetime
import datetime
as dt
10 from typing
import TYPE_CHECKING, Any
12 from sqlalchemy.engine
import Result
13 from sqlalchemy.engine.row
import Row
18 bytes_to_uuid_hex_or_none,
19 extract_event_type_ids,
21 process_timestamp_to_utc_isoformat,
24 execute_stmt_lambda_element,
46 CONTEXT_ENTITY_ID_NAME,
56 LOGBOOK_ENTRY_ENTITY_ID,
58 LOGBOOK_ENTRY_MESSAGE,
64 from .helpers
import is_sensor_continuous
68 CONTEXT_PARENT_ID_BIN_POS,
70 CONTEXT_USER_ID_BIN_POS,
78 LazyEventPartialState,
82 from .queries
import statement_for_request
83 from .queries.common
import PSEUDO_EVENT_STATE_CHANGED
85 _LOGGER = logging.getLogger(__name__)
88 @dataclass(slots=True)
90 """A logbook run which may be a long running event stream or single request."""
92 context_lookup: dict[bytes |
None, Row | EventAsRow |
None]
93 external_events: dict[
95 tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]],
97 event_cache: EventCache
98 entity_name_cache: EntityNameCache
99 include_entity_name: bool
101 memoize_new_contexts: bool =
True
105 """Stream into logbook format."""
110 event_types: tuple[EventType[Any] | str, ...],
111 entity_ids: list[str] |
None =
None,
112 device_ids: list[str] |
None =
None,
113 context_id: str |
None =
None,
114 timestamp: bool =
False,
115 include_entity_name: bool =
True,
117 """Init the event stream."""
119 context_id
and (entity_ids
or device_ids)
120 ),
"can't pass in both context_id and (entity_ids or device_ids)"
127 logbook_config: LogbookConfig = hass.data[DOMAIN]
128 self.filters: Filters |
None = logbook_config.sqlalchemy_filter
130 context_lookup={
None:
None},
131 external_events=logbook_config.external_events,
134 include_entity_name=include_entity_name,
141 """Check if the stream is limited by entity ids, a context id, or device ids."""
145 """Switch to live stream.
147 Clear caches so we can reduce memory pressure.
151 self.
logbook_runlogbook_run.memoize_new_contexts =
False
157 ) -> list[dict[str, Any]]:
158 """Get events for a period of time."""
160 metadata_ids: list[int] |
None =
None
164 instance.states_meta_manager.get_many(
168 event_type_ids =
tuple(
170 instance.event_type_manager.get_many(self.
event_typesevent_types, session)
188 self, rows: Generator[EventAsRow] | Sequence[Row] | Result
189 ) -> list[dict[str, str]]:
204 rows: Generator[EventAsRow] | Sequence[Row] | Result,
205 ent_reg: er.EntityRegistry,
206 logbook_run: LogbookRun,
207 context_augmenter: ContextAugmenter,
208 ) -> Generator[dict[str, Any]]:
209 """Generate a converted list of events into entries."""
211 continuous_sensors: dict[str, bool] = {}
212 context_lookup = logbook_run.context_lookup
213 external_events = logbook_run.external_events
214 event_cache_get = logbook_run.event_cache.get
215 entity_name_cache_get = logbook_run.entity_name_cache.get
216 include_entity_name = logbook_run.include_entity_name
217 timestamp = logbook_run.timestamp
218 memoize_new_contexts = logbook_run.memoize_new_contexts
219 get_context = context_augmenter.get_context
220 context_id_bin: bytes
225 context_id_bin = row[CONTEXT_ID_BIN_POS]
226 if memoize_new_contexts
and context_id_bin
not in context_lookup:
227 context_lookup[context_id_bin] = row
228 if row[CONTEXT_ONLY_POS]:
230 event_type = row[EVENT_TYPE_POS]
231 if event_type == EVENT_CALL_SERVICE:
234 if event_type
is PSEUDO_EVENT_STATE_CHANGED:
235 entity_id = row[ENTITY_ID_POS]
237 assert entity_id
is not None
240 is_continuous := continuous_sensors.get(entity_id)
243 continuous_sensors[entity_id] = is_continuous
248 LOGBOOK_ENTRY_STATE: row[STATE_POS],
249 LOGBOOK_ENTRY_ENTITY_ID: entity_id,
251 if include_entity_name:
252 data[LOGBOOK_ENTRY_NAME] = entity_name_cache_get(entity_id)
253 if icon := row[ICON_POS]:
254 data[LOGBOOK_ENTRY_ICON] = icon
256 elif event_type
in external_events:
257 domain, describe_event = external_events[event_type]
259 data = describe_event(event_cache_get(row))
262 "Error with %s describe event for %s", domain, event_type
265 data[LOGBOOK_ENTRY_DOMAIN] = domain
267 elif event_type == EVENT_LOGBOOK_ENTRY:
268 event = event_cache_get(row)
269 if not (event_data := event.data):
271 entry_domain = event_data.get(ATTR_DOMAIN)
272 entry_entity_id = event_data.get(ATTR_ENTITY_ID)
273 if entry_domain
is None and entry_entity_id
is not None:
276 LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME),
277 LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE),
278 LOGBOOK_ENTRY_DOMAIN: entry_domain,
279 LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id,
285 time_fired_ts = row[TIME_FIRED_TS_POS]
287 when = time_fired_ts
or time.time()
290 dt_util.utc_from_timestamp(time_fired_ts)
or dt_util.utcnow()
292 data[LOGBOOK_ENTRY_WHEN] = when
294 if context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS]:
299 if (context_row := get_context(context_id_bin, row))
and not (
302 not (context_parent := row[CONTEXT_PARENT_ID_BIN_POS])
303 or not (context_row := get_context(context_parent, context_row))
304 or row
is context_row
308 context_augmenter.augment(data, context_row)
314 """Augment data with context trace."""
316 def __init__(self, logbook_run: LogbookRun) ->
None:
317 """Init the augmenter."""
325 self, context_id_bin: bytes |
None, row: Row | EventAsRow |
None
326 ) -> Row | EventAsRow |
None:
327 """Get the context row from the id or row context."""
328 if context_id_bin
is not None and (
333 type(row)
is EventAsRow
334 and (context := row[CONTEXT_POS])
is not None
335 and (origin_event := context.origin_event)
is not None
340 def augment(self, data: dict[str, Any], context_row: Row | EventAsRow) ->
None:
341 """Augment data from the row and cache."""
342 event_type = context_row[EVENT_TYPE_POS]
344 if context_entity_id := context_row[ENTITY_ID_POS]:
345 data[CONTEXT_STATE] = context_row[STATE_POS]
346 data[CONTEXT_ENTITY_ID] = context_entity_id
354 if event_type == EVENT_CALL_SERVICE:
356 event_data = event.data
357 data[CONTEXT_DOMAIN] = event_data.get(ATTR_DOMAIN)
358 data[CONTEXT_SERVICE] = event_data.get(ATTR_SERVICE)
359 data[CONTEXT_EVENT_TYPE] = event_type
365 domain, describe_event = self.
external_eventsexternal_events[event_type]
366 data[CONTEXT_EVENT_TYPE] = event_type
367 data[CONTEXT_DOMAIN] = domain
370 described = describe_event(event)
372 _LOGGER.exception(
"Error with %s describe event for %s", domain, event_type)
374 if name := described.get(LOGBOOK_ENTRY_NAME):
375 data[CONTEXT_NAME] = name
376 if message := described.get(LOGBOOK_ENTRY_MESSAGE):
377 data[CONTEXT_MESSAGE] = message
379 if source := described.get(LOGBOOK_ENTRY_SOURCE):
380 data[CONTEXT_SOURCE] = source
381 if not (attr_entity_id := described.get(LOGBOOK_ENTRY_ENTITY_ID)):
383 data[CONTEXT_ENTITY_ID] = attr_entity_id
389 """Check if rows match by using the same method as Events __hash__."""
390 return bool((row_id := row[ROW_ID_POS])
and row_id == other_row[ROW_ID_POS])
394 """A cache to look up the name for an entity.
396 This class should not be used to look up attributes
397 that are expected to change state.
401 """Init the cache."""
403 self._names: dict[str, str] = {}
405 def get(self, entity_id: str) -> str:
406 """Look up the friendly name."""
407 if entity_id
in self._names:
408 return self._names[entity_id]
409 if (current_state := self.
_hass_hass.states.get(entity_id))
and (
410 friendly_name := current_state.attributes.get(ATTR_FRIENDLY_NAME)
412 self._names[entity_id] = friendly_name
416 return self._names[entity_id]
420 """Cache LazyEventPartialState by row."""
422 def __init__(self, event_data_cache: dict[str, dict[str, Any]]) ->
None:
423 """Init the cache."""
425 self.
event_cacheevent_cache: dict[Row | EventAsRow, LazyEventPartialState] = {}
427 def get(self, row: EventAsRow | Row) -> LazyEventPartialState:
428 """Get the event from the row."""
429 if type(row)
is EventAsRow:
439 """Clear the event cache."""
None __init__(self, LogbookRun logbook_run)
None augment(self, dict[str, Any] data, Row|EventAsRow context_row)
Row|EventAsRow|None get_context(self, bytes|None context_id_bin, Row|EventAsRow|None row)
str get(self, str entity_id)
None __init__(self, HomeAssistant hass)
None __init__(self, dict[str, dict[str, Any]] event_data_cache)
LazyEventPartialState get(self, EventAsRow|Row row)
list[dict[str, str]] humanify(self, Generator[EventAsRow]|Sequence[Row]|Result rows)
bool limited_select(self)
None __init__(self, HomeAssistant hass, tuple[EventType[Any]|str,...] event_types, list[str]|None entity_ids=None, list[str]|None device_ids=None, str|None context_id=None, bool timestamp=False, bool include_entity_name=True)
list[dict[str, Any]] get_events(self, dt start_day, dt end_day)
None switch_to_live(self)
web.Response get(self, web.Request request, str config_key)
bool is_sensor_continuous(HomeAssistant hass, er.EntityRegistry ent_reg, str entity_id)
EventAsRow async_event_to_row(Event event)
Generator[dict[str, Any]] _humanify(HomeAssistant hass, Generator[EventAsRow]|Sequence[Row]|Result rows, er.EntityRegistry ent_reg, LogbookRun logbook_run, ContextAugmenter context_augmenter)
bool _rows_ids_match(Row|EventAsRow row, Row|EventAsRow other_row)
StatementLambdaElement statement_for_request(dt start_day_dt, dt end_day_dt, tuple[int,...] event_type_ids, list[str]|None entity_ids=None, Collection[int]|None states_metadata_ids=None, list[str]|None device_ids=None, Filters|None filters=None, str|None context_id=None)
str|None bytes_to_uuid_hex_or_none(bytes|None _bytes)
list[int] extract_event_type_ids(dict[EventType[Any]|str, int|None] event_type_to_event_type_id)
list[int] extract_metadata_ids(dict[str, int|None] entity_id_to_metadata_id)
None process_timestamp_to_utc_isoformat(None ts)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
tuple[str, str] split_entity_id(str entity_id)
Recorder get_instance(HomeAssistant hass)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)