Home Assistant Unofficial Reference 2024.12.1
discovery.py
Go to the documentation of this file.
1 """Support for MQTT discovery."""
2 
from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass
import functools
from itertools import chain
import logging
import re
import time
from typing import TYPE_CHECKING, Any

import voluptuous as vol

from homeassistant.config_entries import (
    SOURCE_MQTT,
    ConfigEntry,
    signal_discovered_config_entry_removed,
)
from homeassistant.const import CONF_DEVICE, CONF_PLATFORM
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.helpers import discovery_flow
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import (
    async_dispatcher_connect,
    async_dispatcher_send,
)
from homeassistant.helpers.service_info.mqtt import MqttServiceInfo, ReceivePayloadType
from homeassistant.helpers.typing import DiscoveryInfoType
from homeassistant.loader import async_get_mqtt
from homeassistant.util.json import json_loads_object
from homeassistant.util.signal_type import SignalTypeFormat

from .abbreviations import ABBREVIATIONS, DEVICE_ABBREVIATIONS, ORIGIN_ABBREVIATIONS
from .client import async_subscribe_internal
from .const import (
    ATTR_DISCOVERY_HASH,
    ATTR_DISCOVERY_PAYLOAD,
    ATTR_DISCOVERY_TOPIC,
    CONF_AVAILABILITY,
    CONF_COMPONENTS,
    CONF_ORIGIN,
    CONF_TOPIC,
    DOMAIN,
    SUPPORTED_COMPONENTS,
)
from .models import DATA_MQTT, MqttComponentConfig, MqttOriginInfo, ReceiveMessage
from .schemas import DEVICE_DISCOVERY_SCHEMA, MQTT_ORIGIN_INFO_SCHEMA, SHARED_OPTIONS
from .util import async_forward_entry_setup_and_setup_discovery
52 
# Key sets of the abbreviation maps, built once so that
# _replace_abbreviations can intersect them with a payload's keys.
ABBREVIATIONS_SET = set(ABBREVIATIONS)
DEVICE_ABBREVIATIONS_SET = set(DEVICE_ABBREVIATIONS)
ORIGIN_ABBREVIATIONS_SET = set(ORIGIN_ABBREVIATIONS)

_LOGGER = logging.getLogger(__name__)

# Matches "<component>/[<node_id>/]<object_id>/config"; it is applied to the
# topic after the discovery prefix has been stripped. The node_id is optional.
TOPIC_MATCHER = re.compile(
    r"(?P<component>\w+)/(?:(?P<node_id>[a-zA-Z0-9_-]+)/)"
    r"?(?P<object_id>[a-zA-Z0-9_-]+)/config"
)

# Dispatcher signal templates. The placeholders are filled with the
# (component, discovery_id) discovery hash, except for MQTT_DISCOVERY_NEW
# which is formatted with (component, "mqtt").
MQTT_DISCOVERY_UPDATED: SignalTypeFormat[MQTTDiscoveryPayload] = SignalTypeFormat(
    "mqtt_discovery_updated_{}_{}"
)
MQTT_DISCOVERY_NEW: SignalTypeFormat[MQTTDiscoveryPayload] = SignalTypeFormat(
    "mqtt_discovery_new_{}_{}"
)
MQTT_DISCOVERY_DONE: SignalTypeFormat[Any] = SignalTypeFormat(
    "mqtt_discovery_done_{}_{}"
)

# Payload key holding the base topic, and the marker that is substituted by
# that base at the start or end of "*topic" values.
TOPIC_BASE = "~"

CONF_MIGRATE_DISCOVERY = "migrate_discovery"

# A migration request is only valid when the flag is literally True.
MIGRATE_DISCOVERY_SCHEMA = vol.Schema(
    {vol.Optional(CONF_MIGRATE_DISCOVERY): True},
)
81 
82 
class MQTTDiscoveryPayload(dict[str, Any]):
    """Class to hold an MQTT discovery payload and discovery data."""

    # True when the payload was derived from a device based discovery message.
    device_discovery: bool = False
    # True when the payload is part of a discovery migration request.
    migrate_discovery: bool = False
    # Discovery hash, payload and topic; assigned while a message is processed.
    discovery_data: DiscoveryInfoType
89 
90 
@dataclass(frozen=True)
class MQTTIntegrationDiscoveryConfig:
    """Class to hold an integration discovery payload."""

    # Domain of the integration the discovery message was dispatched to.
    integration: str
    # The last received discovery message, kept so it can be replayed.
    msg: ReceiveMessage
97 
98 
@callback
def _async_process_discovery_migration(payload: MQTTDiscoveryPayload) -> bool:
    """Handle a discovery migration request carried by a discovery payload.

    Returns True when the payload holds a valid migration request. In that
    case the payload is flagged with ``migrate_discovery`` and emptied.
    Returns False when no request is present or when it fails validation.
    """
    # Expand the abbreviated key; a falsy abbreviated value is dropped.
    abbreviated = payload.pop("migr_discvry", None)
    if abbreviated:
        payload[CONF_MIGRATE_DISCOVERY] = abbreviated

    if CONF_MIGRATE_DISCOVERY not in payload:
        return False

    try:
        MIGRATE_DISCOVERY_SCHEMA(payload)
    except vol.Invalid as exc:
        _LOGGER.warning(exc)
        return False

    payload.migrate_discovery = True
    payload.clear()
    return True
115 
116 
def clear_discovery_hash(hass: HomeAssistant, discovery_hash: tuple[str, str]) -> None:
    """Remove a discovery hash from the set of already discovered entries.

    Nothing happens when the hash is not in the set.
    """
    already_discovered = hass.data[DATA_MQTT].discovery_already_discovered
    already_discovered.discard(discovery_hash)
120 
121 
def set_discovery_hash(hass: HomeAssistant, discovery_hash: tuple[str, str]) -> None:
    """Record a discovery hash in the set of already discovered entries."""
    already_discovered = hass.data[DATA_MQTT].discovery_already_discovered
    already_discovered.add(discovery_hash)
125 
126 
@callback
def get_origin_log_string(
    discovery_payload: MQTTDiscoveryPayload, *, include_url: bool
) -> str:
    """Get the origin information from a discovery payload for logging.

    Returns an empty string when the payload has no origin section. Otherwise
    returns a log suffix naming the external application, followed by its
    software version when set and, if ``include_url`` is true, by its support
    URL when set.
    """
    if CONF_ORIGIN not in discovery_payload:
        return ""
    origin_info: MqttOriginInfo = discovery_payload[CONF_ORIGIN]
    sw_version_log = ""
    if sw_version := origin_info.get("sw_version"):
        sw_version_log = f", version: {sw_version}"
    support_url_log = ""
    if include_url and (support_url := get_origin_support_url(discovery_payload)):
        support_url_log = f", support URL: {support_url}"
    # Looked up into a local so the f-string does not need nested quotes.
    origin_name = origin_info["name"]
    return f" from external application {origin_name}{sw_version_log}{support_url_log}"
143 
@callback
def get_origin_support_url(discovery_payload: MQTTDiscoveryPayload) -> str | None:
    """Return the support URL from the origin section of a discovery payload.

    Returns an empty string when the payload has no origin section, and
    whatever the origin section holds under "support_url" (None when the
    key is absent) otherwise.
    """
    if CONF_ORIGIN in discovery_payload:
        origin_info: MqttOriginInfo = discovery_payload[CONF_ORIGIN]
        return origin_info.get("support_url")
    return ""
151 
152 
@callback
def async_log_discovery_origin_info(
    message: str, discovery_payload: MQTTDiscoveryPayload, level: int = logging.INFO
) -> None:
    """Log information about the discovery and origin.

    The message is logged at ``level`` with the origin details of the
    payload (including the support URL) appended.
    """
    # We only log origin info once per device discovery
    if not _LOGGER.isEnabledFor(level):
        # Bail out early if logging is disabled, so the origin string
        # is not built needlessly.
        return
    _LOGGER.log(
        level,
        "%s%s",
        message,
        get_origin_log_string(discovery_payload, include_url=True),
    )
168 
169 
@callback
def _replace_abbreviations(
    payload: dict[str, Any] | str,
    abbreviations: dict[str, str],
    abbreviations_set: set[str],
) -> None:
    """Replace abbreviations in an MQTT discovery payload.

    The payload is modified in place: each key found in ``abbreviations_set``
    is renamed to its full form from ``abbreviations``. Payloads that are not
    a dict are left untouched.
    """
    if not isinstance(payload, dict):
        return
    # The intersection is a new set, so renaming keys while looping is safe.
    for key in abbreviations_set.intersection(payload):
        payload[abbreviations[key]] = payload.pop(key)
181 
182 
@callback
def _replace_all_abbreviations(
    discovery_payload: dict[str, Any], component_only: bool = False
) -> None:
    """Replace all abbreviations in an MQTT discovery payload.

    The payload is modified in place. With ``component_only`` set, only the
    top level keys and the availability items are expanded; the origin,
    device and nested component sections are skipped.
    """

    _replace_abbreviations(discovery_payload, ABBREVIATIONS, ABBREVIATIONS_SET)

    if CONF_AVAILABILITY in discovery_payload:
        for availability_conf in cv.ensure_list(discovery_payload[CONF_AVAILABILITY]):
            _replace_abbreviations(availability_conf, ABBREVIATIONS, ABBREVIATIONS_SET)

    if component_only:
        return

    if CONF_ORIGIN in discovery_payload:
        _replace_abbreviations(
            discovery_payload[CONF_ORIGIN],
            ORIGIN_ABBREVIATIONS,
            ORIGIN_ABBREVIATIONS_SET,
        )

    if CONF_DEVICE in discovery_payload:
        _replace_abbreviations(
            discovery_payload[CONF_DEVICE],
            DEVICE_ABBREVIATIONS,
            DEVICE_ABBREVIATIONS_SET,
        )

    if CONF_COMPONENTS in discovery_payload:
        # A components section that is not a dict is left as is here.
        if not isinstance(discovery_payload[CONF_COMPONENTS], dict):
            return
        for comp_conf in discovery_payload[CONF_COMPONENTS].values():
            _replace_all_abbreviations(comp_conf, component_only=True)
217 
218 
@callback
def _replace_topic_base(discovery_payload: MQTTDiscoveryPayload) -> None:
    """Substitute the topic base marker in MQTT discovery data.

    The base is removed from the payload. A leading or trailing marker is
    then replaced by the base in every non-empty string value whose key ends
    with "topic", and in the topic of every dict item of the availability
    section. The payload is modified in place.
    """
    base = discovery_payload.pop(TOPIC_BASE)

    for key, value in discovery_payload.items():
        if not isinstance(value, str) or not value:
            continue
        if value[0] == TOPIC_BASE and key.endswith("topic"):
            discovery_payload[key] = f"{base}{value[1:]}"
        if value[-1] == TOPIC_BASE and key.endswith("topic"):
            discovery_payload[key] = f"{value[:-1]}{base}"

    availability = discovery_payload.get(CONF_AVAILABILITY)
    if not availability:
        return

    for availability_conf in cv.ensure_list(availability):
        if not isinstance(availability_conf, dict):
            continue
        topic = str(availability_conf.get(CONF_TOPIC))
        if not topic:
            continue
        if topic[0] == TOPIC_BASE:
            availability_conf[CONF_TOPIC] = f"{base}{topic[1:]}"
        if topic[-1] == TOPIC_BASE:
            availability_conf[CONF_TOPIC] = f"{topic[:-1]}{base}"
238 
239 
@callback
def _generate_device_config(
    hass: HomeAssistant,
    object_id: str,
    node_id: str | None,
    migrate_discovery: bool = False,
) -> MQTTDiscoveryPayload:
    """Generate a cleanup or discovery migration message on device cleanup.

    If an empty payload, or a migrate discovery request is received for a device,
    we forward an empty payload for all previously discovered components.

    Returns a payload with an empty device section and one entry per matching
    component that holds only its platform. Returns an empty payload when no
    previously discovered component matches.
    """
    mqtt_data = hass.data[DATA_MQTT]
    # Components of a device are registered with the discovery id
    # "<object_id> [<node_id> ]<component_id>": the device object_id comes
    # first, followed by the optional node_id of the device discovery topic.
    # The leading tokens must be matched in that order; comparing
    # "<node_id> <object_id>" with the first token alone can never match.
    id_prefix = [object_id, node_id] if node_id else [object_id]
    prefix_len = len(id_prefix)
    config = MQTTDiscoveryPayload({CONF_DEVICE: {}, CONF_COMPONENTS: {}})
    config.migrate_discovery = migrate_discovery
    comp_config = config[CONF_COMPONENTS]
    for platform, discover_id in mqtt_data.discovery_already_discovered:
        ids = discover_id.split(" ")
        component_ids = ids[prefix_len:]
        if not component_ids or ids[:prefix_len] != id_prefix:
            # Not a component of this device
            continue
        comp_config[" ".join(component_ids)] = {CONF_PLATFORM: platform}

    return config if comp_config else MQTTDiscoveryPayload({})
267 
268 
@callback
def _parse_device_payload(
    hass: HomeAssistant,
    payload: ReceivePayloadType,
    object_id: str,
    node_id: str | None,
) -> MQTTDiscoveryPayload:
    """Parse a device discovery payload.

    The device discovery payload is translated into the config payloads for every
    single component inside the device based configuration.
    An empty payload is translated in a cleanup, which forwards an empty payload to all
    removed components.

    Returns an empty payload when there is nothing to clean up, when the JSON
    cannot be parsed, or when the payload fails schema validation.
    """
    device_payload = MQTTDiscoveryPayload()
    if payload == "":
        if not (device_payload := _generate_device_config(hass, object_id, node_id)):
            _LOGGER.warning(
                "No device components to cleanup for %s, node_id '%s'",
                object_id,
                node_id,
            )
        return device_payload
    try:
        device_payload = MQTTDiscoveryPayload(json_loads_object(payload))
    except ValueError:
        _LOGGER.warning("Unable to parse JSON %s: '%s'", object_id, payload)
        return device_payload
    # A migration request is handled like a cleanup, flagged as a migration
    if _async_process_discovery_migration(device_payload):
        return _generate_device_config(hass, object_id, node_id, migrate_discovery=True)
    _replace_all_abbreviations(device_payload)
    try:
        DEVICE_DISCOVERY_SCHEMA(device_payload)
    except vol.Invalid as exc:
        _LOGGER.warning(
            "Invalid MQTT device discovery payload for %s, %s: '%s'",
            object_id,
            exc,
            payload,
        )
        return MQTTDiscoveryPayload({})
    return device_payload
311 
312 
@callback
def _valid_origin_info(discovery_payload: MQTTDiscoveryPayload) -> bool:
    """Validate the origin info of a single component discovery payload.

    Returns True when the payload has no origin section or when the section
    passes the origin schema. Returns False, after logging a warning, when
    validating the section raises.
    """
    if CONF_ORIGIN in discovery_payload:
        origin_info = discovery_payload[CONF_ORIGIN]
        try:
            MQTT_ORIGIN_INFO_SCHEMA(origin_info)
        except Exception as exc:  # noqa: BLE001
            _LOGGER.warning(
                "Unable to parse origin information from discovery message: %s, got %s",
                exc,
                origin_info,
            )
            return False
    return True
328 
329 
@callback
def _merge_common_device_options(
    component_config: MQTTDiscoveryPayload, device_config: dict[str, Any]
) -> None:
    """Merge common device options with the component config options.

    Common options are:
        CONF_AVAILABILITY,
        CONF_AVAILABILITY_MODE,
        CONF_AVAILABILITY_TEMPLATE,
        CONF_AVAILABILITY_TOPIC,
        CONF_COMMAND_TOPIC,
        CONF_PAYLOAD_AVAILABLE,
        CONF_PAYLOAD_NOT_AVAILABLE,
        CONF_STATE_TOPIC,
    Common options in the body of the device based config are inherited into
    the component. Unless the option is explicitly specified at component level,
    in that case the option at component level will override the common option.

    The component config is modified in place.
    """
    for option in SHARED_OPTIONS:
        if option in device_config and option not in component_config:
            component_config[option] = device_config[option]
352 
353 
async def async_start(  # noqa: C901
    hass: HomeAssistant, discovery_topic: str, config_entry: ConfigEntry
) -> None:
    """Start MQTT Discovery.

    Subscribes to the component and device discovery config topics below
    ``discovery_topic`` and to the discovery topics of the integrations
    returned by ``async_get_mqtt``. The unsubscribe callbacks are stored on
    the MQTT data object so that discovery can be stopped again.
    """
    mqtt_data = hass.data[DATA_MQTT]
    # One lock per platform, created on first use while loading the platform
    platform_setup_lock: dict[str, asyncio.Lock] = {}
    # Last non-empty discovery message per topic, kept for re-discovery
    integration_discovery_messages: dict[str, MQTTIntegrationDiscoveryConfig] = {}

    @callback
    def _async_add_component(discovery_payload: MQTTDiscoveryPayload) -> None:
        """Add a component from a discovery message."""
        discovery_hash = discovery_payload.discovery_data[ATTR_DISCOVERY_HASH]
        component, discovery_id = discovery_hash
        message = f"Found new component: {component} {discovery_id}"
        async_log_discovery_origin_info(message, discovery_payload)
        mqtt_data.discovery_already_discovered.add(discovery_hash)
        async_dispatcher_send(
            hass, MQTT_DISCOVERY_NEW.format(component, "mqtt"), discovery_payload
        )

    async def _async_component_setup(
        component: str, discovery_payload: MQTTDiscoveryPayload
    ) -> None:
        """Perform component set up."""
        async with platform_setup_lock.setdefault(component, asyncio.Lock()):
            if component not in mqtt_data.platforms_loaded:
                await async_forward_entry_setup_and_setup_discovery(
                    hass, config_entry, {component}
                )
        _async_add_component(discovery_payload)

    @callback
    def async_discovery_message_received(msg: ReceiveMessage) -> None:  # noqa: C901
        """Process the received message."""
        mqtt_data.last_discovery = msg.timestamp
        payload = msg.payload
        topic = msg.topic
        topic_trimmed = topic.replace(f"{discovery_topic}/", "", 1)

        if not (match := TOPIC_MATCHER.match(topic_trimmed)):
            if topic_trimmed.endswith("config"):
                _LOGGER.warning(
                    (
                        "Received message on illegal discovery topic '%s'. The topic"
                        " contains non allowed characters. For more information see "
                        "https://www.home-assistant.io/integrations/mqtt/#discovery-topic"
                    ),
                    topic,
                )
            return

        component, node_id, object_id = match.groups()

        discovered_components: list[MqttComponentConfig] = []
        if component == CONF_DEVICE:
            # Process device based discovery message and regenerate
            # cleanup config for all the components that are being removed.
            # This is done when a component in the device config is omitted and detected
            # as being removed, or when the device config update payload is empty.
            # In that case this will regenerate a cleanup message for every already
            # discovered component that was linked to the initial device discovery.
            device_discovery_payload = _parse_device_payload(
                hass, payload, object_id, node_id
            )
            if not device_discovery_payload:
                return
            device_config: dict[str, Any]
            origin_config: dict[str, Any] | None
            component_configs: dict[str, dict[str, Any]]
            device_config = device_discovery_payload[CONF_DEVICE]
            origin_config = device_discovery_payload.get(CONF_ORIGIN)
            component_configs = device_discovery_payload[CONF_COMPONENTS]
            for component_id, config in component_configs.items():
                component = config.pop(CONF_PLATFORM)
                # The object_id in the device discovery topic is the unique identifier.
                # It is used as node_id for the components it contains.
                component_node_id = object_id
                # The component_id in the discovery payload is used as object_id
                # If we have an additional node_id in the discovery topic,
                # we extend the component_id with it.
                component_object_id = (
                    f"{node_id} {component_id}" if node_id else component_id
                )
                # We add wrapper to the discovery payload with the discovery data.
                # If the dict is empty after removing the platform, the payload is
                # assumed to remove the existing config and we do not want to add
                # device or orig or shared availability attributes.
                if discovery_payload := MQTTDiscoveryPayload(config):
                    discovery_payload[CONF_DEVICE] = device_config
                    discovery_payload[CONF_ORIGIN] = origin_config
                    # Only assign shared config options
                    # when they are not set at entity level
                    _merge_common_device_options(
                        discovery_payload, device_discovery_payload
                    )
                discovery_payload.device_discovery = True
                discovery_payload.migrate_discovery = (
                    device_discovery_payload.migrate_discovery
                )
                discovered_components.append(
                    MqttComponentConfig(
                        component,
                        component_object_id,
                        component_node_id,
                        discovery_payload,
                    )
                )
            _LOGGER.debug(
                "Process device discovery payload %s", device_discovery_payload
            )
            device_discovery_id = f"{node_id} {object_id}" if node_id else object_id
            message = f"Processing device discovery for '{device_discovery_id}'"
            async_log_discovery_origin_info(
                message, MQTTDiscoveryPayload(device_discovery_payload)
            )

        else:
            # Process component based discovery message
            try:
                discovery_payload = MQTTDiscoveryPayload(
                    json_loads_object(payload) if payload else {}
                )
            except ValueError:
                _LOGGER.warning("Unable to parse JSON %s: '%s'", object_id, payload)
                return
            if not _async_process_discovery_migration(discovery_payload):
                _replace_all_abbreviations(discovery_payload)
                if not _valid_origin_info(discovery_payload):
                    return
            discovered_components.append(
                MqttComponentConfig(component, object_id, node_id, discovery_payload)
            )

        discovery_pending_discovered = mqtt_data.discovery_pending_discovered
        for component_config in discovered_components:
            component = component_config.component
            node_id = component_config.node_id
            object_id = component_config.object_id
            discovery_payload = component_config.discovery_payload

            if TOPIC_BASE in discovery_payload:
                _replace_topic_base(discovery_payload)

            # If present, the node_id will be included in the discovery_id.
            discovery_id = f"{node_id} {object_id}" if node_id else object_id
            discovery_hash = (component, discovery_id)

            # Attach MQTT topic to the payload, used for debug prints
            discovery_payload.discovery_data = {
                ATTR_DISCOVERY_HASH: discovery_hash,
                ATTR_DISCOVERY_PAYLOAD: discovery_payload,
                ATTR_DISCOVERY_TOPIC: topic,
            }

            if discovery_hash in discovery_pending_discovered:
                pending = discovery_pending_discovered[discovery_hash]["pending"]
                pending.appendleft(discovery_payload)
                _LOGGER.debug(
                    "Component has already been discovered: %s %s, queuing update",
                    component,
                    discovery_id,
                )
                # Only this component is queued. A device discovery message
                # can hold several components, so keep processing the others
                # instead of dropping them by returning here.
                continue

            async_process_discovery_payload(component, discovery_id, discovery_payload)

    @callback
    def async_process_discovery_payload(
        component: str, discovery_id: str, payload: MQTTDiscoveryPayload
    ) -> None:
        """Process the payload of a new discovery."""

        _LOGGER.debug("Process component discovery payload %s", payload)
        discovery_hash = (component, discovery_id)

        already_discovered = discovery_hash in mqtt_data.discovery_already_discovered
        if (
            already_discovered or payload
        ) and discovery_hash not in mqtt_data.discovery_pending_discovered:
            discovery_pending_discovered = mqtt_data.discovery_pending_discovered

            @callback
            def discovery_done(_: Any) -> None:
                """Process the next queued payload, or stop tracking the hash."""
                pending = discovery_pending_discovered[discovery_hash]["pending"]
                _LOGGER.debug("Pending discovery for %s: %s", discovery_hash, pending)
                if not pending:
                    discovery_pending_discovered[discovery_hash]["unsub"]()
                    discovery_pending_discovered.pop(discovery_hash)
                else:
                    # Updates are queued with appendleft, so pop() takes the oldest
                    payload = pending.pop()
                    async_process_discovery_payload(component, discovery_id, payload)

            discovery_pending_discovered[discovery_hash] = {
                "unsub": async_dispatcher_connect(
                    hass,
                    MQTT_DISCOVERY_DONE.format(*discovery_hash),
                    discovery_done,
                ),
                "pending": deque([]),
            }

        if component not in mqtt_data.platforms_loaded and payload:
            # Load component first
            config_entry.async_create_task(
                hass, _async_component_setup(component, payload)
            )
        elif already_discovered:
            # Dispatch update
            message = f"Component has already been discovered: {component} {discovery_id}, sending update"
            async_log_discovery_origin_info(message, payload, logging.DEBUG)
            async_dispatcher_send(
                hass, MQTT_DISCOVERY_UPDATED.format(*discovery_hash), payload
            )
        elif payload:
            _async_add_component(payload)
        else:
            # Unhandled discovery message
            async_dispatcher_send(
                hass, MQTT_DISCOVERY_DONE.format(*discovery_hash), None
            )

    mqtt_data.discovery_unsubscribe = [
        async_subscribe_internal(
            hass,
            topic,
            async_discovery_message_received,
            0,
            job_type=HassJobType.Callback,
        )
        # Subscribe to the platform discovery wildcard topics first,
        # and then subscribe to the device discovery wildcard topics.
        for topic in chain(
            (
                f"{discovery_topic}/{component}/+/config"
                for component in SUPPORTED_COMPONENTS
            ),
            (
                f"{discovery_topic}/{component}/+/+/config"
                for component in SUPPORTED_COMPONENTS
            ),
            (
                f"{discovery_topic}/device/+/config",
                f"{discovery_topic}/device/+/+/config",
            ),
        )
    ]

    mqtt_data.last_discovery = time.monotonic()
    mqtt_integrations = await async_get_mqtt(hass)
    integration_unsubscribe = mqtt_data.integration_unsubscribe

    async def _async_handle_config_entry_removed(entry: ConfigEntry) -> None:
        """Handle integration config entry changes."""
        for discovery_key in entry.discovery_keys[DOMAIN]:
            if (
                discovery_key.version != 1
                or not isinstance(discovery_key.key, str)
                or discovery_key.key not in integration_discovery_messages
            ):
                continue
            topic = discovery_key.key
            discovery_message = integration_discovery_messages[topic]
            # Drop the cached message, otherwise the replay below would be
            # ignored as an already processed discovery message.
            del integration_discovery_messages[topic]
            _LOGGER.debug("Rediscover service on topic %s", topic)
            # Initiate re-discovery
            await async_integration_message_received(
                discovery_message.integration, discovery_message.msg
            )

    mqtt_data.discovery_unsubscribe.append(
        async_dispatcher_connect(
            hass,
            signal_discovered_config_entry_removed(DOMAIN),
            _async_handle_config_entry_removed,
        )
    )

    async def async_integration_message_received(
        integration: str, msg: ReceiveMessage
    ) -> None:
        """Process the received message."""
        if (
            msg.topic in integration_discovery_messages
            and integration_discovery_messages[msg.topic].msg.payload == msg.payload
        ):
            _LOGGER.debug(
                "Ignoring already processed discovery message for '%s' on topic %s: %s",
                integration,
                msg.topic,
                msg.payload,
            )
            return
        if TYPE_CHECKING:
            assert mqtt_data.data_config_flow_lock

        # Lock to prevent initiating many parallel config flows.
        # Note: The lock is not intended to prevent a race, only for performance
        async with mqtt_data.data_config_flow_lock:
            data = MqttServiceInfo(
                topic=msg.topic,
                payload=msg.payload,
                qos=msg.qos,
                retain=msg.retain,
                subscribed_topic=msg.subscribed_topic,
                timestamp=msg.timestamp,
            )
            discovery_key = discovery_flow.DiscoveryKey(
                domain=DOMAIN, key=msg.topic, version=1
            )
            discovery_flow.async_create_flow(
                hass,
                integration,
                {"source": SOURCE_MQTT},
                data,
                discovery_key=discovery_key,
            )
            if msg.payload:
                # Update the last discovered config message
                integration_discovery_messages[msg.topic] = (
                    MQTTIntegrationDiscoveryConfig(integration=integration, msg=msg)
                )
            elif msg.topic in integration_discovery_messages:
                # Cleanup cache if discovery payload is empty
                del integration_discovery_messages[msg.topic]

    integration_unsubscribe.update(
        {
            f"{integration}_{topic}": async_subscribe_internal(
                hass,
                topic,
                functools.partial(async_integration_message_received, integration),
                0,
                job_type=HassJobType.Coroutinefunction,
            )
            for integration, topics in mqtt_integrations.items()
            for topic in topics
        }
    )
693 
async def async_stop(hass: HomeAssistant) -> None:
    """Stop MQTT Discovery.

    Calls every stored discovery unsubscribe callback and resets the list,
    then calls and removes every integration unsubscribe callback.
    """
    mqtt_data = hass.data[DATA_MQTT]

    for unsubscribe in mqtt_data.discovery_unsubscribe:
        unsubscribe()
    mqtt_data.discovery_unsubscribe = []

    integration_unsubscribe = mqtt_data.integration_unsubscribe
    # Iterate over a snapshot, entries are removed while looping
    for key, unsubscribe in list(integration_unsubscribe.items()):
        unsubscribe()
        integration_unsubscribe.pop(key)
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
Definition: client.py:210
None async_log_discovery_origin_info(str message, MQTTDiscoveryPayload discovery_payload, int level=logging.INFO)
Definition: discovery.py:156
None _replace_abbreviations(dict[str, Any]|str payload, dict[str, str] abbreviations, set[str] abbreviations_set)
Definition: discovery.py:175
MQTTDiscoveryPayload _parse_device_payload(HomeAssistant hass, ReceivePayloadType payload, str object_id, str|None node_id)
Definition: discovery.py:275
None async_stop(HomeAssistant hass)
Definition: discovery.py:694
None _replace_all_abbreviations(dict[str, Any] discovery_payload, bool component_only=False)
Definition: discovery.py:186
None _replace_topic_base(MQTTDiscoveryPayload discovery_payload)
Definition: discovery.py:220
None clear_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
Definition: discovery.py:117
None async_start(HomeAssistant hass, str discovery_topic, ConfigEntry config_entry)
Definition: discovery.py:356
None set_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
Definition: discovery.py:122
bool _valid_origin_info(MQTTDiscoveryPayload discovery_payload)
Definition: discovery.py:314
str get_origin_log_string(MQTTDiscoveryPayload discovery_payload, *bool include_url)
Definition: discovery.py:130
str|None get_origin_support_url(MQTTDiscoveryPayload discovery_payload)
Definition: discovery.py:145
None _merge_common_device_options(MQTTDiscoveryPayload component_config, dict[str, Any] device_config)
Definition: discovery.py:333
bool _async_process_discovery_migration(MQTTDiscoveryPayload payload)
Definition: discovery.py:100
MQTTDiscoveryPayload _generate_device_config(HomeAssistant hass, str object_id, str|None node_id, bool migrate_discovery=False)
Definition: discovery.py:246
None async_forward_entry_setup_and_setup_discovery(HomeAssistant hass, ConfigEntry config_entry, set[Platform|str] platforms, bool late=False)
Definition: util.py:158
SignalType[ConfigEntry] signal_discovered_config_entry_removed(str discovery_domain)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
dict[str, list[str]] async_get_mqtt(HomeAssistant hass)
Definition: loader.py:624
JsonObjectType json_loads_object(bytes|bytearray|memoryview|str obj)
Definition: json.py:54