1 """Support for MQTT message handling."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from datetime
import datetime
9 from typing
import TYPE_CHECKING, Any, cast
11 import voluptuous
as vol
13 from homeassistant
import config
as conf_util
19 ConfigValidationError,
20 ServiceValidationError,
24 config_validation
as cv,
25 entity_registry
as er,
41 from .
import debug_info, discovery
46 async_subscribe_internal,
50 from .config
import MQTT_BASE_SCHEMA, MQTT_RO_SCHEMA, MQTT_RW_SCHEMA
51 from .config_integration
import CONFIG_SCHEMA_BASE
63 CONF_DISCOVERY_PREFIX,
80 MQTT_CONNECTION_STATE,
92 convert_outgoing_mqtt_payload,
94 from .subscription
import (
96 async_prepare_subscribe_topics,
97 async_subscribe_topics,
98 async_unsubscribe_topics,
101 async_create_certificate_temp_files,
102 async_forward_entry_setup_and_setup_discovery,
103 async_wait_for_mqtt_client,
104 mqtt_config_entry_enabled,
105 platforms_from_config,
108 valid_subscribe_topic,
111 _LOGGER = logging.getLogger(__name__)
113 SERVICE_PUBLISH =
"publish"
114 SERVICE_DUMP =
"dump"
116 ATTR_TOPIC_TEMPLATE =
"topic_template"
117 ATTR_PAYLOAD_TEMPLATE =
"payload_template"
118 ATTR_EVALUATE_PAYLOAD =
"evaluate_payload"
120 MAX_RECONNECT_WAIT = 300
122 CONNECTION_SUCCESS =
"connection_success"
123 CONNECTION_FAILED =
"connection_failed"
124 CONNECTION_FAILED_RECOVERABLE =
"connection_failed_recoverable"
147 CONFIG_SCHEMA = vol.Schema(
152 [CONFIG_SCHEMA_BASE],
155 extra=vol.ALLOW_EXTRA,
163 MQTT_PUBLISH_SCHEMA = vol.All(
166 vol.Exclusive(ATTR_TOPIC, CONF_TOPIC): valid_publish_topic,
167 vol.Exclusive(ATTR_TOPIC_TEMPLATE, CONF_TOPIC): cv.string,
168 vol.Exclusive(ATTR_PAYLOAD, CONF_PAYLOAD): cv.string,
169 vol.Exclusive(ATTR_PAYLOAD_TEMPLATE, CONF_PAYLOAD): cv.string,
170 vol.Optional(ATTR_EVALUATE_PAYLOAD): cv.boolean,
171 vol.Optional(ATTR_QOS, default=DEFAULT_QOS): valid_qos_schema,
172 vol.Optional(ATTR_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
176 cv.has_at_least_one_key(ATTR_TOPIC, ATTR_TOPIC_TEMPLATE),
181 """Handle signals of config entry being updated.
183 Causes for this is config entry options changing.
185 await hass.config_entries.async_reload(entry.entry_id)
190 """Unregister open config issues."""
191 issue_registry = ir.async_get(hass)
194 for (domain, issue_id), issue_entry
in issue_registry.issues.items()
195 if domain == DOMAIN
and issue_entry.translation_key ==
"invalid_platform_config"
197 for issue
in open_issues:
198 ir.async_delete_issue(hass, DOMAIN, issue)
202 hass: HomeAssistant, config_yaml: ConfigType
204 """Validate manually configured MQTT items."""
205 mqtt_data = hass.data[DATA_MQTT]
206 mqtt_config: list[dict[str, list[ConfigType]]] = config_yaml.get(DOMAIN, {})
207 for mqtt_config_item
in mqtt_config:
208 for domain, config_items
in mqtt_config_item.items():
209 schema = mqtt_data.reload_schema[domain]
210 for config
in config_items:
213 except vol.Invalid
as exc:
215 message = conf_util.format_schema_error(
216 hass, exc, domain, config, integration.documentation
220 translation_domain=DOMAIN,
221 translation_key=
"invalid_platform_config",
222 translation_placeholders={
228 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
229 """Set up the actions and websocket API for the MQTT component."""
231 websocket_api.async_register_command(hass, websocket_subscribe)
232 websocket_api.async_register_command(hass, websocket_mqtt_info)
234 async
def async_publish_service(call: ServiceCall) ->
None:
235 """Handle MQTT publish service calls."""
236 msg_topic: str |
None = call.data.get(ATTR_TOPIC)
237 msg_topic_template: str |
None = call.data.get(ATTR_TOPIC_TEMPLATE)
241 translation_key=
"mqtt_not_setup_cannot_publish",
242 translation_domain=DOMAIN,
243 translation_placeholders={
244 "topic":
str(msg_topic
or msg_topic_template)
248 mqtt_data = hass.data[DATA_MQTT]
249 payload: PublishPayloadType = call.data.get(ATTR_PAYLOAD)
250 evaluate_payload: bool = call.data.get(ATTR_EVALUATE_PAYLOAD,
False)
251 payload_template: str |
None = call.data.get(ATTR_PAYLOAD_TEMPLATE)
252 qos: int = call.data[ATTR_QOS]
253 retain: bool = call.data[ATTR_RETAIN]
254 if msg_topic_template
is not None:
258 rendered_topic: Any = MqttCommandTemplate(
259 template.Template(msg_topic_template, hass),
261 ir.async_create_issue(
264 f
"topic_template_deprecation_{rendered_topic}",
265 breaks_in_ha_version=
"2025.2.0",
267 severity=ir.IssueSeverity.WARNING,
268 translation_key=
"topic_template_deprecation",
269 translation_placeholders={
270 "topic_template": msg_topic_template,
271 "topic": rendered_topic,
276 except vol.Invalid
as err:
279 translation_domain=DOMAIN,
280 translation_key=
"invalid_publish_topic",
281 translation_placeholders={
283 "topic":
str(rendered_topic),
284 "topic_template":
str(msg_topic_template),
288 if payload_template
is not None:
293 assert msg_topic
is not None
294 ir.async_create_issue(
297 f
"payload_template_deprecation_{msg_topic}",
298 breaks_in_ha_version=
"2025.2.0",
300 severity=ir.IssueSeverity.WARNING,
301 translation_key=
"payload_template_deprecation",
302 translation_placeholders={
304 "payload_template": payload_template,
307 payload = MqttCommandTemplate(
308 template.Template(payload_template, hass)
310 elif evaluate_payload:
315 assert msg_topic
is not None
316 await mqtt_data.client.async_publish(msg_topic, payload, qos, retain)
318 hass.services.async_register(
319 DOMAIN, SERVICE_PUBLISH, async_publish_service, schema=MQTT_PUBLISH_SCHEMA
322 async
def async_dump_service(call: ServiceCall) ->
None:
323 """Handle MQTT dump service calls."""
324 messages: list[tuple[str, str]] = []
327 def collect_msg(msg: ReceiveMessage) ->
None:
328 messages.append((msg.topic,
str(msg.payload).replace(
"\n",
"")))
332 def write_dump() -> None:
333 with open(hass.config.path(
"mqtt_dump.txt"),
"w", encoding=
"utf8")
as fp:
335 fp.write(
",".join(msg) +
"\n")
337 async
def finish_dump(_: datetime) ->
None:
338 """Write dump to file."""
340 await hass.async_add_executor_job(write_dump)
342 ev.async_call_later(hass, call.data[
"duration"], finish_dump)
344 hass.services.async_register(
350 vol.Required(
"topic"): valid_subscribe_topic,
351 vol.Optional(
"duration", default=5): int,
359 """Load a config entry."""
363 async
def _setup_client() -> tuple[MqttData, dict[str, Any]]:
364 """Set up the MQTT client."""
366 conf =
dict(entry.data)
367 hass_config = await conf_util.async_hass_config_yaml(hass)
370 client =
MQTT(hass, entry, conf)
371 if DOMAIN
in hass.data:
372 mqtt_data = hass.data[DATA_MQTT]
373 mqtt_data.config = mqtt_yaml
374 mqtt_data.client = client
377 hass.data[DATA_MQTT] = mqtt_data = MqttData(config=mqtt_yaml, client=client)
378 await client.async_start(mqtt_data)
381 if mqtt_data.subscriptions_to_restore:
382 mqtt_data.client.async_restore_tracked_subscriptions(
383 mqtt_data.subscriptions_to_restore
385 mqtt_data.subscriptions_to_restore = set()
386 mqtt_data.reload_dispatchers.append(
387 entry.add_update_listener(_async_config_entry_updated)
390 return (mqtt_data, conf)
392 client_available: asyncio.Future[bool]
393 if DATA_MQTT_AVAILABLE
not in hass.data:
394 client_available = hass.data[DATA_MQTT_AVAILABLE] = hass.loop.create_future()
396 client_available = hass.data[DATA_MQTT_AVAILABLE]
398 mqtt_data, conf = await _setup_client()
400 platforms_used.update(
402 for entry
in er.async_entries_for_config_entry(
403 er.async_get(hass), entry.entry_id
411 if not integration.platforms_are_loaded(platforms_used):
413 await integration.async_get_platforms(platforms_used)
419 await mqtt_data.client.async_connect(client_available)
422 async
def _reload_config(call: ServiceCall) ->
None:
423 """Reload the platforms."""
427 hass, DOMAIN, raise_on_failure=
True
429 except ConfigValidationError
as ex:
431 translation_domain=ex.translation_domain,
432 translation_key=ex.translation_key,
433 translation_placeholders=ex.translation_placeholders,
436 new_config: list[ConfigType] = config_yaml.get(DOMAIN, [])
438 new_platforms = platforms_used - mqtt_data.platforms_loaded
446 mqtt_data.config = new_config
451 create_eager_task(entity.async_remove())
452 for mqtt_platform
in mqtt_platforms
453 for entity
in list(mqtt_platform.entities.values())
454 if getattr(entity,
"_discovery_data",
None)
is None
455 and mqtt_platform.config_entry
456 and mqtt_platform.domain
in ENTITY_PLATFORMS
458 await asyncio.gather(*tasks)
460 for component
in mqtt_data.reload_handlers.values():
464 hass.bus.async_fire(f
"event_{DOMAIN}_reloaded", context=call.context)
468 if not hass.services.has_service(DOMAIN, SERVICE_RELOAD):
471 if conf.get(CONF_DISCOVERY, DEFAULT_DISCOVERY):
472 await discovery.async_start(
473 hass, conf.get(CONF_DISCOVERY_PREFIX, DEFAULT_PREFIX), entry
479 @websocket_api.websocket_command(
{vol.Required("type"):
"mqtt/device/debug_info", vol.Required(
"device_id"): str}
485 """Get MQTT debug info for device."""
486 device_id = msg[
"device_id"]
487 mqtt_info = debug_info.info_for_device(hass, device_id)
489 connection.send_result(msg[
"id"], mqtt_info)
492 @websocket_api.websocket_command(
{
vol.Required("type"):
"mqtt/subscribe",
493 vol.Required(
"topic"): valid_subscribe_topic,
494 vol.Optional(
"qos"): valid_qos_schema,
497 @websocket_api.async_response
501 """Subscribe to a MQTT topic."""
502 if not connection.user.is_admin:
506 def forward_messages(mqttmsg: ReceiveMessage) ->
None:
507 """Forward events to websocket."""
509 payload = cast(bytes, mqttmsg.payload).decode(
512 except (AttributeError, UnicodeDecodeError):
514 payload =
str(mqttmsg.payload)
516 connection.send_message(
517 websocket_api.event_message(
520 "topic": mqttmsg.topic,
523 "retain": mqttmsg.retain,
529 qos: int = msg.get(
"qos", DEFAULT_QOS)
531 hass, msg[
"topic"], forward_messages, encoding=
None, qos=qos
534 connection.send_message(websocket_api.result_message(msg[
"id"]))
537 type ConnectionStatusCallback = Callable[[bool],
None]
542 hass: HomeAssistant, connection_status_callback: ConnectionStatusCallback
543 ) -> Callable[[],
None]:
544 """Subscribe to MQTT connection changes."""
546 hass, MQTT_CONNECTION_STATE, connection_status_callback
551 """Return if MQTT client is connected."""
552 mqtt_data = hass.data[DATA_MQTT]
553 return mqtt_data.client.connected
557 hass: HomeAssistant, config_entry: ConfigEntry, device_entry: DeviceEntry
559 """Remove MQTT config entry from a device."""
561 from .
import device_automation
563 await device_automation.async_removed_from_device(hass, device_entry.id)
568 """Unload MQTT dump and publish service when the config entry is unloaded."""
569 mqtt_data = hass.data[DATA_MQTT]
570 mqtt_client = mqtt_data.client
573 await discovery.async_stop(hass)
575 await hass.config_entries.async_unload_platforms(entry, mqtt_data.platforms_loaded)
576 mqtt_data.platforms_loaded = set()
577 await asyncio.sleep(0)
579 while reload_dispatchers := mqtt_data.reload_dispatchers:
580 reload_dispatchers.pop()()
582 mqtt_client.cleanup()
585 registry_hooks = mqtt_data.discovery_registry_hooks
586 while registry_hooks:
587 registry_hooks.popitem()[1]()
589 await mqtt_client.async_disconnect(disconnect_paho_client=
True)
592 hass.data.pop(DATA_MQTT_AVAILABLE,
None)
595 if subscriptions := mqtt_client.subscriptions:
596 mqtt_data.subscriptions_to_restore = subscriptions
602
web.Response get(self, web.Request request, str config_key)
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
PublishPayloadType convert_outgoing_mqtt_payload(PublishPayloadType payload)
None async_create_certificate_temp_files(HomeAssistant hass, ConfigType config)
bool|None mqtt_config_entry_enabled(HomeAssistant hass)
set[Platform|str] platforms_from_config(list[ConfigType] config)
str valid_publish_topic(Any topic)
None async_forward_entry_setup_and_setup_discovery(HomeAssistant hass, ConfigEntry config_entry, set[Platform|str] platforms, bool late=False)
Callable[[], None] async_subscribe_connection_status(HomeAssistant hass, ConnectionStatusCallback connection_status_callback)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
None websocket_subscribe(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_mqtt_info(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None async_check_config_schema(HomeAssistant hass, ConfigType config_yaml)
bool is_connected(HomeAssistant hass)
None _async_config_entry_updated(HomeAssistant hass, ConfigEntry entry)
None _async_remove_mqtt_issues(HomeAssistant hass, MqttData mqtt_data)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
bool async_remove_config_entry_device(HomeAssistant hass, ConfigEntry config_entry, DeviceEntry device_entry)
bool async_setup(HomeAssistant hass, ConfigType config)
None open(self, **Any kwargs)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
ConfigType|None async_integration_yaml_config(HomeAssistant hass, str integration_name)
None async_register_admin_service(HomeAssistant hass, str domain, str service, Callable[[ServiceCall], Awaitable[None]|None] service_func, VolSchemaType schema=vol.Schema({}, extra=vol.PREVENT_EXTRA))
Integration async_get_loaded_integration(HomeAssistant hass, str domain)
Integration async_get_integration(HomeAssistant hass, str domain)
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)