Home Assistant Unofficial Reference 2024.12.1
trigger.py
Go to the documentation of this file.
1 """Offer MQTT listening automation rules."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from contextlib import suppress
7 import logging
8 from typing import Any
9 
10 import voluptuous as vol
11 
12 from homeassistant.const import CONF_PAYLOAD, CONF_PLATFORM, CONF_VALUE_TEMPLATE
13 from homeassistant.core import (
14  CALLBACK_TYPE,
15  HassJob,
16  HassJobType,
17  HomeAssistant,
18  callback,
19 )
20 from homeassistant.helpers import config_validation as cv
21 from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
22 from homeassistant.helpers.template import Template
23 from homeassistant.helpers.trigger import TriggerActionType, TriggerData, TriggerInfo
24 from homeassistant.helpers.typing import ConfigType, TemplateVarsType
25 from homeassistant.util.json import json_loads
26 
27 from .client import async_subscribe_internal
28 from .const import (
29  CONF_ENCODING,
30  CONF_QOS,
31  CONF_TOPIC,
32  DEFAULT_ENCODING,
33  DEFAULT_QOS,
34  DOMAIN,
35 )
36 from .models import (
37  MqttCommandTemplate,
38  MqttValueTemplate,
39  PayloadSentinel,
40  PublishPayloadType,
41  ReceiveMessage,
42 )
43 from .util import valid_subscribe_topic, valid_subscribe_topic_template
44 
45 TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
46  {
47  vol.Required(CONF_PLATFORM): DOMAIN,
48  vol.Required(CONF_TOPIC): valid_subscribe_topic_template,
49  vol.Optional(CONF_PAYLOAD): cv.template,
50  vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
51  vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
52  vol.Optional(CONF_QOS, default=DEFAULT_QOS): vol.All(
53  vol.Coerce(int), vol.In([0, 1, 2])
54  ),
55  }
56 )
57 
58 _LOGGER = logging.getLogger(__name__)
59 
60 
62  hass: HomeAssistant,
63  config: ConfigType,
64  action: TriggerActionType,
65  trigger_info: TriggerInfo,
66 ) -> CALLBACK_TYPE:
67  """Listen for state changes based on configuration."""
68  trigger_data: TriggerData = trigger_info["trigger_data"]
69  command_template: Callable[
70  [PublishPayloadType, TemplateVarsType], PublishPayloadType
71  ] = MqttCommandTemplate(config.get(CONF_PAYLOAD)).async_render
72  value_template: Callable[[ReceivePayloadType, str], ReceivePayloadType]
73  value_template = MqttValueTemplate(
74  config.get(CONF_VALUE_TEMPLATE)
75  ).async_render_with_possible_json_value
76  encoding: str | None = config[CONF_ENCODING] or None
77  qos: int = config[CONF_QOS]
78  job = HassJob(action)
79  variables: TemplateVarsType | None = None
80  if trigger_info:
81  variables = trigger_info.get("variables")
82 
83  wanted_payload = command_template(None, variables)
84 
85  topic_template: Template = config[CONF_TOPIC]
86  topic = topic_template.async_render(variables, limited=True, parse_result=False)
88 
89  @callback
90  def mqtt_automation_listener(mqttmsg: ReceiveMessage) -> None:
91  """Listen for MQTT messages."""
92  if wanted_payload is None or (
93  (payload := value_template(mqttmsg.payload, PayloadSentinel.DEFAULT))
94  and payload is not PayloadSentinel.DEFAULT
95  and wanted_payload == payload
96  ):
97  data: dict[str, Any] = {
98  **trigger_data,
99  "platform": "mqtt",
100  "topic": mqttmsg.topic,
101  "payload": mqttmsg.payload,
102  "qos": mqttmsg.qos,
103  "description": f"mqtt topic {mqttmsg.topic}",
104  }
105 
106  with suppress(ValueError):
107  data["payload_json"] = json_loads(mqttmsg.payload)
108 
109  hass.async_run_hass_job(job, {"trigger": data})
110 
111  _LOGGER.debug(
112  "Attaching MQTT trigger for topic: '%s', payload: '%s'", topic, wanted_payload
113  )
114 
116  hass,
117  topic,
118  mqtt_automation_listener,
119  encoding=encoding,
120  qos=qos,
121  job_type=HassJobType.Callback,
122  )
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
Definition: client.py:210
CALLBACK_TYPE async_attach_trigger(HomeAssistant hass, ConfigType config, TriggerActionType action, TriggerInfo trigger_info)
Definition: trigger.py:66
str valid_subscribe_topic(Any topic)
Definition: util.py:270