Home Assistant Unofficial Reference 2024.12.1
websocket_api.py
Go to the documentation of this file.
1 """Event parser and human readable log generator."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 from datetime import datetime as dt, timedelta
9 import logging
10 from typing import Any
11 
12 import voluptuous as vol
13 
14 from homeassistant.components import websocket_api
15 from homeassistant.components.recorder import get_instance
16 from homeassistant.components.websocket_api import ActiveConnection, messages
17 from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
18 from homeassistant.helpers.event import async_track_point_in_utc_time
19 from homeassistant.helpers.json import json_bytes
20 from homeassistant.util.async_ import create_eager_task
21 import homeassistant.util.dt as dt_util
22 
23 from .const import DOMAIN
24 from .helpers import (
25  async_determine_event_types,
26  async_filter_entities,
27  async_subscribe_events,
28 )
29 from .models import LogbookConfig, async_event_to_row
30 from .processor import EventProcessor
31 
32 MAX_PENDING_LOGBOOK_EVENTS = 2048
33 EVENT_COALESCE_TIME = 0.35
34 # minimum size that we will split the query
35 BIG_QUERY_HOURS = 25
36 # how many hours to deliver in the first chunk when we split the query
37 BIG_QUERY_RECENT_HOURS = 24
38 
39 _LOGGER = logging.getLogger(__name__)
40 
41 
42 @dataclass(slots=True)
44  """Track a logbook live stream."""
45 
46  stream_queue: asyncio.Queue[Event]
47  subscriptions: list[CALLBACK_TYPE]
48  end_time_unsub: CALLBACK_TYPE | None = None
49  task: asyncio.Task | None = None
50  wait_sync_task: asyncio.Task | None = None
51 
52 
@callback
def async_setup(hass: HomeAssistant) -> None:
    """Set up the logbook websocket API."""
    # Register both logbook websocket commands.
    for command_handler in (ws_get_events, ws_event_stream):
        websocket_api.async_register_command(hass, command_handler)
58 
59 
@callback
def _async_send_empty_response(
    connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None
) -> None:
    """Send an empty response.

    The current case for this is when they ask for entity_ids
    that will all be filtered away because they have UOMs or
    state_class.

    Sends the result acknowledgement followed by a single stream
    message containing no events.
    """
    connection.send_result(msg_id)
    # When no end_time was requested, close the window at "now".
    stream_end_time = end_time or dt_util.utcnow()
    empty_stream_message = _generate_stream_message([], start_time, stream_end_time)
    empty_response = messages.event_message(msg_id, empty_stream_message)
    connection.send_message(json_bytes(empty_response))
75 
76 
async def _async_send_historical_events(
    hass: HomeAssistant,
    connection: ActiveConnection,
    msg_id: int,
    start_time: dt,
    end_time: dt,
    event_processor: EventProcessor,
    partial: bool,
    force_send: bool = False,
) -> dt | None:
    """Select historical data from the database and deliver it to the websocket.

    If the query is considered a big query we will split the request into
    two chunks so that they get the recent events first and the select
    that is expected to take a long time comes in after to ensure
    they are not stuck at a loading screen and can start looking at
    the data right away.

    This function returns the time of the most recent event we sent to the
    websocket.
    """
    # A query is "big" only when it is unfiltered (no entity/device ids)
    # and spans more than BIG_QUERY_HOURS.
    is_big_query = (
        not event_processor.entity_ids
        and not event_processor.device_ids
        and ((end_time - start_time) > timedelta(hours=BIG_QUERY_HOURS))
    )

    if not is_big_query:
        message, last_event_time = await _async_get_ws_stream_events(
            hass,
            msg_id,
            start_time,
            end_time,
            event_processor,
            partial,
        )
        # If there is no last_event_time, there are no historical
        # results, but we still send an empty message
        # if its the last one (not partial) so
        # consumers of the api know their request was
        # answered but there were no results
        if last_event_time or not partial or force_send:
            connection.send_message(message)
        return last_event_time

    # This is a big query so we deliver the most recent
    # BIG_QUERY_RECENT_HOURS first and then we fetch the old data
    recent_query_start = end_time - timedelta(hours=BIG_QUERY_RECENT_HOURS)
    recent_message, recent_query_last_event_time = await _async_get_ws_stream_events(
        hass,
        msg_id,
        recent_query_start,
        end_time,
        event_processor,
        partial=True,
    )
    if recent_query_last_event_time:
        connection.send_message(recent_message)

    older_message, older_query_last_event_time = await _async_get_ws_stream_events(
        hass,
        msg_id,
        start_time,
        recent_query_start,
        event_processor,
        partial,
    )
    # If there is no last_event_time, there are no historical
    # results, but we still send an empty message
    # if its the last one (not partial) so
    # consumers of the api know their request was
    # answered but there were no results
    if older_query_last_event_time or not partial or force_send:
        connection.send_message(older_message)

    # Returns the time of the newest event
    return recent_query_last_event_time or older_query_last_event_time
155 
156 
async def _async_get_ws_stream_events(
    hass: HomeAssistant,
    msg_id: int,
    start_time: dt,
    end_time: dt,
    event_processor: EventProcessor,
    partial: bool,
) -> tuple[bytes, dt | None]:
    """Async wrapper around _ws_stream_get_events.

    Runs the fetch and json formatting in the recorder's executor so the
    event loop is not blocked by the database query.
    """
    return await get_instance(hass).async_add_executor_job(
        _ws_stream_get_events,
        msg_id,
        start_time,
        end_time,
        event_processor,
        partial,
    )
174 
175 
def _generate_stream_message(
    events: list[dict[str, Any]], start_day: dt, end_day: dt
) -> dict[str, Any]:
    """Generate a logbook stream message response.

    The window bounds are converted to unix timestamps (floats) so the
    payload is JSON serializable.
    """
    return {
        "events": events,
        "start_time": start_day.timestamp(),
        "end_time": end_day.timestamp(),
    }
185 
186 
def _ws_stream_get_events(
    msg_id: int,
    start_day: dt,
    end_day: dt,
    event_processor: EventProcessor,
    partial: bool,
) -> tuple[bytes, dt | None]:
    """Fetch events and convert them to json in the executor.

    Returns the serialized websocket message and the time of the most
    recent event included (None when there were no events).
    """
    events = event_processor.get_events(start_day, end_day)
    last_time = None
    if events:
        # Events are ordered oldest -> newest; the last one is the newest.
        last_time = dt_util.utc_from_timestamp(events[-1]["when"])
    message = _generate_stream_message(events, start_day, end_day)
    if partial:
        # This is a hint to consumers of the api that
        # we are about to send another block of historical
        # data in case the UI needs to show that historical
        # data is still loading in the future
        message["partial"] = True
    return json_bytes(messages.event_message(msg_id, message)), last_time
207 
208 
async def _async_events_consumer(
    subscriptions_setup_complete_time: dt,
    connection: ActiveConnection,
    msg_id: int,
    stream_queue: asyncio.Queue[Event],
    event_processor: EventProcessor,
) -> None:
    """Stream events from the queue.

    Runs until cancelled: drains stream_queue, drops events that predate
    the last database fetch, coalesces bursts, and sends the humanified
    events to the websocket.
    """
    subscriptions_setup_complete_timestamp = (
        subscriptions_setup_complete_time.timestamp()
    )
    while True:
        events: list[Event] = [await stream_queue.get()]
        # If the event is older than the last db
        # event we already sent it so we skip it.
        if events[0].time_fired_timestamp <= subscriptions_setup_complete_timestamp:
            continue
        # We sleep for the EVENT_COALESCE_TIME so
        # we can group events together to minimize
        # the number of websocket messages when the
        # system is overloaded with an event storm
        await asyncio.sleep(EVENT_COALESCE_TIME)
        while not stream_queue.empty():
            events.append(stream_queue.get_nowait())

        # humanify may filter everything out; only send non-empty batches.
        if logbook_events := event_processor.humanify(
            async_event_to_row(e) for e in events
        ):
            connection.send_message(
                json_bytes(
                    messages.event_message(
                        msg_id,
                        {"events": logbook_events},
                    )
                )
            )
245 
246 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "logbook/event_stream",
        vol.Required("start_time"): str,
        vol.Optional("end_time"): str,
        vol.Optional("entity_ids"): [str],
        vol.Optional("device_ids"): [str],
    }
)
@websocket_api.async_response
async def ws_event_stream(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle logbook stream events websocket command.

    Sends historical events first, then subscribes to live events and
    streams them until the client unsubscribes (or end_time is reached).
    """
    start_time_str = msg["start_time"]
    msg_id: int = msg["id"]
    utc_now = dt_util.utcnow()

    if start_time := dt_util.parse_datetime(start_time_str):
        start_time = dt_util.as_utc(start_time)

    # Reject unparsable or future start times.
    if not start_time or start_time > utc_now:
        connection.send_error(msg_id, "invalid_start_time", "Invalid start_time")
        return

    end_time_str = msg.get("end_time")
    end_time: dt | None = None
    if end_time_str:
        if not (end_time := dt_util.parse_datetime(end_time_str)):
            connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
            return
        end_time = dt_util.as_utc(end_time)
        if end_time < start_time:
            connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
            return

    device_ids = msg.get("device_ids")
    entity_ids = msg.get("entity_ids")
    if entity_ids:
        entity_ids = async_filter_entities(hass, entity_ids)
        if not entity_ids and not device_ids:
            # Everything has been filtered away
            _async_send_empty_response(connection, msg_id, start_time, end_time)
            return

    event_types = async_determine_event_types(hass, entity_ids, device_ids)
    event_processor = EventProcessor(
        hass,
        event_types,
        entity_ids,
        device_ids,
        None,
        timestamp=True,
        include_entity_name=False,
    )

    if end_time and end_time <= utc_now:
        # Not a live stream, but it might be a big query
        connection.subscriptions[msg_id] = callback(lambda: None)
        connection.send_result(msg_id)
        # Fetch everything from history
        await _async_send_historical_events(
            hass,
            connection,
            msg_id,
            start_time,
            end_time,
            event_processor,
            partial=False,
        )
        return

    subscriptions: list[CALLBACK_TYPE] = []
    stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS)
    live_stream = LogbookLiveStream(
        subscriptions=subscriptions, stream_queue=stream_queue
    )

    @callback
    def _unsub(*time: Any) -> None:
        """Unsubscribe from all events."""
        for subscription in subscriptions:
            subscription()
        subscriptions.clear()
        if live_stream.task:
            live_stream.task.cancel()
        if live_stream.wait_sync_task:
            live_stream.wait_sync_task.cancel()
        if live_stream.end_time_unsub:
            live_stream.end_time_unsub()
            live_stream.end_time_unsub = None

    if end_time:
        # Automatically tear the stream down when end_time passes.
        live_stream.end_time_unsub = async_track_point_in_utc_time(
            hass, _unsub, end_time
        )

    @callback
    def _queue_or_cancel(event: Event) -> None:
        """Queue an event to be processed or cancel."""
        try:
            stream_queue.put_nowait(event)
        except asyncio.QueueFull:
            # Client cannot keep up; drop the whole subscription.
            _LOGGER.debug(
                "Client exceeded max pending messages of %s",
                MAX_PENDING_LOGBOOK_EVENTS,
            )
            _unsub()

    entities_filter: Callable[[str], bool] | None = None
    if not event_processor.limited_select:
        logbook_config: LogbookConfig = hass.data[DOMAIN]
        entities_filter = logbook_config.entity_filter

    async_subscribe_events(
        hass,
        subscriptions,
        _queue_or_cancel,
        event_types,
        entities_filter,
        entity_ids,
        device_ids,
    )
    subscriptions_setup_complete_time = dt_util.utcnow()
    connection.subscriptions[msg_id] = _unsub
    connection.send_result(msg_id)
    # Fetch everything from history
    last_event_time = await _async_send_historical_events(
        hass,
        connection,
        msg_id,
        start_time,
        subscriptions_setup_complete_time,
        event_processor,
        partial=True,
        # Force a send since the wait for the sync task
        # can take a while if the recorder is busy and
        # we want to make sure the client is not still spinning
        # because it is waiting for the first message
        force_send=True,
    )

    if msg_id not in connection.subscriptions:
        # Unsubscribe happened while sending historical events
        return

    live_stream.task = create_eager_task(
        _async_events_consumer(
            subscriptions_setup_complete_time,
            connection,
            msg_id,
            stream_queue,
            event_processor,
        )
    )

    live_stream.wait_sync_task = create_eager_task(
        get_instance(hass).async_block_till_done()
    )
    await live_stream.wait_sync_task

    #
    # Fetch any events from the database that have
    # not been committed since the original fetch
    # so we can switch over to using the subscriptions
    #
    # We only want events that happened after the last event
    # we had from the last database query
    #
    await _async_send_historical_events(
        hass,
        connection,
        msg_id,
        # Add one microsecond so we are outside the window of
        # the last event we got from the database since otherwise
        # we could fetch the same event twice
        (last_event_time or start_time) + timedelta(microseconds=1),
        subscriptions_setup_complete_time,
        event_processor,
        partial=False,
    )
    event_processor.switch_to_live()
426 
427 
def _ws_formatted_get_events(
    msg_id: int,
    start_time: dt,
    end_time: dt,
    event_processor: EventProcessor,
) -> bytes:
    """Fetch events and convert them to json in the executor.

    Returns a serialized websocket result message for msg_id.
    """
    return json_bytes(
        messages.result_message(
            msg_id, event_processor.get_events(start_time, end_time)
        )
    )
440 
441 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "logbook/get_events",
        vol.Required("start_time"): str,
        vol.Optional("end_time"): str,
        vol.Optional("entity_ids"): [str],
        vol.Optional("device_ids"): [str],
        vol.Optional("context_id"): str,
    }
)
@websocket_api.async_response
async def ws_get_events(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle logbook get events websocket command."""
    msg_id = msg["id"]
    utc_now = dt_util.utcnow()

    # Parse and validate the requested time window.
    parsed_start = dt_util.parse_datetime(msg["start_time"])
    if parsed_start is None:
        connection.send_error(msg_id, "invalid_start_time", "Invalid start_time")
        return
    start_time = dt_util.as_utc(parsed_start)

    end_time_str = msg.get("end_time")
    if not end_time_str:
        # No end requested: the window closes at "now".
        end_time = utc_now
    else:
        parsed_end = dt_util.parse_datetime(end_time_str)
        if parsed_end is None:
            connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
            return
        end_time = dt_util.as_utc(parsed_end)

    # A window starting in the future can never have results.
    if start_time > utc_now:
        connection.send_result(msg_id, [])
        return

    device_ids = msg.get("device_ids")
    entity_ids = msg.get("entity_ids")
    context_id = msg.get("context_id")
    if entity_ids:
        entity_ids = async_filter_entities(hass, entity_ids)
        if not entity_ids and not device_ids:
            # Everything has been filtered away
            connection.send_result(msg_id, [])
            return

    event_types = async_determine_event_types(hass, entity_ids, device_ids)

    event_processor = EventProcessor(
        hass,
        event_types,
        entity_ids,
        device_ids,
        context_id,
        timestamp=True,
        include_entity_name=False,
    )

    # Run the fetch + json formatting in the recorder's executor so the
    # event loop stays responsive.
    connection.send_message(
        await get_instance(hass).async_add_executor_job(
            _ws_formatted_get_events,
            msg_id,
            start_time,
            end_time,
            event_processor,
        )
    )
508 
tuple[EventType[Any]|str,...] async_determine_event_types(HomeAssistant hass, list[str]|None entity_ids, list[str]|None device_ids)
Definition: helpers.py:67
list[str] async_filter_entities(HomeAssistant hass, list[str] entity_ids)
Definition: helpers.py:35
None async_subscribe_events(HomeAssistant hass, list[CALLBACK_TYPE] subscriptions, Callable[[Event[Any]], None] target, tuple[EventType[Any]|str,...] event_types, Callable[[str], bool]|None entities_filter, list[str]|None entity_ids, list[str]|None device_ids)
Definition: helpers.py:168
EventAsRow async_event_to_row(Event event)
Definition: models.py:141
dt|None _async_send_historical_events(HomeAssistant hass, ActiveConnection connection, int msg_id, dt start_time, dt end_time, EventProcessor event_processor, bool partial, bool force_send=False)
None ws_event_stream(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_formatted_get_events(int msg_id, dt start_time, dt end_time, EventProcessor event_processor)
None ws_get_events(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
tuple[bytes, dt|None] _ws_stream_get_events(int msg_id, dt start_day, dt end_day, EventProcessor event_processor, bool partial)
None _async_events_consumer(dt subscriptions_setup_complete_time, ActiveConnection connection, int msg_id, asyncio.Queue[Event] stream_queue, EventProcessor event_processor)
dict[str, Any] _generate_stream_message(list[dict[str, Any]] events, dt start_day, dt end_day)
None _async_send_empty_response(ActiveConnection connection, int msg_id, dt start_time, dt|None end_time)
tuple[bytes, dt|None] _async_get_ws_stream_events(HomeAssistant hass, int msg_id, dt start_time, dt end_time, EventProcessor event_processor, bool partial)
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Definition: event.py:1542
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74