Home Assistant Unofficial Reference 2024.12.1
models.py
Go to the documentation of this file.
1 """Models used by multiple MQTT modules."""
2 
3 from __future__ import annotations
4 
5 from ast import literal_eval
6 import asyncio
7 from collections import deque
8 from collections.abc import Callable
9 from dataclasses import dataclass, field
10 from enum import StrEnum
11 import logging
12 from typing import TYPE_CHECKING, Any, TypedDict
13 
14 from homeassistant.const import ATTR_ENTITY_ID, ATTR_NAME, Platform
15 from homeassistant.core import CALLBACK_TYPE, callback
16 from homeassistant.exceptions import ServiceValidationError, TemplateError
17 from homeassistant.helpers import template
18 from homeassistant.helpers.entity import Entity
19 from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
20 from homeassistant.helpers.typing import (
21  ConfigType,
22  DiscoveryInfoType,
23  TemplateVarsType,
24  VolSchemaType,
25 )
26 from homeassistant.util.hass_dict import HassKey
27 
28 if TYPE_CHECKING:
29  from paho.mqtt.client import MQTTMessage
30 
31  from .client import MQTT, Subscription
32  from .debug_info import TimestampedPublishMessage
33  from .device_trigger import Trigger
34  from .discovery import MQTTDiscoveryPayload
35  from .tag import MQTTTagScanner
36 
37 from .const import DOMAIN, TEMPLATE_ERRORS
38 
39 
40 class PayloadSentinel(StrEnum):
41  """Sentinel for `async_render_with_possible_json_value`."""
42 
43  NONE = "none"
44  DEFAULT = "default"
45 
46 
47 _LOGGER = logging.getLogger(__name__)
48 
49 ATTR_THIS = "this"
50 
51 type PublishPayloadType = str | bytes | int | float | None
52 
53 
def convert_outgoing_mqtt_payload(
    payload: PublishPayloadType,
) -> PublishPayloadType:
    """Ensure correct raw MQTT payload is passed as bytes for publishing.

    A string that looks like a Python bytes literal (it starts with ``b'`` or
    ``b"``) is evaluated with `ast.literal_eval`; when that yields a `bytes`
    object, the bytes are returned. Every other payload, including a string
    that fails to evaluate or evaluates to something other than bytes, is
    returned unchanged.
    """
    if isinstance(payload, str) and payload.startswith(("b'", 'b"')):
        try:
            native_object = literal_eval(payload)
        except (ValueError, TypeError, SyntaxError, MemoryError):
            # Not a valid literal after all: publish the string as it is.
            pass
        else:
            if isinstance(native_object, bytes):
                return native_object

    return payload
68 
69 
@dataclass
class PublishMessage:
    """MQTT Message for publishing."""

    topic: str
    payload: PublishPayloadType
    qos: int
    retain: bool
78 
79 
80 # eq=False so we use the id() of the object for comparison
81 # since client will only generate one instance of this object
82 # per messages/subscribed_topic.
83 @dataclass(slots=True, frozen=True, eq=False)
85  """MQTT Message received."""
86 
87  topic: str
88  payload: ReceivePayloadType
89  qos: int
90  retain: bool
91  subscribed_topic: str
92  timestamp: float
93 
94 
95 type MessageCallbackType = Callable[[ReceiveMessage], None]
96 
97 
class SubscriptionDebugInfo(TypedDict):
    """Class for holding subscription debug info."""

    messages: deque[ReceiveMessage]
    count: int
103 
104 
class EntityDebugInfo(TypedDict):
    """Class for holding entity based debug info."""

    subscriptions: dict[str, SubscriptionDebugInfo]
    discovery_data: DiscoveryInfoType
    transmitted: dict[str, dict[str, deque[TimestampedPublishMessage]]]
111 
112 
class TriggerDebugInfo(TypedDict):
    """Class for holding trigger based debug info."""

    device_id: str
    discovery_data: DiscoveryInfoType
118 
119 
class PendingDiscovered(TypedDict):
    """Pending discovered items."""

    pending: deque[MQTTDiscoveryPayload]
    unsub: CALLBACK_TYPE
125 
126 
class MqttOriginInfo(TypedDict, total=False):
    """Integration info of discovered entity.

    Declared with ``total=False``, so every key is optional.
    """

    name: str
    manufacturer: str
    sw_version: str
    hw_version: str
    support_url: str
135 
136 
class MqttCommandTemplateException(ServiceValidationError):
    """Handle MqttCommandTemplate exceptions.

    Carries translation metadata (domain, key and placeholders) and a
    preformatted message that `__str__` returns.
    """

    _message: str

    def __init__(
        self,
        *args: object,
        base_exception: Exception,
        command_template: str,
        value: PublishPayloadType,
        entity_id: str | None = None,
    ) -> None:
        """Initialize exception.

        `base_exception` is the error raised while rendering,
        `command_template` the template source text, `value` the payload that
        was being rendered and `entity_id` the entity involved, if any.
        """
        super().__init__(base_exception, *args)
        value_log = str(value)
        self.translation_domain = DOMAIN
        self.translation_key = "command_template_error"
        self.translation_placeholders = {
            "error": str(base_exception),
            # str() is applied even when entity_id is None.
            "entity_id": str(entity_id),
            "command_template": command_template,
        }
        entity_id_log = "" if entity_id is None else f" for entity '{entity_id}'"
        self._message = (
            f"{type(base_exception).__name__}: {base_exception} rendering template{entity_id_log}"
            f", template: '{command_template}' and payload: {value_log}"
        )

    def __str__(self) -> str:
        """Return exception message string."""
        return self._message
169 
170 
class MqttCommandTemplate:
    """Class for rendering MQTT payload with command templates."""

    def __init__(
        self,
        command_template: template.Template | None,
        *,
        entity: Entity | None = None,
    ) -> None:
        """Instantiate a command template.

        `command_template` may be None, in which case `async_render` passes
        values through untouched. `entity`, when given, supplies the
        entity-related template variables.
        """
        # Created on first render that needs it; see async_render.
        self._template_state: template.TemplateStateFromEntityId | None = None
        self._command_template = command_template
        self._entity = entity

    @callback
    def async_render(
        self,
        value: PublishPayloadType = None,
        variables: TemplateVarsType = None,
    ) -> PublishPayloadType:
        """Render or convert the command template with given value or variables.

        Returns `value` unchanged when no template is configured. Otherwise
        the template is rendered with the variable `value`, plus the entity
        id, entity name and `this` when an entity was supplied; entries in
        `variables` are applied last and therefore take precedence. The
        rendered result is passed through `convert_outgoing_mqtt_payload`.

        Raises MqttCommandTemplateException when rendering raises
        TemplateError.
        """
        if self._command_template is None:
            return value

        values: dict[str, Any] = {"value": value}
        if self._entity:
            values[ATTR_ENTITY_ID] = self._entity.entity_id
            values[ATTR_NAME] = self._entity.name
            # The state object is built once and reused. The guard looks at
            # the template's hass, the object is built from the entity's hass.
            if not self._template_state and self._command_template.hass is not None:
                self._template_state = template.TemplateStateFromEntityId(
                    self._entity.hass, self._entity.entity_id
                )
            values[ATTR_THIS] = self._template_state

        if variables is not None:
            values.update(variables)
        _LOGGER.debug(
            "Rendering outgoing payload with variables %s and %s",
            values,
            self._command_template,
        )
        try:
            return convert_outgoing_mqtt_payload(
                self._command_template.async_render(values, parse_result=False)
            )
        except TemplateError as exc:
            raise MqttCommandTemplateException(
                base_exception=exc,
                command_template=self._command_template.template,
                value=value,
                entity_id=self._entity.entity_id if self._entity is not None else None,
            ) from exc
223 
224 
class MqttValueTemplateException(TemplateError):
    """Handle MqttValueTemplate exceptions.

    Carries a preformatted message that `__str__` returns.
    """

    _message: str

    def __init__(
        self,
        *args: object,
        base_exception: Exception,
        value_template: str,
        default: ReceivePayloadType | PayloadSentinel,
        payload: ReceivePayloadType,
        entity_id: str | None = None,
    ) -> None:
        """Initialize exception.

        `base_exception` is the error raised while rendering, `value_template`
        the template source text, `default` the default passed to the render
        call, `payload` the received payload and `entity_id` the entity
        involved, if any.
        """
        super().__init__(base_exception, *args)
        entity_id_log = "" if entity_id is None else f" for entity '{entity_id}'"
        default_log = str(default)
        # The default is only mentioned when the caller actually supplied one.
        default_payload_log = (
            "" if default is PayloadSentinel.NONE else f", default value: {default_log}"
        )
        payload_log = str(payload)
        self._message = (
            f"{type(base_exception).__name__}: {base_exception} rendering template{entity_id_log}"
            f", template: '{value_template}'{default_payload_log} and payload: {payload_log}"
        )

    def __str__(self) -> str:
        """Return exception message string."""
        return self._message
255 
256 
class MqttValueTemplate:
    """Class for rendering MQTT value template with possible json values."""

    def __init__(
        self,
        value_template: template.Template | None,
        *,
        entity: Entity | None = None,
        config_attributes: TemplateVarsType = None,
    ) -> None:
        """Instantiate a value template.

        `value_template` may be None, in which case payloads pass through
        untouched. `entity`, when given, supplies the entity-related template
        variables; `config_attributes` are extra variables added on every
        render.
        """
        # Created on first render that needs it.
        self._template_state: template.TemplateStateFromEntityId | None = None
        self._value_template = value_template
        self._config_attributes = config_attributes
        self._entity = entity

    @callback
    def async_render_with_possible_json_value(
        self,
        payload: ReceivePayloadType,
        default: ReceivePayloadType | PayloadSentinel = PayloadSentinel.NONE,
        variables: TemplateVarsType = None,
    ) -> ReceivePayloadType:
        """Render with possible json value or pass-through a received MQTT value.

        Returns `payload` unchanged when no template is configured. Template
        variables are layered in this order, later entries overriding earlier
        ones: `variables`, the configured attributes, then the entity id,
        entity name and `this` when an entity was supplied.

        When `default` is `PayloadSentinel.NONE` the template is rendered
        without a default; otherwise `default` is forwarded to the template.

        Raises MqttValueTemplateException when rendering raises one of
        TEMPLATE_ERRORS.
        """
        rendered_payload: ReceivePayloadType

        if self._value_template is None:
            return payload

        values: dict[str, Any] = {}

        if variables is not None:
            values.update(variables)

        if self._config_attributes is not None:
            values.update(self._config_attributes)

        if self._entity:
            values[ATTR_ENTITY_ID] = self._entity.entity_id
            values[ATTR_NAME] = self._entity.name
            # The state object is built once and reused on later renders.
            if not self._template_state and self._value_template.hass:
                self._template_state = template.TemplateStateFromEntityId(
                    self._value_template.hass, self._entity.entity_id
                )
            values[ATTR_THIS] = self._template_state

        if default is PayloadSentinel.NONE:
            _LOGGER.debug(
                "Rendering incoming payload '%s' with variables %s and %s",
                payload,
                values,
                self._value_template,
            )
            try:
                rendered_payload = (
                    self._value_template.async_render_with_possible_json_value(
                        payload, variables=values
                    )
                )
            except TEMPLATE_ERRORS as exc:
                raise MqttValueTemplateException(
                    base_exception=exc,
                    value_template=self._value_template.template,
                    default=default,
                    payload=payload,
                    entity_id=self._entity.entity_id if self._entity else None,
                ) from exc
            return rendered_payload

        _LOGGER.debug(
            (
                "Rendering incoming payload '%s' with variables %s with default value"
                " '%s' and %s"
            ),
            payload,
            values,
            default,
            self._value_template,
        )
        try:
            rendered_payload = (
                self._value_template.async_render_with_possible_json_value(
                    payload, default, variables=values
                )
            )
        except TEMPLATE_ERRORS as exc:
            raise MqttValueTemplateException(
                base_exception=exc,
                value_template=self._value_template.template,
                default=default,
                payload=payload,
                entity_id=self._entity.entity_id if self._entity else None,
            ) from exc
        return rendered_payload
351 
352 
class EntityTopicState:
    """Manage entity state write requests for subscribed topics."""

    def __init__(self) -> None:
        """Register topic."""
        # Pending write requests, keyed by entity id.
        self.subscribe_calls: dict[str, Entity] = {}

    @callback
    def process_write_state_requests(self, msg: MQTTMessage) -> None:
        """Process the write state requests.

        Drains every pending request. Each entity is removed from the
        pending set before its state is written, so a failing entity is not
        retried; the failure is logged together with the topic and payload
        of `msg` and processing continues with the remaining entities.
        """
        while self.subscribe_calls:
            entity_id, entity = self.subscribe_calls.popitem()
            try:
                entity.async_write_ha_state()
            except Exception:
                _LOGGER.exception(
                    "Exception raised while updating state of %s, topic: "
                    "'%s' with payload: %s",
                    entity_id,
                    msg.topic,
                    msg.payload,
                )

    @callback
    def write_state_request(self, entity: Entity) -> None:
        """Register write state request.

        Requests are keyed by entity id, so repeated requests for the same
        entity collapse into a single pending write.
        """
        self.subscribe_calls[entity.entity_id] = entity
380 
381 
@dataclass
class MqttData:
    """Keep the MQTT entry data.

    `client` and `config` must be supplied; every other field starts out
    empty (or 0.0 for `last_discovery`) through a per-instance default.
    """

    client: MQTT
    config: list[ConfigType]
    debug_info_entities: dict[str, EntityDebugInfo] = field(default_factory=dict)
    debug_info_triggers: dict[tuple[str, str], TriggerDebugInfo] = field(
        default_factory=dict
    )
    device_triggers: dict[str, Trigger] = field(default_factory=dict)
    data_config_flow_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    discovery_already_discovered: set[tuple[str, str]] = field(default_factory=set)
    discovery_pending_discovered: dict[tuple[str, str], PendingDiscovered] = field(
        default_factory=dict
    )
    discovery_registry_hooks: dict[tuple[str, str], CALLBACK_TYPE] = field(
        default_factory=dict
    )
    discovery_unsubscribe: list[CALLBACK_TYPE] = field(default_factory=list)
    integration_unsubscribe: dict[str, CALLBACK_TYPE] = field(default_factory=dict)
    last_discovery: float = 0.0
    platforms_loaded: set[Platform | str] = field(default_factory=set)
    reload_dispatchers: list[CALLBACK_TYPE] = field(default_factory=list)
    reload_handlers: dict[str, CALLBACK_TYPE] = field(default_factory=dict)
    reload_schema: dict[str, VolSchemaType] = field(default_factory=dict)
    state_write_requests: EntityTopicState = field(default_factory=EntityTopicState)
    subscriptions_to_restore: set[Subscription] = field(default_factory=set)
    tags: dict[str, dict[str, MQTTTagScanner]] = field(default_factory=dict)
411 
412 
413 @dataclass(slots=True)
415  """(component, object_id, node_id, discovery_payload)."""
416 
417  component: str
418  object_id: str
419  node_id: str | None
420  discovery_payload: MQTTDiscoveryPayload
421 
422 
# Typed keys for the MQTT entries stored in the hass data dictionary.
DATA_MQTT: HassKey[MqttData] = HassKey("mqtt")
DATA_MQTT_AVAILABLE: HassKey[asyncio.Future[bool]] = HassKey("mqtt_client_available")
None process_write_state_requests(self, MQTTMessage msg)
Definition: models.py:361
None __init__(self, *object args, Exception base_exception, str command_template, PublishPayloadType value, str|None entity_id=None)
Definition: models.py:149
PublishPayloadType async_render(self, PublishPayloadType value=None, TemplateVarsType variables=None)
Definition: models.py:190
None __init__(self, template.Template|None command_template, *, Entity|None entity=None)
Definition: models.py:179
None __init__(self, *object args, Exception base_exception, str value_template, ReceivePayloadType|PayloadSentinel default, ReceivePayloadType payload, str|None entity_id=None)
Definition: models.py:238
None __init__(self, template.Template|None value_template, *, Entity|None entity=None, TemplateVarsType config_attributes=None)
Definition: models.py:266
ReceivePayloadType async_render_with_possible_json_value(self, ReceivePayloadType payload, ReceivePayloadType|PayloadSentinel default=PayloadSentinel.NONE, TemplateVarsType variables=None)
Definition: models.py:279
PublishPayloadType convert_outgoing_mqtt_payload(PublishPayloadType payload)
Definition: models.py:56