Home Assistant Unofficial Reference 2024.12.1
websocket_api.py
Go to the documentation of this file.
1 """Websocket API for the history integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Iterable
7 from dataclasses import dataclass
8 from datetime import datetime as dt, timedelta
9 import logging
10 from typing import Any, cast
11 
12 import voluptuous as vol
13 
14 from homeassistant.components import websocket_api
15 from homeassistant.components.recorder import get_instance, history
16 from homeassistant.components.websocket_api import ActiveConnection, messages
17 from homeassistant.const import (
18  COMPRESSED_STATE_ATTRIBUTES,
19  COMPRESSED_STATE_LAST_CHANGED,
20  COMPRESSED_STATE_LAST_UPDATED,
21  COMPRESSED_STATE_STATE,
22 )
23 from homeassistant.core import (
24  CALLBACK_TYPE,
25  Event,
26  EventStateChangedData,
27  HomeAssistant,
28  State,
29  callback,
30  is_callback,
31  valid_entity_id,
32 )
33 from homeassistant.helpers.event import (
34  async_track_point_in_utc_time,
35  async_track_state_change_event,
36 )
37 from homeassistant.helpers.json import json_bytes
38 from homeassistant.util.async_ import create_eager_task
39 import homeassistant.util.dt as dt_util
40 
41 from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
42 from .helpers import entities_may_have_state_changes_after, has_states_before
43 
44 _LOGGER = logging.getLogger(__name__)
45 
46 
47 @dataclass(slots=True)
49  """Track a history live stream."""
50 
51  stream_queue: asyncio.Queue[Event]
52  subscriptions: list[CALLBACK_TYPE]
53  end_time_unsub: CALLBACK_TYPE | None = None
54  task: asyncio.Task | None = None
55  wait_sync_task: asyncio.Task | None = None
56 
57 
58 @callback
59 def async_setup(hass: HomeAssistant) -> None:
60  """Set up the history websocket API."""
61  websocket_api.async_register_command(hass, ws_get_history_during_period)
62  websocket_api.async_register_command(hass, ws_stream)
63 
64 
66  hass: HomeAssistant,
67  msg_id: int,
68  start_time: dt,
69  end_time: dt | None,
70  entity_ids: list[str] | None,
71  include_start_time_state: bool,
72  significant_changes_only: bool,
73  minimal_response: bool,
74  no_attributes: bool,
75 ) -> bytes:
76  """Fetch history significant_states and convert them to json in the executor."""
77  return json_bytes(
78  messages.result_message(
79  msg_id,
80  history.get_significant_states(
81  hass,
82  start_time,
83  end_time,
84  entity_ids,
85  None,
86  include_start_time_state,
87  significant_changes_only,
88  minimal_response,
89  no_attributes,
90  True,
91  ),
92  )
93  )
94 
95 
96 @websocket_api.websocket_command( { vol.Required("type"): "history/history_during_period",
97  vol.Required("start_time"): str,
98  vol.Optional("end_time"): str,
99  vol.Required("entity_ids"): [str],
100  vol.Optional("include_start_time_state", default=True): bool,
101  vol.Optional("significant_changes_only", default=True): bool,
102  vol.Optional("minimal_response", default=False): bool,
103  vol.Optional("no_attributes", default=False): bool,
104  }
105 )
106 @websocket_api.async_response
108  hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
109 ) -> None:
110  """Handle history during period websocket command."""
111  start_time_str = msg["start_time"]
112  end_time_str = msg.get("end_time")
113 
114  if start_time := dt_util.parse_datetime(start_time_str):
115  start_time = dt_util.as_utc(start_time)
116  else:
117  connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
118  return
119 
120  if end_time_str:
121  if end_time := dt_util.parse_datetime(end_time_str):
122  end_time = dt_util.as_utc(end_time)
123  else:
124  connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
125  return
126  else:
127  end_time = None
128 
129  if start_time > dt_util.utcnow():
130  connection.send_result(msg["id"], {})
131  return
132 
133  entity_ids: list[str] = msg["entity_ids"]
134  for entity_id in entity_ids:
135  if not hass.states.get(entity_id) and not valid_entity_id(entity_id):
136  connection.send_error(msg["id"], "invalid_entity_ids", "Invalid entity_ids")
137  return
138 
139  include_start_time_state = msg["include_start_time_state"]
140  no_attributes = msg["no_attributes"]
141 
142  if (
143  # has_states_before will return True if there are states older than
144  # end_time. If it's false, we know there are no states in the
145  # database up until end_time.
146  (end_time and not has_states_before(hass, end_time))
147  or not include_start_time_state
148  and entity_ids
150  hass, entity_ids, start_time, no_attributes
151  )
152  ):
153  connection.send_result(msg["id"], {})
154  return
155 
156  significant_changes_only = msg["significant_changes_only"]
157  minimal_response = msg["minimal_response"]
158 
159  connection.send_message(
160  await get_instance(hass).async_add_executor_job(
161  _ws_get_significant_states,
162  hass,
163  msg["id"],
164  start_time,
165  end_time,
166  entity_ids,
167  include_start_time_state,
168  significant_changes_only,
169  minimal_response,
170  no_attributes,
171  )
172  )
173 
174 
176  states: dict[str, list[dict[str, Any]]],
177  start_day: dt,
178  end_day: dt,
179 ) -> dict[str, Any]:
180  """Generate a history stream message response."""
181  return {
182  "states": states,
183  "start_time": start_day.timestamp(),
184  "end_time": end_day.timestamp(),
185  }
186 
187 
188 @callback
190  connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None
191 ) -> None:
192  """Send an empty response when we know all results are filtered away."""
193  connection.send_result(msg_id)
194  stream_end_time = end_time or dt_util.utcnow()
195  connection.send_message(
196  _generate_websocket_response(msg_id, start_time, stream_end_time, {})
197  )
198 
199 
201  msg_id: int,
202  start_time: dt,
203  end_time: dt,
204  states: dict[str, list[dict[str, Any]]],
205 ) -> bytes:
206  """Generate a websocket response."""
207  return json_bytes(
208  messages.event_message(
209  msg_id, _generate_stream_message(states, start_time, end_time)
210  )
211  )
212 
213 
215  hass: HomeAssistant,
216  msg_id: int,
217  start_time: dt,
218  end_time: dt,
219  entity_ids: list[str] | None,
220  include_start_time_state: bool,
221  significant_changes_only: bool,
222  minimal_response: bool,
223  no_attributes: bool,
224  send_empty: bool,
225 ) -> tuple[float, dt | None, bytes | None]:
226  """Generate a historical response."""
227  states = cast(
228  dict[str, list[dict[str, Any]]],
229  history.get_significant_states(
230  hass,
231  start_time,
232  end_time,
233  entity_ids,
234  None,
235  include_start_time_state,
236  significant_changes_only,
237  minimal_response,
238  no_attributes,
239  True,
240  ),
241  )
242  last_time_ts = 0.0
243  for state_list in states.values():
244  if (
245  state_list
246  and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED])
247  > last_time_ts
248  ):
249  last_time_ts = cast(float, state_last_time)
250 
251  if last_time_ts == 0:
252  # If we did not send any states ever, we need to send an empty response
253  # so the websocket client knows it should render/process/consume the
254  # data.
255  if not send_empty:
256  return last_time_ts, None, None
257  last_time_dt = end_time
258  else:
259  last_time_dt = dt_util.utc_from_timestamp(last_time_ts)
260 
261  return (
262  last_time_ts,
263  last_time_dt,
264  _generate_websocket_response(msg_id, start_time, last_time_dt, states),
265  )
266 
267 
269  hass: HomeAssistant,
270  connection: ActiveConnection,
271  msg_id: int,
272  start_time: dt,
273  end_time: dt,
274  entity_ids: list[str] | None,
275  include_start_time_state: bool,
276  significant_changes_only: bool,
277  minimal_response: bool,
278  no_attributes: bool,
279  send_empty: bool,
280 ) -> dt | None:
281  """Fetch history significant_states and send them to the client."""
282  instance = get_instance(hass)
283  last_time_ts, last_time_dt, payload = await instance.async_add_executor_job(
284  _generate_historical_response,
285  hass,
286  msg_id,
287  start_time,
288  end_time,
289  entity_ids,
290  include_start_time_state,
291  significant_changes_only,
292  minimal_response,
293  no_attributes,
294  send_empty,
295  )
296  if payload:
297  connection.send_message(payload)
298  return last_time_dt if last_time_ts != 0 else None
299 
300 
301 def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]:
302  """Convert a state to a compressed state."""
303  comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state}
304  if not no_attributes or state.domain in history.NEED_ATTRIBUTE_DOMAINS:
305  comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes
306  comp_state[COMPRESSED_STATE_LAST_UPDATED] = state.last_updated_timestamp
307  if state.last_changed != state.last_updated:
308  comp_state[COMPRESSED_STATE_LAST_CHANGED] = state.last_changed_timestamp
309  return comp_state
310 
311 
313  events: Iterable[Event], no_attributes: bool
314 ) -> dict[str, list[dict[str, Any]]]:
315  """Convert events to a compressed states."""
316  states_by_entity_ids: dict[str, list[dict[str, Any]]] = {}
317  for event in events:
318  state: State = event.data["new_state"]
319  entity_id: str = state.entity_id
320  states_by_entity_ids.setdefault(entity_id, []).append(
321  _history_compressed_state(state, no_attributes)
322  )
323  return states_by_entity_ids
324 
325 
326 async def _async_events_consumer(
327  subscriptions_setup_complete_time: dt,
328  connection: ActiveConnection,
329  msg_id: int,
330  stream_queue: asyncio.Queue[Event],
331  no_attributes: bool,
332 ) -> None:
333  """Stream events from the queue."""
334  subscriptions_setup_complete_timestamp = (
335  subscriptions_setup_complete_time.timestamp()
336  )
337  while True:
338  events: list[Event] = [await stream_queue.get()]
339  # If the event is older than the last db
340  # event we already sent it so we skip it.
341  if events[0].time_fired_timestamp <= subscriptions_setup_complete_timestamp:
342  continue
343  # We sleep for the EVENT_COALESCE_TIME so
344  # we can group events together to minimize
345  # the number of websocket messages when the
346  # system is overloaded with an event storm
347  await asyncio.sleep(EVENT_COALESCE_TIME)
348  while not stream_queue.empty():
349  events.append(stream_queue.get_nowait())
350 
351  if history_states := _events_to_compressed_states(events, no_attributes):
352  connection.send_message(
353  json_bytes(
354  messages.event_message(
355  msg_id,
356  {"states": history_states},
357  )
358  )
359  )
360 
361 
362 @callback
364  hass: HomeAssistant,
365  subscriptions: list[CALLBACK_TYPE],
366  target: Callable[[Event[Any]], None],
367  entity_ids: list[str],
368  significant_changes_only: bool,
369  minimal_response: bool,
370 ) -> None:
371  """Subscribe to events for the entities and devices or all.
372 
373  These are the events we need to listen for to do
374  the live history stream.
375  """
376  assert is_callback(target), "target must be a callback"
377 
378  @callback
379  def _forward_state_events_filtered(event: Event[EventStateChangedData]) -> None:
380  """Filter state events and forward them."""
381  if (new_state := event.data["new_state"]) is None or (
382  old_state := event.data["old_state"]
383  ) is None:
384  return
385  if (
386  (significant_changes_only or minimal_response)
387  and new_state.state == old_state.state
388  and new_state.domain not in history.SIGNIFICANT_DOMAINS
389  ):
390  return
391  target(event)
392 
393  subscriptions.append(
394  async_track_state_change_event(hass, entity_ids, _forward_state_events_filtered)
395  )
396 
397 
398 @websocket_api.websocket_command( { vol.Required("type"): "history/stream",
399  vol.Required("start_time"): str,
400  vol.Optional("end_time"): str,
401  vol.Required("entity_ids"): [str],
402  vol.Optional("include_start_time_state", default=True): bool,
403  vol.Optional("significant_changes_only", default=True): bool,
404  vol.Optional("minimal_response", default=False): bool,
405  vol.Optional("no_attributes", default=False): bool,
406  }
407 )
408 @websocket_api.async_response
409 async def ws_stream(
410  hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
411 ) -> None:
412  """Handle history stream websocket command."""
413  start_time_str = msg["start_time"]
414  msg_id: int = msg["id"]
415  utc_now = dt_util.utcnow()
416 
417  if start_time := dt_util.parse_datetime(start_time_str):
418  start_time = dt_util.as_utc(start_time)
419 
420  if not start_time or start_time > utc_now:
421  connection.send_error(msg_id, "invalid_start_time", "Invalid start_time")
422  return
423 
424  end_time_str = msg.get("end_time")
425  end_time: dt | None = None
426  if end_time_str:
427  if not (end_time := dt_util.parse_datetime(end_time_str)):
428  connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
429  return
430  end_time = dt_util.as_utc(end_time)
431  if end_time < start_time:
432  connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
433  return
434 
435  entity_ids: list[str] = msg["entity_ids"]
436  for entity_id in entity_ids:
437  if not hass.states.get(entity_id) and not valid_entity_id(entity_id):
438  connection.send_error(msg["id"], "invalid_entity_ids", "Invalid entity_ids")
439  return
440 
441  include_start_time_state = msg["include_start_time_state"]
442  significant_changes_only = msg["significant_changes_only"]
443  no_attributes = msg["no_attributes"]
444  minimal_response = msg["minimal_response"]
445 
446  if end_time and end_time <= utc_now:
447  if (
448  not include_start_time_state
449  and entity_ids
451  hass, entity_ids, start_time, no_attributes
452  )
453  ):
454  _async_send_empty_response(connection, msg_id, start_time, end_time)
455  return
456 
457  connection.subscriptions[msg_id] = callback(lambda: None)
458  connection.send_result(msg_id)
460  hass,
461  connection,
462  msg_id,
463  start_time,
464  end_time,
465  entity_ids,
466  include_start_time_state,
467  significant_changes_only,
468  minimal_response,
469  no_attributes,
470  True,
471  )
472  return
473 
474  subscriptions: list[CALLBACK_TYPE] = []
475  stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES)
476  live_stream = HistoryLiveStream(
477  subscriptions=subscriptions, stream_queue=stream_queue
478  )
479 
480  @callback
481  def _unsub(*_utc_time: Any) -> None:
482  """Unsubscribe from all events."""
483  for subscription in subscriptions:
484  subscription()
485  subscriptions.clear()
486  if live_stream.task:
487  live_stream.task.cancel()
488  if live_stream.wait_sync_task:
489  live_stream.wait_sync_task.cancel()
490  if live_stream.end_time_unsub:
491  live_stream.end_time_unsub()
492  live_stream.end_time_unsub = None
493 
494  if end_time:
495  live_stream.end_time_unsub = async_track_point_in_utc_time(
496  hass, _unsub, end_time
497  )
498 
499  @callback
500  def _queue_or_cancel(event: Event) -> None:
501  """Queue an event to be processed or cancel."""
502  try:
503  stream_queue.put_nowait(event)
504  except asyncio.QueueFull:
505  _LOGGER.debug(
506  "Client exceeded max pending messages of %s",
507  MAX_PENDING_HISTORY_STATES,
508  )
509  _unsub()
510 
512  hass,
513  subscriptions,
514  _queue_or_cancel,
515  entity_ids,
516  significant_changes_only=significant_changes_only,
517  minimal_response=minimal_response,
518  )
519  subscriptions_setup_complete_time = dt_util.utcnow()
520  connection.subscriptions[msg_id] = _unsub
521  connection.send_result(msg_id)
522  # Fetch everything from history
523  last_event_time = await _async_send_historical_states(
524  hass,
525  connection,
526  msg_id,
527  start_time,
528  subscriptions_setup_complete_time,
529  entity_ids,
530  include_start_time_state,
531  significant_changes_only,
532  minimal_response,
533  no_attributes,
534  True,
535  )
536 
537  if msg_id not in connection.subscriptions:
538  # Unsubscribe happened while sending historical states
539  return
540 
541  live_stream.task = create_eager_task(
543  subscriptions_setup_complete_time,
544  connection,
545  msg_id,
546  stream_queue,
547  no_attributes,
548  )
549  )
550 
551  live_stream.wait_sync_task = create_eager_task(
552  get_instance(hass).async_block_till_done()
553  )
554  await live_stream.wait_sync_task
555 
556  #
557  # Fetch any states from the database that have
558  # not been committed since the original fetch
559  # so we can switch over to using the subscriptions
560  #
561  # We only want states that happened after the last state
562  # we had from the last database query
563  #
565  hass,
566  connection,
567  msg_id,
568  # Add one microsecond so we are outside the window of
569  # the last event we got from the database since otherwise
570  # we could fetch the same event twice
571  (last_event_time or start_time) + timedelta(microseconds=1),
572  subscriptions_setup_complete_time,
573  entity_ids,
574  False, # We don't want the start time state again
575  significant_changes_only,
576  minimal_response,
577  no_attributes,
578  send_empty=not last_event_time,
579  )
580 
bool has_states_before(HomeAssistant hass, dt run_time)
Definition: helpers.py:28
bool entities_may_have_state_changes_after(HomeAssistant hass, Iterable entity_ids, dt start_time, bool no_attributes)
Definition: helpers.py:14
None _async_send_empty_response(ActiveConnection connection, int msg_id, dt start_time, dt|None end_time)
None ws_stream(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None _async_subscribe_events(HomeAssistant hass, list[CALLBACK_TYPE] subscriptions, Callable[[Event[Any]], None] target, list[str] entity_ids, bool significant_changes_only, bool minimal_response)
bytes _ws_get_significant_states(HomeAssistant hass, int msg_id, dt start_time, dt|None end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes)
dict[str, Any] _history_compressed_state(State state, bool no_attributes)
dict[str, list[dict[str, Any]]] _events_to_compressed_states(Iterable[Event] events, bool no_attributes)
bytes _generate_websocket_response(int msg_id, dt start_time, dt end_time, dict[str, list[dict[str, Any]]] states)
dict[str, Any] _generate_stream_message(dict[str, list[dict[str, Any]]] states, dt start_day, dt end_day)
tuple[float, dt|None, bytes|None] _generate_historical_response(HomeAssistant hass, int msg_id, dt start_time, dt end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes, bool send_empty)
dt|None _async_send_historical_states(HomeAssistant hass, ActiveConnection connection, int msg_id, dt start_time, dt end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes, bool send_empty)
None _async_events_consumer(dt subscriptions_setup_complete_time, ActiveConnection connection, int msg_id, asyncio.Queue[Event] stream_queue, bool no_attributes)
None ws_get_history_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool valid_entity_id(str entity_id)
Definition: core.py:235
bool is_callback(Callable[..., Any] func)
Definition: core.py:259
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
Definition: event.py:314
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Definition: event.py:1542
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74