1 """Models used by multiple MQTT modules."""
3 from __future__
import annotations
5 from ast
import literal_eval
7 from collections
import deque
8 from collections.abc
import Callable
9 from dataclasses
import dataclass, field
10 from enum
import StrEnum
12 from typing
import TYPE_CHECKING, Any, TypedDict
29 from paho.mqtt.client
import MQTTMessage
31 from .client
import MQTT, Subscription
32 from .debug_info
import TimestampedPublishMessage
33 from .device_trigger
import Trigger
34 from .discovery
import MQTTDiscoveryPayload
35 from .tag
import MQTTTagScanner
37 from .const
import DOMAIN, TEMPLATE_ERRORS
41 """Sentinel for `async_render_with_possible_json_value`."""
47 _LOGGER = logging.getLogger(__name__)
51 type PublishPayloadType = str | bytes | int | float |
None
55 payload: PublishPayloadType,
56 ) -> PublishPayloadType:
57 """Ensure correct raw MQTT payload is passed as bytes for publishing."""
58 if isinstance(payload, str)
and payload.startswith((
"b'",
'b"')):
60 native_object = literal_eval(payload)
61 except (ValueError, TypeError, SyntaxError, MemoryError):
64 if isinstance(native_object, bytes):
72 """MQTT Message for publishing."""
75 payload: PublishPayloadType
83 @dataclass(slots=True, frozen=True, eq=False)
85 """MQTT Message received."""
88 payload: ReceivePayloadType
95 type MessageCallbackType = Callable[[ReceiveMessage],
None]
99 """Class for holding subscription debug info."""
101 messages: deque[ReceiveMessage]
106 """Class for holding entity based debug info."""
108 subscriptions: dict[str, SubscriptionDebugInfo]
109 discovery_data: DiscoveryInfoType
110 transmitted: dict[str, dict[str, deque[TimestampedPublishMessage]]]
114 """Class for holding trigger based debug info."""
117 discovery_data: DiscoveryInfoType
121 """Pending discovered items."""
123 pending: deque[MQTTDiscoveryPayload]
128 """Integration info of discovered entity."""
138 """Handle MqttCommandTemplate exceptions."""
145 base_exception: Exception,
146 command_template: str,
147 value: PublishPayloadType,
148 entity_id: str |
None =
None,
150 """Initialize exception."""
151 super().
__init__(base_exception, *args)
152 value_log =
str(value)
156 "error":
str(base_exception),
157 "entity_id":
str(entity_id),
158 "command_template": command_template,
160 entity_id_log =
"" if entity_id
is None else f
" for entity '{entity_id}'"
162 f
"{type(base_exception).__name__}: {base_exception} rendering template{entity_id_log}"
163 f
", template: '{command_template}' and payload: {value_log}"
167 """Return exception message string."""
172 """Class for rendering MQTT payload with command templates."""
176 command_template: template.Template |
None,
178 entity: Entity |
None =
None,
180 """Instantiate a command template."""
181 self.
_template_state_template_state: template.TemplateStateFromEntityId |
None =
None
188 value: PublishPayloadType =
None,
189 variables: TemplateVarsType =
None,
190 ) -> PublishPayloadType:
191 """Render or convert the command template with given value or variables."""
195 values: dict[str, Any] = {
"value": value}
197 values[ATTR_ENTITY_ID] = self.
_entity_entity.entity_id
198 values[ATTR_NAME] = self.
_entity_entity.name
205 if variables
is not None:
206 values.update(variables)
208 "Rendering outgoing payload with variables %s and %s",
216 except TemplateError
as exc:
221 entity_id=self.
_entity_entity.entity_id
if self.
_entity_entity
is not None else None,
226 """Handle MqttValueTemplate exceptions."""
233 base_exception: Exception,
235 default: ReceivePayloadType | PayloadSentinel,
236 payload: ReceivePayloadType,
237 entity_id: str |
None =
None,
239 """Initialize exception."""
240 super().
__init__(base_exception, *args)
241 entity_id_log =
"" if entity_id
is None else f
" for entity '{entity_id}'"
242 default_log =
str(default)
243 default_payload_log = (
244 "" if default
is PayloadSentinel.NONE
else f
", default value: {default_log}"
246 payload_log =
str(payload)
248 f
"{type(base_exception).__name__}: {base_exception} rendering template{entity_id_log}"
249 f
", template: '{value_template}'{default_payload_log} and payload: {payload_log}"
253 """Return exception message string."""
258 """Class for rendering MQTT value template with possible json values."""
262 value_template: template.Template |
None,
264 entity: Entity |
None =
None,
265 config_attributes: TemplateVarsType =
None,
267 """Instantiate a value template."""
268 self.
_template_state_template_state: template.TemplateStateFromEntityId |
None =
None
276 payload: ReceivePayloadType,
277 default: ReceivePayloadType | PayloadSentinel = PayloadSentinel.NONE,
278 variables: TemplateVarsType =
None,
279 ) -> ReceivePayloadType:
280 """Render with possible json value or pass-through a received MQTT value."""
281 rendered_payload: ReceivePayloadType
286 values: dict[str, Any] = {}
288 if variables
is not None:
289 values.update(variables)
295 values[ATTR_ENTITY_ID] = self.
_entity_entity.entity_id
296 values[ATTR_NAME] = self.
_entity_entity.name
303 if default
is PayloadSentinel.NONE:
305 "Rendering incoming payload '%s' with variables %s and %s",
313 payload, variables=values
316 except TEMPLATE_ERRORS
as exc:
322 entity_id=self.
_entity_entity.entity_id
if self.
_entity_entity
else None,
324 return rendered_payload
328 "Rendering incoming payload '%s' with variables %s with default value"
339 payload, default, variables=values
342 except TEMPLATE_ERRORS
as exc:
348 entity_id=self.
_entity_entity.entity_id
if self.
_entity_entity
else None,
350 return rendered_payload
354 """Manage entity state write requests for subscribed topics."""
357 """Initialize the mapping of pending write state requests."""
358 self.subscribe_calls: dict[str, Entity] = {}
362 """Process the write state requests."""
363 while self.subscribe_calls:
364 entity_id, entity = self.subscribe_calls.popitem()
366 entity.async_write_ha_state()
369 "Exception raised while updating state of %s, topic: "
370 "'%s' with payload: %s",
378 """Register write state request."""
379 self.subscribe_calls[entity.entity_id] = entity
384 """Keep the MQTT entry data."""
387 config: list[ConfigType]
388 debug_info_entities: dict[str, EntityDebugInfo] = field(default_factory=dict)
389 debug_info_triggers: dict[tuple[str, str], TriggerDebugInfo] = field(
392 device_triggers: dict[str, Trigger] = field(default_factory=dict)
393 data_config_flow_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
394 discovery_already_discovered: set[tuple[str, str]] = field(default_factory=set)
395 discovery_pending_discovered: dict[tuple[str, str], PendingDiscovered] = field(
398 discovery_registry_hooks: dict[tuple[str, str], CALLBACK_TYPE] = field(
401 discovery_unsubscribe: list[CALLBACK_TYPE] = field(default_factory=list)
402 integration_unsubscribe: dict[str, CALLBACK_TYPE] = field(default_factory=dict)
403 last_discovery: float = 0.0
404 platforms_loaded: set[Platform | str] = field(default_factory=set)
405 reload_dispatchers: list[CALLBACK_TYPE] = field(default_factory=list)
406 reload_handlers: dict[str, CALLBACK_TYPE] = field(default_factory=dict)
407 reload_schema: dict[str, VolSchemaType] = field(default_factory=dict)
408 state_write_requests: EntityTopicState = field(default_factory=EntityTopicState)
409 subscriptions_to_restore: set[Subscription] = field(default_factory=set)
410 tags: dict[str, dict[str, MQTTTagScanner]] = field(default_factory=dict)
413 @dataclass(slots=True)
415 """(component, object_id, node_id, discovery_payload)."""
420 discovery_payload: MQTTDiscoveryPayload
423 DATA_MQTT: HassKey[MqttData] =
HassKey(
"mqtt")
424 DATA_MQTT_AVAILABLE: HassKey[asyncio.Future[bool]] =
HassKey(
"mqtt_client_available")
None process_write_state_requests(self, MQTTMessage msg)
None write_state_request(self, Entity entity)
None __init__(self, *object args, Exception base_exception, str command_template, PublishPayloadType value, str|None entity_id=None)
PublishPayloadType async_render(self, PublishPayloadType value=None, TemplateVarsType variables=None)
None __init__(self, template.Template|None command_template, *Entity|None entity=None)
None __init__(self, *object args, Exception base_exception, str value_template, ReceivePayloadType|PayloadSentinel default, ReceivePayloadType payload, str|None entity_id=None)
None __init__(self, template.Template|None value_template, *Entity|None entity=None, TemplateVarsType config_attributes=None)
ReceivePayloadType async_render_with_possible_json_value(self, ReceivePayloadType payload, ReceivePayloadType|PayloadSentinel default=PayloadSentinel.NONE, TemplateVarsType variables=None)
PublishPayloadType convert_outgoing_mqtt_payload(PublishPayloadType payload)