1 """Message templates for websocket commands."""
3 from __future__
import annotations
5 from functools
import lru_cache
7 from typing
import Any, Final
9 import voluptuous
as vol
12 COMPRESSED_STATE_ATTRIBUTES,
13 COMPRESSED_STATE_CONTEXT,
14 COMPRESSED_STATE_LAST_CHANGED,
15 COMPRESSED_STATE_LAST_UPDATED,
16 COMPRESSED_STATE_STATE,
22 find_paths_unserializable_data,
29 _LOGGER: Final = logging.getLogger(__name__)
# Smallest shape every incoming websocket message must have: a positive
# integer "id" and a string "type". Any additional keys are allowed through.
MINIMAL_MESSAGE_SCHEMA: Final = vol.Schema(
    {vol.Required("id"): cv.positive_int, vol.Required("type"): cv.string},
    extra=vol.ALLOW_EXTRA,
)

# Schema requiring only a positive integer "id".
BASE_COMMAND_MESSAGE_SCHEMA: Final = vol.Schema({vol.Required("id"): cv.positive_int})
# Keys used inside a per-entity state diff: values that were added/changed
# and values that were removed.
STATE_DIFF_ADDITIONS: Final = "+"
STATE_DIFF_REMOVALS: Final = "-"

# Top-level keys of a compressed entity event: entity added, removed, changed.
ENTITY_EVENT_ADD: Final = "a"
ENTITY_EVENT_REMOVE: Final = "r"
ENTITY_EVENT_CHANGE: Final = "c"
47 BASE_ERROR_MESSAGE = {
48 "type": const.TYPE_RESULT,
56 "code": const.ERR_UNKNOWN_ERROR,
57 "message":
"Invalid JSON in response",
def result_message(iden: int, result: Any = None) -> dict[str, Any]:
    """Return a success result message.

    Args:
        iden: Id of the command this message answers; echoed back as "id".
        result: Payload placed under "result" (defaults to None).

    Returns:
        A dict with "id", "type" (const.TYPE_RESULT), "success" set to True
        and the given "result".
    """
    return {
        "id": iden,
        "type": const.TYPE_RESULT,
        "success": True,
        "result": result,
    }
69 """Construct a success result message JSON."""
74 b
',"type":"result","success":true,"result":',
85 translation_key: str |
None =
None,
86 translation_domain: str |
None =
None,
87 translation_placeholders: dict[str, Any] |
None =
None,
89 """Return an error result message."""
90 error_payload: dict[str, Any] = {
96 if translation_key
is not None:
97 error_payload[
"translation_key"] = translation_key
98 error_payload[
"translation_placeholders"] = translation_placeholders
99 error_payload[
"translation_domain"] = translation_domain
102 **BASE_ERROR_MESSAGE,
103 "error": error_payload,
def event_message(iden: int, event: Any) -> dict[str, Any]:
    """Return an event message.

    Args:
        iden: Id echoed back under "id".
        event: Event payload placed, unmodified, under "event".

    Returns:
        A dict with "id", "type" fixed to "event", and the given "event".
    """
    return {"id": iden, "type": "event", "event": event}
113 """Return an event message.
115 Serialize to json once per message.
117 Since we can have many clients connected that are
118 all getting many of the same events (mostly state changed)
119 we can avoid serializing the same data for each connection.
131 @lru_cache(maxsize=128)
133 """Cache and serialize the event to json.
135 The message is constructed without the id which appended
136 in cached_event_message.
140 or INVALID_JSON_PARTIAL_MESSAGE
145 message_id_as_bytes: bytes, event: Event[EventStateChangedData]
147 """Return an event message.
149 Serialize to json once per message.
151 Since we can have many clients connected that are
152 all getting many of the same events (mostly state changed)
153 we can avoid serializing the same data for each connection.
165 @lru_cache(maxsize=128)
167 """Cache and serialize the event to json.
169 The message is constructed without the id which
170 will be appended in cached_state_diff_message
176 or INVALID_JSON_PARTIAL_MESSAGE
def _state_diff_event(
    event: Event[EventStateChangedData],
) -> dict[
    str,
    list[str]
    | dict[str, CompressedState]
    | dict[str, dict[str, dict[str, str | list[str]]]],
]:
    """Convert a state_changed event to the minimal version.

    The returned dict has exactly one top-level key:

    {
        "a": {entity_id: compressed_state,…}   # no old state: entity added
        "c": {entity_id: diff,…}               # both states: entity changed
        "r": [entity_id,…]                     # no new state: entity removed
    }

    For a change, ``diff`` always contains an additions mapping (possibly
    empty) and, only when attributes disappeared, a removals mapping listing
    the removed attribute names.
    """
    if (new_state := event.data["new_state"]) is None:
        return {ENTITY_EVENT_REMOVE: [event.data["entity_id"]]}
    if (old_state := event.data["old_state"]) is None:
        return {ENTITY_EVENT_ADD: {new_state.entity_id: new_state.as_compressed_state}}

    additions: dict[str, Any] = {}
    diff: dict[str, dict[str, Any]] = {STATE_DIFF_ADDITIONS: additions}
    new_state_context = new_state.context
    old_state_context = old_state.context

    if old_state.state != new_state.state:
        additions[COMPRESSED_STATE_STATE] = new_state.state

    # last_updated is only reported when last_changed itself did not change.
    if old_state.last_changed != new_state.last_changed:
        additions[COMPRESSED_STATE_LAST_CHANGED] = new_state.last_changed_timestamp
    elif old_state.last_updated != new_state.last_updated:
        additions[COMPRESSED_STATE_LAST_UPDATED] = new_state.last_updated_timestamp

    # Context fields are merged into a single dict as they are found to differ.
    if old_state_context.parent_id != new_state_context.parent_id:
        additions[COMPRESSED_STATE_CONTEXT] = {"parent_id": new_state_context.parent_id}
    if old_state_context.user_id != new_state_context.user_id:
        if COMPRESSED_STATE_CONTEXT in additions:
            additions[COMPRESSED_STATE_CONTEXT]["user_id"] = new_state_context.user_id
        else:
            additions[COMPRESSED_STATE_CONTEXT] = {"user_id": new_state_context.user_id}
    if old_state_context.id != new_state_context.id:
        if COMPRESSED_STATE_CONTEXT in additions:
            additions[COMPRESSED_STATE_CONTEXT]["id"] = new_state_context.id
        else:
            # Only the id changed: the context is emitted as the bare id
            # rather than as a dict.
            additions[COMPRESSED_STATE_CONTEXT] = new_state_context.id

    if (old_attributes := old_state.attributes) != (
        new_attributes := new_state.attributes
    ):
        # Attributes that are new or whose value differs from the old state.
        if added := {
            key: value
            for key, value in new_attributes.items()
            if key not in old_attributes or old_attributes[key] != value
        }:
            additions[COMPRESSED_STATE_ATTRIBUTES] = added
        if removed := old_attributes.keys() - new_attributes:
            # The key difference is a set; it is converted to a list here,
            # presumably so it serializes as a JSON array.
            diff[STATE_DIFF_REMOVALS] = {COMPRESSED_STATE_ATTRIBUTES: list(removed)}

    return {ENTITY_EVENT_CHANGE: {new_state.entity_id: diff}}
242 """Serialize a websocket message to json or return None."""
245 except (ValueError, TypeError):
247 "Unable to serialize to JSON. Bad data found at %s",
256 """Serialize a websocket message to json or return an error."""
259 message[
"id"], const.ERR_UNKNOWN_ERROR,
"Invalid JSON in response"
bytes construct_result_message(int iden, bytes payload)
bytes cached_event_message(bytes message_id_as_bytes, Event event)
dict[str, Any] result_message(int iden, Any result=None)
dict[str, Any] error_message(int|None iden, str code, str message, str|None translation_key=None, str|None translation_domain=None, dict[str, Any]|None translation_placeholders=None)
bytes message_to_json_bytes(dict[str, Any] message)
dict[str, Any] event_message(int iden, Any event)
bytes cached_state_diff_message(bytes message_id_as_bytes, Event[EventStateChangedData] event)
bytes _partial_cached_state_diff_message(Event[EventStateChangedData] event)
bytes|None _message_to_json_bytes_or_none(dict[str, Any] message)
dict[ str, list[str]|dict[str, CompressedState]|dict[str, dict[str, dict[str, str|list[str]]]],] _state_diff_event(Event[EventStateChangedData] event)
bytes _partial_cached_event_message(Event event)
dict[str, Any] find_paths_unserializable_data(Any bad_data, *Callable[[Any], str] dump=json.dumps)
str format_unserializable_data(dict[str, Any] data)