1 """Offer MQTT listening automation rules."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from contextlib
import suppress
10 import voluptuous
as vol
27 from .client
import async_subscribe_internal
43 from .util
import valid_subscribe_topic, valid_subscribe_topic_template
45 TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
47 vol.Required(CONF_PLATFORM): DOMAIN,
48 vol.Required(CONF_TOPIC): valid_subscribe_topic_template,
49 vol.Optional(CONF_PAYLOAD): cv.template,
50 vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
51 vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
52 vol.Optional(CONF_QOS, default=DEFAULT_QOS): vol.All(
53 vol.Coerce(int), vol.In([0, 1, 2])
58 _LOGGER = logging.getLogger(__name__)
64 action: TriggerActionType,
65 trigger_info: TriggerInfo,
67 """Listen for state changes based on configuration."""
68 trigger_data: TriggerData = trigger_info[
"trigger_data"]
69 command_template: Callable[
70 [PublishPayloadType, TemplateVarsType], PublishPayloadType
72 value_template: Callable[[ReceivePayloadType, str], ReceivePayloadType]
74 config.get(CONF_VALUE_TEMPLATE)
75 ).async_render_with_possible_json_value
76 encoding: str |
None = config[CONF_ENCODING]
or None
77 qos: int = config[CONF_QOS]
79 variables: TemplateVarsType |
None =
None
81 variables = trigger_info.get(
"variables")
83 wanted_payload = command_template(
None, variables)
85 topic_template: Template = config[CONF_TOPIC]
86 topic = topic_template.async_render(variables, limited=
True, parse_result=
False)
90 def mqtt_automation_listener(mqttmsg: ReceiveMessage) ->
None:
91 """Listen for MQTT messages."""
92 if wanted_payload
is None or (
93 (payload := value_template(mqttmsg.payload, PayloadSentinel.DEFAULT))
94 and payload
is not PayloadSentinel.DEFAULT
95 and wanted_payload == payload
97 data: dict[str, Any] = {
100 "topic": mqttmsg.topic,
101 "payload": mqttmsg.payload,
103 "description": f
"mqtt topic {mqttmsg.topic}",
106 with suppress(ValueError):
107 data[
"payload_json"] =
json_loads(mqttmsg.payload)
109 hass.async_run_hass_job(job, {
"trigger": data})
112 "Attaching MQTT trigger for topic: '%s', payload: '%s'", topic, wanted_payload
118 mqtt_automation_listener,
121 job_type=HassJobType.Callback,
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
CALLBACK_TYPE async_attach_trigger(HomeAssistant hass, ConfigType config, TriggerActionType action, TriggerInfo trigger_info)
str valid_subscribe_topic(Any topic)