Home Assistant Unofficial Reference 2024.12.1
messages.py
Go to the documentation of this file.
1 """Message templates for websocket commands."""
2 
3 from __future__ import annotations
4 
5 from functools import lru_cache
6 import logging
7 from typing import Any, Final
8 
9 import voluptuous as vol
10 
11 from homeassistant.const import (
12  COMPRESSED_STATE_ATTRIBUTES,
13  COMPRESSED_STATE_CONTEXT,
14  COMPRESSED_STATE_LAST_CHANGED,
15  COMPRESSED_STATE_LAST_UPDATED,
16  COMPRESSED_STATE_STATE,
17 )
18 from homeassistant.core import CompressedState, Event, EventStateChangedData
19 from homeassistant.helpers import config_validation as cv
20 from homeassistant.helpers.json import (
21  JSON_DUMP,
22  find_paths_unserializable_data,
23  json_bytes,
24 )
25 from homeassistant.util.json import format_unserializable_data
26 
27 from . import const
28 
29 _LOGGER: Final = logging.getLogger(__name__)
30 
31 # Minimal requirements of a message
32 MINIMAL_MESSAGE_SCHEMA: Final = vol.Schema(
33  {vol.Required("id"): cv.positive_int, vol.Required("type"): cv.string},
34  extra=vol.ALLOW_EXTRA,
35 )
36 
37 # Base schema to extend by message handlers
38 BASE_COMMAND_MESSAGE_SCHEMA: Final = vol.Schema({vol.Required("id"): cv.positive_int})
39 
40 STATE_DIFF_ADDITIONS = "+"
41 STATE_DIFF_REMOVALS = "-"
42 
43 ENTITY_EVENT_ADD = "a"
44 ENTITY_EVENT_REMOVE = "r"
45 ENTITY_EVENT_CHANGE = "c"
46 
47 BASE_ERROR_MESSAGE = {
48  "type": const.TYPE_RESULT,
49  "success": False,
50 }
51 
52 INVALID_JSON_PARTIAL_MESSAGE = json_bytes(
53  {
54  **BASE_ERROR_MESSAGE,
55  "error": {
56  "code": const.ERR_UNKNOWN_ERROR,
57  "message": "Invalid JSON in response",
58  },
59  }
60 )
61 
62 
63 def result_message(iden: int, result: Any = None) -> dict[str, Any]:
64  """Return a success result message."""
65  return {"id": iden, "type": const.TYPE_RESULT, "success": True, "result": result}
66 
67 
68 def construct_result_message(iden: int, payload: bytes) -> bytes:
69  """Construct a success result message JSON."""
70  return b"".join(
71  (
72  b'{"id":',
73  str(iden).encode(),
74  b',"type":"result","success":true,"result":',
75  payload,
76  b"}",
77  )
78  )
79 
80 
82  iden: int | None,
83  code: str,
84  message: str,
85  translation_key: str | None = None,
86  translation_domain: str | None = None,
87  translation_placeholders: dict[str, Any] | None = None,
88 ) -> dict[str, Any]:
89  """Return an error result message."""
90  error_payload: dict[str, Any] = {
91  "code": code,
92  "message": message,
93  }
94  # In case `translation_key` is `None` we do not set it, nor the
95  # `translation`_placeholders` and `translation_domain`.
96  if translation_key is not None:
97  error_payload["translation_key"] = translation_key
98  error_payload["translation_placeholders"] = translation_placeholders
99  error_payload["translation_domain"] = translation_domain
100  return {
101  "id": iden,
102  **BASE_ERROR_MESSAGE,
103  "error": error_payload,
104  }
105 
106 
107 def event_message(iden: int, event: Any) -> dict[str, Any]:
108  """Return an event message."""
109  return {"id": iden, "type": "event", "event": event}
110 
111 
112 def cached_event_message(message_id_as_bytes: bytes, event: Event) -> bytes:
113  """Return an event message.
114 
115  Serialize to json once per message.
116 
117  Since we can have many clients connected that are
118  all getting many of the same events (mostly state changed)
119  we can avoid serializing the same data for each connection.
120  """
121  return b"".join(
122  (
123  _partial_cached_event_message(event)[:-1],
124  b',"id":',
125  message_id_as_bytes,
126  b"}",
127  )
128  )
129 
130 
131 @lru_cache(maxsize=128)
132 def _partial_cached_event_message(event: Event) -> bytes:
133  """Cache and serialize the event to json.
134 
135  The message is constructed without the id which appended
136  in cached_event_message.
137  """
138  return (
139  _message_to_json_bytes_or_none({"type": "event", "event": event.json_fragment})
140  or INVALID_JSON_PARTIAL_MESSAGE
141  )
142 
143 
145  message_id_as_bytes: bytes, event: Event[EventStateChangedData]
146 ) -> bytes:
147  """Return an event message.
148 
149  Serialize to json once per message.
150 
151  Since we can have many clients connected that are
152  all getting many of the same events (mostly state changed)
153  we can avoid serializing the same data for each connection.
154  """
155  return b"".join(
156  (
158  b',"id":',
159  message_id_as_bytes,
160  b"}",
161  )
162  )
163 
164 
165 @lru_cache(maxsize=128)
166 def _partial_cached_state_diff_message(event: Event[EventStateChangedData]) -> bytes:
167  """Cache and serialize the event to json.
168 
169  The message is constructed without the id which
170  will be appended in cached_state_diff_message
171  """
172  return (
174  {"type": "event", "event": _state_diff_event(event)}
175  )
176  or INVALID_JSON_PARTIAL_MESSAGE
177  )
178 
179 
181  event: Event[EventStateChangedData],
182 ) -> dict[
183  str,
184  list[str]
185  | dict[str, CompressedState]
186  | dict[str, dict[str, dict[str, str | list[str]]]],
187 ]:
188  """Convert a state_changed event to the minimal version.
189 
190  State update example
191 
192  {
193  "a": {entity_id: compressed_state,…}
194  "c": {entity_id: diff,…}
195  "r": [entity_id,…]
196  }
197  """
198  if (new_state := event.data["new_state"]) is None:
199  return {ENTITY_EVENT_REMOVE: [event.data["entity_id"]]}
200  if (old_state := event.data["old_state"]) is None:
201  return {ENTITY_EVENT_ADD: {new_state.entity_id: new_state.as_compressed_state}}
202  additions: dict[str, Any] = {}
203  diff: dict[str, dict[str, Any]] = {STATE_DIFF_ADDITIONS: additions}
204  new_state_context = new_state.context
205  old_state_context = old_state.context
206  if old_state.state != new_state.state:
207  additions[COMPRESSED_STATE_STATE] = new_state.state
208  if old_state.last_changed != new_state.last_changed:
209  additions[COMPRESSED_STATE_LAST_CHANGED] = new_state.last_changed_timestamp
210  elif old_state.last_updated != new_state.last_updated:
211  additions[COMPRESSED_STATE_LAST_UPDATED] = new_state.last_updated_timestamp
212  if old_state_context.parent_id != new_state_context.parent_id:
213  additions[COMPRESSED_STATE_CONTEXT] = {"parent_id": new_state_context.parent_id}
214  if old_state_context.user_id != new_state_context.user_id:
215  if COMPRESSED_STATE_CONTEXT in additions:
216  additions[COMPRESSED_STATE_CONTEXT]["user_id"] = new_state_context.user_id
217  else:
218  additions[COMPRESSED_STATE_CONTEXT] = {"user_id": new_state_context.user_id}
219  if old_state_context.id != new_state_context.id:
220  if COMPRESSED_STATE_CONTEXT in additions:
221  additions[COMPRESSED_STATE_CONTEXT]["id"] = new_state_context.id
222  else:
223  additions[COMPRESSED_STATE_CONTEXT] = new_state_context.id
224  if (old_attributes := old_state.attributes) != (
225  new_attributes := new_state.attributes
226  ):
227  if added := {
228  key: value
229  for key, value in new_attributes.items()
230  if key not in old_attributes or old_attributes[key] != value
231  }:
232  additions[COMPRESSED_STATE_ATTRIBUTES] = added
233  if removed := old_attributes.keys() - new_attributes:
234  # sets are not JSON serializable by default so we convert to list
235  # here if there are any values to avoid jumping into the json_encoder_default
236  # for every state diff with a removed attribute
237  diff[STATE_DIFF_REMOVALS] = {COMPRESSED_STATE_ATTRIBUTES: list(removed)}
238  return {ENTITY_EVENT_CHANGE: {new_state.entity_id: diff}}
239 
240 
241 def _message_to_json_bytes_or_none(message: dict[str, Any]) -> bytes | None:
242  """Serialize a websocket message to json or return None."""
243  try:
244  return json_bytes(message)
245  except (ValueError, TypeError):
246  _LOGGER.error(
247  "Unable to serialize to JSON. Bad data found at %s",
249  find_paths_unserializable_data(message, dump=JSON_DUMP)
250  ),
251  )
252  return None
253 
254 
255 def message_to_json_bytes(message: dict[str, Any]) -> bytes:
256  """Serialize a websocket message to json or return an error."""
257  return _message_to_json_bytes_or_none(message) or json_bytes(
259  message["id"], const.ERR_UNKNOWN_ERROR, "Invalid JSON in response"
260  )
261  )
bytes construct_result_message(int iden, bytes payload)
Definition: messages.py:68
bytes cached_event_message(bytes message_id_as_bytes, Event event)
Definition: messages.py:112
dict[str, Any] result_message(int iden, Any result=None)
Definition: messages.py:63
dict[str, Any] error_message(int|None iden, str code, str message, str|None translation_key=None, str|None translation_domain=None, dict[str, Any]|None translation_placeholders=None)
Definition: messages.py:88
bytes message_to_json_bytes(dict[str, Any] message)
Definition: messages.py:255
dict[str, Any] event_message(int iden, Any event)
Definition: messages.py:107
bytes cached_state_diff_message(bytes message_id_as_bytes, Event[EventStateChangedData] event)
Definition: messages.py:146
bytes _partial_cached_state_diff_message(Event[EventStateChangedData] event)
Definition: messages.py:166
bytes|None _message_to_json_bytes_or_none(dict[str, Any] message)
Definition: messages.py:241
dict[ str, list[str]|dict[str, CompressedState]|dict[str, dict[str, dict[str, str|list[str]]]],] _state_diff_event(Event[EventStateChangedData] event)
Definition: messages.py:187
dict[str, Any] find_paths_unserializable_data(Any bad_data, *Callable[[Any], str] dump=json.dumps)
Definition: json.py:233
str format_unserializable_data(dict[str, Any] data)
Definition: json.py:126