1 """Websocket API for fetching and streaming logbook events."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from dataclasses
import dataclass
8 from datetime
import datetime
as dt, timedelta
10 from typing
import Any
12 import voluptuous
as vol
23 from .const
import DOMAIN
24 from .helpers
import (
25 async_determine_event_types,
26 async_filter_entities,
27 async_subscribe_events,
29 from .models
import LogbookConfig, async_event_to_row
30 from .processor
import EventProcessor
32 MAX_PENDING_LOGBOOK_EVENTS = 2048
33 EVENT_COALESCE_TIME = 0.35
37 BIG_QUERY_RECENT_HOURS = 24
39 _LOGGER = logging.getLogger(__name__)
42 @dataclass(slots=True)
44 """Track a logbook live stream."""
46 stream_queue: asyncio.Queue[Event]
47 subscriptions: list[CALLBACK_TYPE]
48 end_time_unsub: CALLBACK_TYPE |
None =
None
49 task: asyncio.Task |
None =
None
50 wait_sync_task: asyncio.Task |
None =
None
55 """Set up the logbook websocket API."""
56 websocket_api.async_register_command(hass, ws_get_events)
57 websocket_api.async_register_command(hass, ws_event_stream)
62 connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt |
None
64 """Send an empty response.
66 The current case for this is when they ask for entity_ids
67 that will all be filtered away because they have UOMs or
70 connection.send_result(msg_id)
71 stream_end_time = end_time
or dt_util.utcnow()
73 empty_response = messages.event_message(msg_id, empty_stream_message)
74 connection.send_message(
json_bytes(empty_response))
79 connection: ActiveConnection,
83 event_processor: EventProcessor,
85 force_send: bool =
False,
87 """Select historical data from the database and deliver it to the websocket.
89 If the query is considered a big query we will split the request into
90 two chunks so that they get the recent events first and the select
91 that is expected to take a long time comes in after to ensure
92 they are not stuck at a loading screen and can start looking at
95 This function returns the time of the most recent event we sent to the
99 not event_processor.entity_ids
100 and not event_processor.device_ids
101 and ((end_time - start_time) >
timedelta(hours=BIG_QUERY_HOURS))
118 if last_event_time
or not partial
or force_send:
119 connection.send_message(message)
120 return last_event_time
125 recent_query_start = end_time -
timedelta(hours=BIG_QUERY_RECENT_HOURS)
134 if recent_query_last_event_time:
135 connection.send_message(recent_message)
150 if older_query_last_event_time
or not partial
or force_send:
151 connection.send_message(older_message)
154 return recent_query_last_event_time
or older_query_last_event_time
162 event_processor: EventProcessor,
164 ) -> tuple[bytes, dt |
None]:
165 """Async wrapper around _ws_stream_get_events."""
167 _ws_stream_get_events,
177 events: list[dict[str, Any]], start_day: dt, end_day: dt
179 """Generate a logbook stream message response."""
182 "start_time": start_day.timestamp(),
183 "end_time": end_day.timestamp(),
191 event_processor: EventProcessor,
193 ) -> tuple[bytes, dt |
None]:
194 """Fetch events and convert them to json in the executor."""
195 events = event_processor.get_events(start_day, end_day)
198 last_time = dt_util.utc_from_timestamp(events[-1][
"when"])
205 message[
"partial"] =
True
206 return json_bytes(messages.event_message(msg_id, message)), last_time
210 subscriptions_setup_complete_time: dt,
211 connection: ActiveConnection,
213 stream_queue: asyncio.Queue[Event],
214 event_processor: EventProcessor,
216 """Stream events from the queue."""
217 subscriptions_setup_complete_timestamp = (
218 subscriptions_setup_complete_time.timestamp()
221 events: list[Event] = [await stream_queue.get()]
224 if events[0].time_fired_timestamp <= subscriptions_setup_complete_timestamp:
230 await asyncio.sleep(EVENT_COALESCE_TIME)
231 while not stream_queue.empty():
232 events.append(stream_queue.get_nowait())
234 if logbook_events := event_processor.humanify(
237 connection.send_message(
239 messages.event_message(
241 {
"events": logbook_events},
247 @websocket_api.websocket_command(
{
vol.Required("type"):
"logbook/event_stream",
248 vol.Required(
"start_time"): str,
249 vol.Optional(
"end_time"): str,
250 vol.Optional(
"entity_ids"): [str],
251 vol.Optional(
"device_ids"): [str],
254 @websocket_api.async_response
256 hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
258 """Handle logbook stream events websocket command."""
259 start_time_str = msg[
"start_time"]
260 msg_id: int = msg[
"id"]
261 utc_now = dt_util.utcnow()
263 if start_time := dt_util.parse_datetime(start_time_str):
264 start_time = dt_util.as_utc(start_time)
266 if not start_time
or start_time > utc_now:
267 connection.send_error(msg_id,
"invalid_start_time",
"Invalid start_time")
270 end_time_str = msg.get(
"end_time")
271 end_time: dt |
None =
None
273 if not (end_time := dt_util.parse_datetime(end_time_str)):
274 connection.send_error(msg_id,
"invalid_end_time",
"Invalid end_time")
276 end_time = dt_util.as_utc(end_time)
277 if end_time < start_time:
278 connection.send_error(msg_id,
"invalid_end_time",
"Invalid end_time")
281 device_ids = msg.get(
"device_ids")
282 entity_ids = msg.get(
"entity_ids")
285 if not entity_ids
and not device_ids:
297 include_entity_name=
False,
300 if end_time
and end_time <= utc_now:
302 connection.subscriptions[msg_id] = callback(
lambda:
None)
303 connection.send_result(msg_id)
316 subscriptions: list[CALLBACK_TYPE] = []
317 stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS)
319 subscriptions=subscriptions, stream_queue=stream_queue
323 def _unsub(*time: Any) ->
None:
324 """Unsubscribe from all events."""
325 for subscription
in subscriptions:
327 subscriptions.clear()
329 live_stream.task.cancel()
330 if live_stream.wait_sync_task:
331 live_stream.wait_sync_task.cancel()
332 if live_stream.end_time_unsub:
333 live_stream.end_time_unsub()
334 live_stream.end_time_unsub =
None
338 hass, _unsub, end_time
342 def _queue_or_cancel(event: Event) ->
None:
343 """Queue an event to be processed or cancel."""
345 stream_queue.put_nowait(event)
346 except asyncio.QueueFull:
348 "Client exceeded max pending messages of %s",
349 MAX_PENDING_LOGBOOK_EVENTS,
353 entities_filter: Callable[[str], bool] |
None =
None
354 if not event_processor.limited_select:
355 logbook_config: LogbookConfig = hass.data[DOMAIN]
356 entities_filter = logbook_config.entity_filter
367 subscriptions_setup_complete_time = dt_util.utcnow()
368 connection.subscriptions[msg_id] = _unsub
369 connection.send_result(msg_id)
376 subscriptions_setup_complete_time,
386 if msg_id
not in connection.subscriptions:
390 live_stream.task = create_eager_task(
392 subscriptions_setup_complete_time,
400 live_stream.wait_sync_task = create_eager_task(
403 await live_stream.wait_sync_task
420 (last_event_time
or start_time) +
timedelta(microseconds=1),
421 subscriptions_setup_complete_time,
425 event_processor.switch_to_live()
432 event_processor: EventProcessor,
434 """Fetch events and convert them to json in the executor."""
436 messages.result_message(
437 msg_id, event_processor.get_events(start_time, end_time)
442 @websocket_api.websocket_command(
{
vol.Required("type"):
"logbook/get_events",
443 vol.Required(
"start_time"): str,
444 vol.Optional(
"end_time"): str,
445 vol.Optional(
"entity_ids"): [str],
446 vol.Optional(
"device_ids"): [str],
447 vol.Optional(
"context_id"): str,
450 @websocket_api.async_response
452 hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
454 """Handle logbook get events websocket command."""
455 start_time_str = msg[
"start_time"]
456 end_time_str = msg.get(
"end_time")
457 utc_now = dt_util.utcnow()
459 if start_time := dt_util.parse_datetime(start_time_str):
460 start_time = dt_util.as_utc(start_time)
462 connection.send_error(msg[
"id"],
"invalid_start_time",
"Invalid start_time")
467 elif parsed_end_time := dt_util.parse_datetime(end_time_str):
468 end_time = dt_util.as_utc(parsed_end_time)
470 connection.send_error(msg[
"id"],
"invalid_end_time",
"Invalid end_time")
473 if start_time > utc_now:
474 connection.send_result(msg[
"id"], [])
477 device_ids = msg.get(
"device_ids")
478 entity_ids = msg.get(
"entity_ids")
479 context_id = msg.get(
"context_id")
482 if not entity_ids
and not device_ids:
484 connection.send_result(msg[
"id"], [])
496 include_entity_name=
False,
499 connection.send_message(
501 _ws_formatted_get_events,
508
tuple[EventType[Any]|str,...] async_determine_event_types(HomeAssistant hass, list[str]|None entity_ids, list[str]|None device_ids)
list[str] async_filter_entities(HomeAssistant hass, list[str] entity_ids)
None async_subscribe_events(HomeAssistant hass, list[CALLBACK_TYPE] subscriptions, Callable[[Event[Any]], None] target, tuple[EventType[Any]|str,...] event_types, Callable[[str], bool]|None entities_filter, list[str]|None entity_ids, list[str]|None device_ids)
EventAsRow async_event_to_row(Event event)
dt|None _async_send_historical_events(HomeAssistant hass, ActiveConnection connection, int msg_id, dt start_time, dt end_time, EventProcessor event_processor, bool partial, bool force_send=False)
None ws_event_stream(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_formatted_get_events(int msg_id, dt start_time, dt end_time, EventProcessor event_processor)
None ws_get_events(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
tuple[bytes, dt|None] _ws_stream_get_events(int msg_id, dt start_day, dt end_day, EventProcessor event_processor, bool partial)
None _async_events_consumer(dt subscriptions_setup_complete_time, ActiveConnection connection, int msg_id, asyncio.Queue[Event] stream_queue, EventProcessor event_processor)
None async_setup(HomeAssistant hass)
dict[str, Any] _generate_stream_message(list[dict[str, Any]] events, dt start_day, dt end_day)
None _async_send_empty_response(ActiveConnection connection, int msg_id, dt start_time, dt|None end_time)
tuple[bytes, dt|None] _async_get_ws_stream_events(HomeAssistant hass, int msg_id, dt start_time, dt end_time, EventProcessor event_processor, bool partial)
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Recorder get_instance(HomeAssistant hass)