1 """Event parser and human readable log generator."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Mapping
13 ATTR_UNIT_OF_MEASUREMENT,
20 EventStateChangedData,
31 from .const
import ALWAYS_CONTINUOUS_DOMAINS, AUTOMATION_EVENTS, BUILT_IN_EVENTS, DOMAIN
32 from .models
import LogbookConfig
36 """Filter out any entities that logbook will not produce results for."""
37 ent_reg = er.async_get(hass)
40 for entity_id
in entity_ids
48 hass: HomeAssistant, entity_ids: list[str] |
None, device_ids: list[str] |
None
50 """Find the config entry ids for a set of entities or devices."""
51 config_entry_ids: set[str] = set()
53 eng_reg = er.async_get(hass)
54 for entity_id
in entity_ids:
55 if (entry := eng_reg.async_get(entity_id))
and entry.config_entry_id:
56 config_entry_ids.add(entry.config_entry_id)
58 dev_reg = dr.async_get(hass)
59 for device_id
in device_ids:
60 if (device := dev_reg.async_get(device_id))
and device.config_entries:
61 config_entry_ids |= device.config_entries
62 return config_entry_ids
66 hass: HomeAssistant, entity_ids: list[str] |
None, device_ids: list[str] |
None
67 ) -> tuple[EventType[Any] | str, ...]:
68 """Reduce the event types based on the entity ids and device ids."""
69 logbook_config: LogbookConfig = hass.data[DOMAIN]
70 external_events = logbook_config.external_events
71 if not entity_ids
and not device_ids:
72 return (*BUILT_IN_EVENTS, *external_events)
74 interested_domains: set[str] = set()
76 if entry := hass.config_entries.async_get_entry(entry_id):
77 interested_domains.add(entry.domain)
85 intrested_event_types: set[EventType[Any] | str] = {
87 for external_event, domain_call
in external_events.items()
88 if domain_call[0]
in interested_domains
92 intrested_event_types.add(EVENT_LOGBOOK_ENTRY)
94 return tuple(intrested_event_types)
def extract_attr(source: Mapping[str, Any], attr: str) -> list[str]:
    """Extract an attribute from a mapping, always as a list of strings.

    Args:
        source: Mapping to read the attribute from (for example event data).
        attr: Key to look up in ``source``.

    Returns:
        An empty list when the key is missing or its value is ``None``;
        the stored list itself (not a copy) when the value is already a
        list; otherwise the value converted with ``str()`` and split on
        commas.
    """
    if (value := source.get(attr)) is None:
        # A missing key and an explicit None are treated the same way.
        return []
    if isinstance(value, list):
        # Already in the target shape; hand it back unchanged.
        return value
    # Scalars (including comma separated strings) are normalised via str().
    return str(value).split(",")
109 target: Callable[[Event],
None],
110 entities_filter: Callable[[str], bool] |
None,
111 entity_ids: list[str] |
None,
112 device_ids: list[str] |
None,
113 ) -> Callable[[Event],
None]:
114 """Make a callable to filter events."""
115 if not entities_filter
and not entity_ids
and not device_ids:
126 def _forward_events_filtered_by_entities_filter(event: Event) ->
None:
127 assert entities_filter
is not None
128 event_data = event.data
130 if entity_ids
and not any(
131 entities_filter(entity_id)
for entity_id
in entity_ids
134 domain = event_data.get(ATTR_DOMAIN)
135 if domain
and not entities_filter(f
"{domain}._"):
139 return _forward_events_filtered_by_entities_filter
145 entity_ids_set = set(entity_ids)
if entity_ids
else set()
146 device_ids_set = set(device_ids)
if device_ids
else set()
149 def _forward_events_filtered_by_device_entity_ids(event: Event) ->
None:
150 event_data = event.data
151 if entity_ids_set.intersection(
153 )
or device_ids_set.intersection(
extract_attr(event_data, ATTR_DEVICE_ID)):
156 return _forward_events_filtered_by_device_entity_ids
162 subscriptions: list[CALLBACK_TYPE],
163 target: Callable[[Event[Any]],
None],
164 event_types: tuple[EventType[Any] | str, ...],
165 entities_filter: Callable[[str], bool] |
None,
166 entity_ids: list[str] |
None,
167 device_ids: list[str] |
None,
169 """Subscribe to events for the entities and devices or all.
171 These are the events we need to listen for to do
172 the live logbook stream.
174 assert is_callback(target),
"target must be a callback"
176 target, entities_filter, entity_ids, device_ids
178 subscriptions.extend(
179 hass.bus.async_listen(event_type, event_forwarder)
for event_type
in event_types
182 if device_ids
and not entity_ids:
189 def _forward_state_events_filtered(event: Event[EventStateChangedData]) ->
None:
190 if (old_state := event.data[
"old_state"])
is None or (
191 new_state := event.data[
"new_state"]
195 entities_filter
and not entities_filter(new_state.entity_id)
201 subscriptions.append(
203 hass, entity_ids, _forward_state_events_filtered
209 subscriptions.append(
210 hass.bus.async_listen(
212 _forward_state_events_filtered,
218 hass: HomeAssistant, ent_reg: er.EntityRegistry, entity_id: str
220 """Determine if a sensor is continuous.
222 Sensors with a unit_of_measurement or state_class are considered continuous.
224 The unit_of_measurement check will already happen if this is
225 called for historical data because the SQL query generated by _get_events
226 will filter out any sensors with a unit_of_measurement.
228 If the state still exists in the state machine, this function still
229 checks for ATTR_UNIT_OF_MEASUREMENT since the live mode is not filtered
235 if (state := hass.states.get(entity_id))
and (attributes := state.attributes):
236 return ATTR_UNIT_OF_MEASUREMENT
in attributes
or ATTR_STATE_CLASS
in attributes
245 (entry := ent_reg.async_get(entity_id))
246 and entry.capabilities
247 and entry.capabilities.get(ATTR_STATE_CLASS)
252 """Check if the logbook should filter a state.
254 Used when we are in live mode to ensure
255 we only get significant changes (state.last_changed != state.last_updated)
258 new_state.state == old_state.state
259 or new_state.last_changed != new_state.last_updated
260 or new_state.domain
in ALWAYS_CONTINUOUS_DOMAINS
261 or ATTR_UNIT_OF_MEASUREMENT
in new_state.attributes
262 or ATTR_STATE_CLASS
in new_state.attributes
tuple[EventType[Any]|str,...] async_determine_event_types(HomeAssistant hass, list[str]|None entity_ids, list[str]|None device_ids)
list[str] async_filter_entities(HomeAssistant hass, list[str] entity_ids)
bool is_sensor_continuous(HomeAssistant hass, er.EntityRegistry ent_reg, str entity_id)
set[str] _async_config_entries_for_ids(HomeAssistant hass, list[str]|None entity_ids, list[str]|None device_ids)
list[str] extract_attr(Mapping[str, Any] source, str attr)
bool _is_state_filtered(State new_state, State old_state)
None async_subscribe_events(HomeAssistant hass, list[CALLBACK_TYPE] subscriptions, Callable[[Event[Any]], None] target, tuple[EventType[Any]|str,...] event_types, Callable[[str], bool]|None entities_filter, list[str]|None entity_ids, list[str]|None device_ids)
Callable[[Event], None] event_forwarder_filtered(Callable[[Event], None] target, Callable[[str], bool]|None entities_filter, list[str]|None entity_ids, list[str]|None device_ids)
tuple[str, str] split_entity_id(str entity_id)
bool is_callback(Callable[..., Any] func)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)