1 """Support for MQTT discovery."""
3 from __future__
import annotations
6 from collections
import deque
7 from dataclasses
import dataclass
9 from itertools
import chain
13 from typing
import TYPE_CHECKING, Any
15 import voluptuous
as vol
20 signal_discovered_config_entry_removed,
27 async_dispatcher_connect,
28 async_dispatcher_send,
36 from .abbreviations
import ABBREVIATIONS, DEVICE_ABBREVIATIONS, ORIGIN_ABBREVIATIONS
37 from .client
import async_subscribe_internal
40 ATTR_DISCOVERY_PAYLOAD,
49 from .models
import DATA_MQTT, MqttComponentConfig, MqttOriginInfo, ReceiveMessage
50 from .schemas
import DEVICE_DISCOVERY_SCHEMA, MQTT_ORIGIN_INFO_SCHEMA, SHARED_OPTIONS
51 from .util
import async_forward_entry_setup_and_setup_discovery
53 ABBREVIATIONS_SET = set(ABBREVIATIONS)
54 DEVICE_ABBREVIATIONS_SET = set(DEVICE_ABBREVIATIONS)
55 ORIGIN_ABBREVIATIONS_SET = set(ORIGIN_ABBREVIATIONS)
57 _LOGGER = logging.getLogger(__name__)
59 TOPIC_MATCHER = re.compile(
60 r"(?P<component>\w+)/(?:(?P<node_id>[a-zA-Z0-9_-]+)/)"
61 r"?(?P<object_id>[a-zA-Z0-9_-]+)/config"
64 MQTT_DISCOVERY_UPDATED: SignalTypeFormat[MQTTDiscoveryPayload] =
SignalTypeFormat(
65 "mqtt_discovery_updated_{}_{}"
67 MQTT_DISCOVERY_NEW: SignalTypeFormat[MQTTDiscoveryPayload] =
SignalTypeFormat(
68 "mqtt_discovery_new_{}_{}"
71 "mqtt_discovery_done_{}_{}"
76 CONF_MIGRATE_DISCOVERY =
"migrate_discovery"
78 MIGRATE_DISCOVERY_SCHEMA = vol.Schema(
79 {vol.Optional(CONF_MIGRATE_DISCOVERY):
True},
84 """Class to hold and MQTT discovery payload and discovery data."""
86 device_discovery: bool =
False
87 migrate_discovery: bool =
False
88 discovery_data: DiscoveryInfoType
91 @dataclass(frozen=True)
93 """Class to hold an integration discovery playload."""
101 """Process a discovery migration request in the discovery payload."""
103 if migr_discvry := (payload.pop(
"migr_discvry",
None)):
104 payload[CONF_MIGRATE_DISCOVERY] = migr_discvry
105 if CONF_MIGRATE_DISCOVERY
in payload:
108 except vol.Invalid
as exc:
111 payload.migrate_discovery =
True
118 """Clear entry from already discovered list."""
119 hass.data[DATA_MQTT].discovery_already_discovered.discard(discovery_hash)
123 """Add entry to already discovered list."""
124 hass.data[DATA_MQTT].discovery_already_discovered.add(discovery_hash)
129 discovery_payload: MQTTDiscoveryPayload, *, include_url: bool
131 """Get the origin information from a discovery payload for logging."""
132 if CONF_ORIGIN
not in discovery_payload:
134 origin_info: MqttOriginInfo = discovery_payload[CONF_ORIGIN]
136 if sw_version := origin_info.get(
"sw_version"):
137 sw_version_log = f
", version: {sw_version}"
140 support_url_log = f
", support URL: {support_url}"
141 return f
" from external application {origin_info["name
"]}{sw_version_log}{support_url_log}"
146 """Get the origin information support URL from a discovery payload."""
147 if CONF_ORIGIN
not in discovery_payload:
149 origin_info: MqttOriginInfo = discovery_payload[CONF_ORIGIN]
150 return origin_info.get(
"support_url")
155 message: str, discovery_payload: MQTTDiscoveryPayload, level: int = logging.INFO
157 """Log information about the discovery and origin."""
159 if not _LOGGER.isEnabledFor(level):
172 payload: dict[str, Any] | str,
173 abbreviations: dict[str, str],
174 abbreviations_set: set[str],
176 """Replace abbreviations in an MQTT discovery payload."""
177 if not isinstance(payload, dict):
179 for key
in abbreviations_set.intersection(payload):
180 payload[abbreviations[key]] = payload.pop(key)
185 discovery_payload: dict[str, Any], component_only: bool =
False
187 """Replace all abbreviations in an MQTT discovery payload."""
191 if CONF_AVAILABILITY
in discovery_payload:
192 for availability_conf
in cv.ensure_list(discovery_payload[CONF_AVAILABILITY]):
198 if CONF_ORIGIN
in discovery_payload:
200 discovery_payload[CONF_ORIGIN],
201 ORIGIN_ABBREVIATIONS,
202 ORIGIN_ABBREVIATIONS_SET,
205 if CONF_DEVICE
in discovery_payload:
207 discovery_payload[CONF_DEVICE],
208 DEVICE_ABBREVIATIONS,
209 DEVICE_ABBREVIATIONS_SET,
212 if CONF_COMPONENTS
in discovery_payload:
213 if not isinstance(discovery_payload[CONF_COMPONENTS], dict):
215 for comp_conf
in discovery_payload[CONF_COMPONENTS].values():
221 """Replace topic base in MQTT discovery data."""
222 base = discovery_payload.pop(TOPIC_BASE)
223 for key, value
in discovery_payload.items():
224 if isinstance(value, str)
and value:
225 if value[0] == TOPIC_BASE
and key.endswith(
"topic"):
226 discovery_payload[key] = f
"{base}{value[1:]}"
227 if value[-1] == TOPIC_BASE
and key.endswith(
"topic"):
228 discovery_payload[key] = f
"{value[:-1]}{base}"
229 if discovery_payload.get(CONF_AVAILABILITY):
230 for availability_conf
in cv.ensure_list(discovery_payload[CONF_AVAILABILITY]):
231 if not isinstance(availability_conf, dict):
233 if topic :=
str(availability_conf.get(CONF_TOPIC)):
234 if topic[0] == TOPIC_BASE:
235 availability_conf[CONF_TOPIC] = f
"{base}{topic[1:]}"
236 if topic[-1] == TOPIC_BASE:
237 availability_conf[CONF_TOPIC] = f
"{topic[:-1]}{base}"
245 migrate_discovery: bool =
False,
246 ) -> MQTTDiscoveryPayload:
247 """Generate a cleanup or discovery migration message on device cleanup.
249 If an empty payload, or a migrate discovery request is received for a device,
250 we forward an empty payload for all previously discovered components.
252 mqtt_data = hass.data[DATA_MQTT]
253 device_node_id: str = f
"{node_id} {object_id}" if node_id
else object_id
255 config.migrate_discovery = migrate_discovery
256 comp_config = config[CONF_COMPONENTS]
257 for platform, discover_id
in mqtt_data.discovery_already_discovered:
258 ids = discover_id.split(
" ")
259 component_node_id = ids.pop(0)
260 component_object_id =
" ".join(ids)
263 if device_node_id == component_node_id:
264 comp_config[component_object_id] = {CONF_PLATFORM: platform}
272 payload: ReceivePayloadType,
275 ) -> MQTTDiscoveryPayload:
276 """Parse a device discovery payload.
278 The device discovery payload is translated info the config payloads for every single
279 component inside the device based configuration.
280 An empty payload is translated in a cleanup, which forwards an empty payload to all
287 "No device components to cleanup for %s, node_id '%s'",
291 return device_payload
295 _LOGGER.warning(
"Unable to parse JSON %s: '%s'", object_id, payload)
296 return device_payload
302 except vol.Invalid
as exc:
304 "Invalid MQTT device discovery payload for %s, %s: '%s'",
310 return device_payload
315 """Parse and validate origin info from a single component discovery payload."""
316 if CONF_ORIGIN
not in discovery_payload:
320 except Exception
as exc:
322 "Unable to parse origin information from discovery message: %s, got %s",
324 discovery_payload[CONF_ORIGIN],
332 component_config: MQTTDiscoveryPayload, device_config: dict[str, Any]
334 """Merge common device options with the component config options.
338 CONF_AVAILABILITY_MODE,
339 CONF_AVAILABILITY_TEMPLATE,
340 CONF_AVAILABILITY_TOPIC,
342 CONF_PAYLOAD_AVAILABLE,
343 CONF_PAYLOAD_NOT_AVAILABLE,
345 Common options in the body of the device based config are inherited into
346 the component. Unless the option is explicitly specified at component level,
347 in that case the option at component level will override the common option.
349 for option
in SHARED_OPTIONS:
350 if option
in device_config
and option
not in component_config:
351 component_config[option] = device_config.get(option)
355 hass: HomeAssistant, discovery_topic: str, config_entry: ConfigEntry
357 """Start MQTT Discovery."""
358 mqtt_data = hass.data[DATA_MQTT]
359 platform_setup_lock: dict[str, asyncio.Lock] = {}
360 integration_discovery_messages: dict[str, MQTTIntegrationDiscoveryConfig] = {}
363 def _async_add_component(discovery_payload: MQTTDiscoveryPayload) ->
None:
364 """Add a component from a discovery message."""
365 discovery_hash = discovery_payload.discovery_data[ATTR_DISCOVERY_HASH]
366 component, discovery_id = discovery_hash
367 message = f
"Found new component: {component} {discovery_id}"
369 mqtt_data.discovery_already_discovered.add(discovery_hash)
371 hass, MQTT_DISCOVERY_NEW.format(component,
"mqtt"), discovery_payload
374 async
def _async_component_setup(
375 component: str, discovery_payload: MQTTDiscoveryPayload
377 """Perform component set up."""
378 async
with platform_setup_lock.setdefault(component, asyncio.Lock()):
379 if component
not in mqtt_data.platforms_loaded:
381 hass, config_entry, {component}
383 _async_add_component(discovery_payload)
386 def async_discovery_message_received(msg: ReceiveMessage) ->
None:
387 """Process the received message."""
388 mqtt_data.last_discovery = msg.timestamp
389 payload = msg.payload
391 topic_trimmed = topic.replace(f
"{discovery_topic}/",
"", 1)
393 if not (match := TOPIC_MATCHER.match(topic_trimmed)):
394 if topic_trimmed.endswith(
"config"):
397 "Received message on illegal discovery topic '%s'. The topic"
398 " contains non allowed characters. For more information see "
399 "https://www.home-assistant.io/integrations/mqtt/#discovery-topic"
405 component, node_id, object_id = match.groups()
407 discovered_components: list[MqttComponentConfig] = []
408 if component == CONF_DEVICE:
416 hass, payload, object_id, node_id
418 if not device_discovery_payload:
420 device_config: dict[str, Any]
421 origin_config: dict[str, Any] |
None
422 component_configs: dict[str, dict[str, Any]]
423 device_config = device_discovery_payload[CONF_DEVICE]
424 origin_config = device_discovery_payload.get(CONF_ORIGIN)
425 component_configs = device_discovery_payload[CONF_COMPONENTS]
426 for component_id, config
in component_configs.items():
427 component = config.pop(CONF_PLATFORM)
430 component_node_id = object_id
434 component_object_id = (
435 f
"{node_id} {component_id}" if node_id
else component_id
442 discovery_payload[CONF_DEVICE] = device_config
443 discovery_payload[CONF_ORIGIN] = origin_config
447 discovery_payload, device_discovery_payload
449 discovery_payload.device_discovery =
True
450 discovery_payload.migrate_discovery = (
451 device_discovery_payload.migrate_discovery
453 discovered_components.append(
462 "Process device discovery payload %s", device_discovery_payload
464 device_discovery_id = f
"{node_id} {object_id}" if node_id
else object_id
465 message = f
"Processing device discovery for '{device_discovery_id}'"
477 _LOGGER.warning(
"Unable to parse JSON %s: '%s'", object_id, payload)
483 discovered_components.append(
487 discovery_pending_discovered = mqtt_data.discovery_pending_discovered
488 for component_config
in discovered_components:
489 component = component_config.component
490 node_id = component_config.node_id
491 object_id = component_config.object_id
492 discovery_payload = component_config.discovery_payload
494 if TOPIC_BASE
in discovery_payload:
498 discovery_id = f
"{node_id} {object_id}" if node_id
else object_id
499 discovery_hash = (component, discovery_id)
502 discovery_payload.discovery_data = {
503 ATTR_DISCOVERY_HASH: discovery_hash,
504 ATTR_DISCOVERY_PAYLOAD: discovery_payload,
505 ATTR_DISCOVERY_TOPIC: topic,
508 if discovery_hash
in discovery_pending_discovered:
509 pending = discovery_pending_discovered[discovery_hash][
"pending"]
510 pending.appendleft(discovery_payload)
512 "Component has already been discovered: %s %s, queuing update",
518 async_process_discovery_payload(component, discovery_id, discovery_payload)
521 def async_process_discovery_payload(
522 component: str, discovery_id: str, payload: MQTTDiscoveryPayload
524 """Process the payload of a new discovery."""
526 _LOGGER.debug(
"Process component discovery payload %s", payload)
527 discovery_hash = (component, discovery_id)
529 already_discovered = discovery_hash
in mqtt_data.discovery_already_discovered
531 already_discovered
or payload
532 )
and discovery_hash
not in mqtt_data.discovery_pending_discovered:
533 discovery_pending_discovered = mqtt_data.discovery_pending_discovered
536 def discovery_done(_: Any) ->
None:
537 pending = discovery_pending_discovered[discovery_hash][
"pending"]
538 _LOGGER.debug(
"Pending discovery for %s: %s", discovery_hash, pending)
540 discovery_pending_discovered[discovery_hash][
"unsub"]()
541 discovery_pending_discovered.pop(discovery_hash)
543 payload = pending.pop()
544 async_process_discovery_payload(component, discovery_id, payload)
546 discovery_pending_discovered[discovery_hash] = {
549 MQTT_DISCOVERY_DONE.format(*discovery_hash),
552 "pending": deque([]),
555 if component
not in mqtt_data.platforms_loaded
and payload:
557 config_entry.async_create_task(
558 hass, _async_component_setup(component, payload)
560 elif already_discovered:
562 message = f
"Component has already been discovered: {component} {discovery_id}, sending update"
565 hass, MQTT_DISCOVERY_UPDATED.format(*discovery_hash), payload
568 _async_add_component(payload)
572 hass, MQTT_DISCOVERY_DONE.format(*discovery_hash),
None
575 mqtt_data.discovery_unsubscribe = [
579 async_discovery_message_received,
581 job_type=HassJobType.Callback,
587 f
"{discovery_topic}/{component}/+/config"
588 for component
in SUPPORTED_COMPONENTS
591 f
"{discovery_topic}/{component}/+/+/config"
592 for component
in SUPPORTED_COMPONENTS
595 f
"{discovery_topic}/device/+/config",
596 f
"{discovery_topic}/device/+/+/config",
601 mqtt_data.last_discovery = time.monotonic()
603 integration_unsubscribe = mqtt_data.integration_unsubscribe
605 async
def _async_handle_config_entry_removed(entry: ConfigEntry) ->
None:
606 """Handle integration config entry changes."""
607 for discovery_key
in entry.discovery_keys[DOMAIN]:
609 discovery_key.version != 1
610 or not isinstance(discovery_key.key, str)
611 or discovery_key.key
not in integration_discovery_messages
614 topic = discovery_key.key
615 discovery_message = integration_discovery_messages[topic]
616 del integration_discovery_messages[topic]
617 _LOGGER.debug(
"Rediscover service on topic %s", topic)
619 await async_integration_message_received(
620 discovery_message.integration, discovery_message.msg
623 mqtt_data.discovery_unsubscribe.append(
627 _async_handle_config_entry_removed,
631 async
def async_integration_message_received(
632 integration: str, msg: ReceiveMessage
634 """Process the received message."""
636 msg.topic
in integration_discovery_messages
637 and integration_discovery_messages[msg.topic].msg.payload == msg.payload
640 "Ignoring already processed discovery message for '%s' on topic %s: %s",
647 assert mqtt_data.data_config_flow_lock
651 async
with mqtt_data.data_config_flow_lock:
657 subscribed_topic=msg.subscribed_topic,
658 timestamp=msg.timestamp,
660 discovery_key = discovery_flow.DiscoveryKey(
661 domain=DOMAIN, key=msg.topic, version=1
663 discovery_flow.async_create_flow(
666 {
"source": SOURCE_MQTT},
668 discovery_key=discovery_key,
672 integration_discovery_messages[msg.topic] = (
675 elif msg.topic
in integration_discovery_messages:
677 del integration_discovery_messages[msg.topic]
679 integration_unsubscribe.update(
684 functools.partial(async_integration_message_received, integration),
686 job_type=HassJobType.Coroutinefunction,
688 for integration, topics
in mqtt_integrations.items()
695 """Stop MQTT Discovery."""
696 mqtt_data = hass.data[DATA_MQTT]
697 for unsub
in mqtt_data.discovery_unsubscribe:
699 mqtt_data.discovery_unsubscribe = []
700 for key, unsub
in list(mqtt_data.integration_unsubscribe.items()):
702 mqtt_data.integration_unsubscribe.pop(key)
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
None async_log_discovery_origin_info(str message, MQTTDiscoveryPayload discovery_payload, int level=logging.INFO)
None _replace_abbreviations(dict[str, Any]|str payload, dict[str, str] abbreviations, set[str] abbreviations_set)
MQTTDiscoveryPayload _parse_device_payload(HomeAssistant hass, ReceivePayloadType payload, str object_id, str|None node_id)
None async_stop(HomeAssistant hass)
None _replace_all_abbreviations(dict[str, Any] discovery_payload, bool component_only=False)
None _replace_topic_base(MQTTDiscoveryPayload discovery_payload)
None clear_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
None async_start(HomeAssistant hass, str discovery_topic, ConfigEntry config_entry)
None set_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
bool _valid_origin_info(MQTTDiscoveryPayload discovery_payload)
str get_origin_log_string(MQTTDiscoveryPayload discovery_payload, *bool include_url)
str|None get_origin_support_url(MQTTDiscoveryPayload discovery_payload)
None _merge_common_device_options(MQTTDiscoveryPayload component_config, dict[str, Any] device_config)
bool _async_process_discovery_migration(MQTTDiscoveryPayload payload)
MQTTDiscoveryPayload _generate_device_config(HomeAssistant hass, str object_id, str|None node_id, bool migrate_discovery=False)
None async_forward_entry_setup_and_setup_discovery(HomeAssistant hass, ConfigEntry config_entry, set[Platform|str] platforms, bool late=False)
SignalType[ConfigEntry] signal_discovered_config_entry_removed(str discovery_domain)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
dict[str, list[str]] async_get_mqtt(HomeAssistant hass)
JsonObjectType json_loads_object(bytes|bytearray|memoryview|str obj)