1 """Websocket API for the history integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Iterable
7 from dataclasses
import dataclass
8 from datetime
import datetime
as dt, timedelta
10 from typing
import Any, cast
12 import voluptuous
as vol
18 COMPRESSED_STATE_ATTRIBUTES,
19 COMPRESSED_STATE_LAST_CHANGED,
20 COMPRESSED_STATE_LAST_UPDATED,
21 COMPRESSED_STATE_STATE,
26 EventStateChangedData,
34 async_track_point_in_utc_time,
35 async_track_state_change_event,
41 from .const
import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
42 from .helpers
import entities_may_have_state_changes_after, has_states_before
44 _LOGGER = logging.getLogger(__name__)
47 @dataclass(slots=True)
49 """Track a history live stream."""
51 stream_queue: asyncio.Queue[Event]
52 subscriptions: list[CALLBACK_TYPE]
53 end_time_unsub: CALLBACK_TYPE |
None =
None
54 task: asyncio.Task |
None =
None
55 wait_sync_task: asyncio.Task |
None =
None
60 """Set up the history websocket API."""
61 websocket_api.async_register_command(hass, ws_get_history_during_period)
62 websocket_api.async_register_command(hass, ws_stream)
70 entity_ids: list[str] |
None,
71 include_start_time_state: bool,
72 significant_changes_only: bool,
73 minimal_response: bool,
76 """Fetch history significant_states and convert them to json in the executor."""
78 messages.result_message(
80 history.get_significant_states(
86 include_start_time_state,
87 significant_changes_only,
96 @websocket_api.websocket_command(
{
vol.Required("type"):
"history/history_during_period",
97 vol.Required(
"start_time"): str,
98 vol.Optional(
"end_time"): str,
99 vol.Required(
"entity_ids"): [str],
100 vol.Optional(
"include_start_time_state", default=
True): bool,
101 vol.Optional(
"significant_changes_only", default=
True): bool,
102 vol.Optional(
"minimal_response", default=
False): bool,
103 vol.Optional(
"no_attributes", default=
False): bool,
106 @websocket_api.async_response
108 hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
110 """Handle history during period websocket command."""
111 start_time_str = msg[
"start_time"]
112 end_time_str = msg.get(
"end_time")
114 if start_time := dt_util.parse_datetime(start_time_str):
115 start_time = dt_util.as_utc(start_time)
117 connection.send_error(msg[
"id"],
"invalid_start_time",
"Invalid start_time")
121 if end_time := dt_util.parse_datetime(end_time_str):
122 end_time = dt_util.as_utc(end_time)
124 connection.send_error(msg[
"id"],
"invalid_end_time",
"Invalid end_time")
129 if start_time > dt_util.utcnow():
130 connection.send_result(msg[
"id"], {})
133 entity_ids: list[str] = msg[
"entity_ids"]
134 for entity_id
in entity_ids:
136 connection.send_error(msg[
"id"],
"invalid_entity_ids",
"Invalid entity_ids")
139 include_start_time_state = msg[
"include_start_time_state"]
140 no_attributes = msg[
"no_attributes"]
147 or not include_start_time_state
150 hass, entity_ids, start_time, no_attributes
153 connection.send_result(msg[
"id"], {})
156 significant_changes_only = msg[
"significant_changes_only"]
157 minimal_response = msg[
"minimal_response"]
159 connection.send_message(
161 _ws_get_significant_states,
167 include_start_time_state,
168 significant_changes_only,
176 states: dict[str, list[dict[str, Any]]],
180 """Generate a history stream message response."""
183 "start_time": start_day.timestamp(),
184 "end_time": end_day.timestamp(),
190 connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt |
None
192 """Send an empty response when we know all results are filtered away."""
193 connection.send_result(msg_id)
194 stream_end_time = end_time
or dt_util.utcnow()
195 connection.send_message(
204 states: dict[str, list[dict[str, Any]]],
206 """Generate a websocket response."""
208 messages.event_message(
219 entity_ids: list[str] |
None,
220 include_start_time_state: bool,
221 significant_changes_only: bool,
222 minimal_response: bool,
225 ) -> tuple[float, dt |
None, bytes |
None]:
226 """Generate a historical response."""
228 dict[str, list[dict[str, Any]]],
229 history.get_significant_states(
235 include_start_time_state,
236 significant_changes_only,
243 for state_list
in states.values():
246 and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED])
249 last_time_ts = cast(float, state_last_time)
251 if last_time_ts == 0:
256 return last_time_ts,
None,
None
257 last_time_dt = end_time
259 last_time_dt = dt_util.utc_from_timestamp(last_time_ts)
270 connection: ActiveConnection,
274 entity_ids: list[str] |
None,
275 include_start_time_state: bool,
276 significant_changes_only: bool,
277 minimal_response: bool,
281 """Fetch history significant_states and send them to the client."""
283 last_time_ts, last_time_dt, payload = await instance.async_add_executor_job(
284 _generate_historical_response,
290 include_start_time_state,
291 significant_changes_only,
297 connection.send_message(payload)
298 return last_time_dt
if last_time_ts != 0
else None
302 """Convert a state to a compressed state."""
303 comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state}
304 if not no_attributes
or state.domain
in history.NEED_ATTRIBUTE_DOMAINS:
305 comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes
306 comp_state[COMPRESSED_STATE_LAST_UPDATED] = state.last_updated_timestamp
307 if state.last_changed != state.last_updated:
308 comp_state[COMPRESSED_STATE_LAST_CHANGED] = state.last_changed_timestamp
313 events: Iterable[Event], no_attributes: bool
314 ) -> dict[str, list[dict[str, Any]]]:
315 """Convert events to compressed states."""
316 states_by_entity_ids: dict[str, list[dict[str, Any]]] = {}
318 state: State = event.data[
"new_state"]
319 entity_id: str = state.entity_id
320 states_by_entity_ids.setdefault(entity_id, []).append(
323 return states_by_entity_ids
327 subscriptions_setup_complete_time: dt,
328 connection: ActiveConnection,
330 stream_queue: asyncio.Queue[Event],
333 """Stream events from the queue."""
334 subscriptions_setup_complete_timestamp = (
335 subscriptions_setup_complete_time.timestamp()
338 events: list[Event] = [await stream_queue.get()]
341 if events[0].time_fired_timestamp <= subscriptions_setup_complete_timestamp:
347 await asyncio.sleep(EVENT_COALESCE_TIME)
348 while not stream_queue.empty():
349 events.append(stream_queue.get_nowait())
352 connection.send_message(
354 messages.event_message(
356 {
"states": history_states},
365 subscriptions: list[CALLBACK_TYPE],
366 target: Callable[[Event[Any]],
None],
367 entity_ids: list[str],
368 significant_changes_only: bool,
369 minimal_response: bool,
371 """Subscribe to events for the entities and devices or all.
373 These are the events we need to listen for to do
374 the live history stream.
376 assert is_callback(target),
"target must be a callback"
379 def _forward_state_events_filtered(event: Event[EventStateChangedData]) ->
None:
380 """Filter state events and forward them."""
381 if (new_state := event.data[
"new_state"])
is None or (
382 old_state := event.data[
"old_state"]
386 (significant_changes_only
or minimal_response)
387 and new_state.state == old_state.state
388 and new_state.domain
not in history.SIGNIFICANT_DOMAINS
393 subscriptions.append(
398 @websocket_api.websocket_command(
{
vol.Required("type"):
"history/stream",
399 vol.Required(
"start_time"): str,
400 vol.Optional(
"end_time"): str,
401 vol.Required(
"entity_ids"): [str],
402 vol.Optional(
"include_start_time_state", default=
True): bool,
403 vol.Optional(
"significant_changes_only", default=
True): bool,
404 vol.Optional(
"minimal_response", default=
False): bool,
405 vol.Optional(
"no_attributes", default=
False): bool,
408 @websocket_api.async_response
410 hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
412 """Handle history stream websocket command."""
413 start_time_str = msg[
"start_time"]
414 msg_id: int = msg[
"id"]
415 utc_now = dt_util.utcnow()
417 if start_time := dt_util.parse_datetime(start_time_str):
418 start_time = dt_util.as_utc(start_time)
420 if not start_time
or start_time > utc_now:
421 connection.send_error(msg_id,
"invalid_start_time",
"Invalid start_time")
424 end_time_str = msg.get(
"end_time")
425 end_time: dt |
None =
None
427 if not (end_time := dt_util.parse_datetime(end_time_str)):
428 connection.send_error(msg_id,
"invalid_end_time",
"Invalid end_time")
430 end_time = dt_util.as_utc(end_time)
431 if end_time < start_time:
432 connection.send_error(msg_id,
"invalid_end_time",
"Invalid end_time")
435 entity_ids: list[str] = msg[
"entity_ids"]
436 for entity_id
in entity_ids:
438 connection.send_error(msg[
"id"],
"invalid_entity_ids",
"Invalid entity_ids")
441 include_start_time_state = msg[
"include_start_time_state"]
442 significant_changes_only = msg[
"significant_changes_only"]
443 no_attributes = msg[
"no_attributes"]
444 minimal_response = msg[
"minimal_response"]
446 if end_time
and end_time <= utc_now:
448 not include_start_time_state
451 hass, entity_ids, start_time, no_attributes
457 connection.subscriptions[msg_id] = callback(
lambda:
None)
458 connection.send_result(msg_id)
466 include_start_time_state,
467 significant_changes_only,
474 subscriptions: list[CALLBACK_TYPE] = []
475 stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES)
477 subscriptions=subscriptions, stream_queue=stream_queue
481 def _unsub(*_utc_time: Any) ->
None:
482 """Unsubscribe from all events."""
483 for subscription
in subscriptions:
485 subscriptions.clear()
487 live_stream.task.cancel()
488 if live_stream.wait_sync_task:
489 live_stream.wait_sync_task.cancel()
490 if live_stream.end_time_unsub:
491 live_stream.end_time_unsub()
492 live_stream.end_time_unsub =
None
496 hass, _unsub, end_time
500 def _queue_or_cancel(event: Event) ->
None:
501 """Queue an event to be processed or cancel."""
503 stream_queue.put_nowait(event)
504 except asyncio.QueueFull:
506 "Client exceeded max pending messages of %s",
507 MAX_PENDING_HISTORY_STATES,
516 significant_changes_only=significant_changes_only,
517 minimal_response=minimal_response,
519 subscriptions_setup_complete_time = dt_util.utcnow()
520 connection.subscriptions[msg_id] = _unsub
521 connection.send_result(msg_id)
528 subscriptions_setup_complete_time,
530 include_start_time_state,
531 significant_changes_only,
537 if msg_id
not in connection.subscriptions:
541 live_stream.task = create_eager_task(
543 subscriptions_setup_complete_time,
551 live_stream.wait_sync_task = create_eager_task(
554 await live_stream.wait_sync_task
571 (last_event_time
or start_time) +
timedelta(microseconds=1),
572 subscriptions_setup_complete_time,
575 significant_changes_only,
578 send_empty=
not last_event_time,
580
bool has_states_before(HomeAssistant hass, dt run_time)
bool entities_may_have_state_changes_after(HomeAssistant hass, Iterable entity_ids, dt start_time, bool no_attributes)
None _async_send_empty_response(ActiveConnection connection, int msg_id, dt start_time, dt|None end_time)
None ws_stream(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None _async_subscribe_events(HomeAssistant hass, list[CALLBACK_TYPE] subscriptions, Callable[[Event[Any]], None] target, list[str] entity_ids, bool significant_changes_only, bool minimal_response)
None async_setup(HomeAssistant hass)
bytes _ws_get_significant_states(HomeAssistant hass, int msg_id, dt start_time, dt|None end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes)
dict[str, Any] _history_compressed_state(State state, bool no_attributes)
dict[str, list[dict[str, Any]]] _events_to_compressed_states(Iterable[Event] events, bool no_attributes)
bytes _generate_websocket_response(int msg_id, dt start_time, dt end_time, dict[str, list[dict[str, Any]]] states)
dict[str, Any] _generate_stream_message(dict[str, list[dict[str, Any]]] states, dt start_day, dt end_day)
tuple[float, dt|None, bytes|None] _generate_historical_response(HomeAssistant hass, int msg_id, dt start_time, dt end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes, bool send_empty)
dt|None _async_send_historical_states(HomeAssistant hass, ActiveConnection connection, int msg_id, dt start_time, dt end_time, list[str]|None entity_ids, bool include_start_time_state, bool significant_changes_only, bool minimal_response, bool no_attributes, bool send_empty)
None _async_events_consumer(dt subscriptions_setup_complete_time, ActiveConnection connection, int msg_id, asyncio.Queue[Event] stream_queue, bool no_attributes)
None ws_get_history_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool valid_entity_id(str entity_id)
bool is_callback(Callable[..., Any] func)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Recorder get_instance(HomeAssistant hass)