Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
1 """MQTT (entity) component mixins and helpers."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 from collections.abc import Callable, Coroutine
7 from functools import partial
8 import logging
9 from typing import TYPE_CHECKING, Any, Protocol, cast, final
10 
11 import voluptuous as vol
12 
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.const import (
15  ATTR_CONFIGURATION_URL,
16  ATTR_HW_VERSION,
17  ATTR_MANUFACTURER,
18  ATTR_MODEL,
19  ATTR_MODEL_ID,
20  ATTR_NAME,
21  ATTR_SERIAL_NUMBER,
22  ATTR_SUGGESTED_AREA,
23  ATTR_SW_VERSION,
24  ATTR_VIA_DEVICE,
25  CONF_DEVICE,
26  CONF_ENTITY_CATEGORY,
27  CONF_ICON,
28  CONF_MODEL,
29  CONF_MODEL_ID,
30  CONF_NAME,
31  CONF_UNIQUE_ID,
32  CONF_VALUE_TEMPLATE,
33 )
34 from homeassistant.core import Event, HassJobType, HomeAssistant, callback
35 from homeassistant.helpers import device_registry as dr, entity_registry as er
37  DeviceEntry,
38  DeviceInfo,
39  EventDeviceRegistryUpdatedData,
40 )
42  async_dispatcher_connect,
43  async_dispatcher_send,
44 )
45 from homeassistant.helpers.entity import Entity, async_generate_entity_id
46 from homeassistant.helpers.entity_platform import AddEntitiesCallback
47 from homeassistant.helpers.event import (
48  async_track_device_registry_updated_event,
49  async_track_entity_registry_updated_event,
50 )
51 from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
52 from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
53 from homeassistant.helpers.typing import (
54  UNDEFINED,
55  ConfigType,
56  DiscoveryInfoType,
57  UndefinedType,
58  VolSchemaType,
59 )
60 from homeassistant.util.json import json_loads
61 from homeassistant.util.yaml import dump as yaml_dump
62 
63 from . import debug_info, subscription
64 from .client import async_publish
65 from .const import (
66  ATTR_DISCOVERY_HASH,
67  ATTR_DISCOVERY_PAYLOAD,
68  ATTR_DISCOVERY_TOPIC,
69  AVAILABILITY_ALL,
70  AVAILABILITY_ANY,
71  CONF_AVAILABILITY,
72  CONF_AVAILABILITY_MODE,
73  CONF_AVAILABILITY_TEMPLATE,
74  CONF_AVAILABILITY_TOPIC,
75  CONF_CONFIGURATION_URL,
76  CONF_CONNECTIONS,
77  CONF_ENABLED_BY_DEFAULT,
78  CONF_ENCODING,
79  CONF_ENTITY_PICTURE,
80  CONF_HW_VERSION,
81  CONF_IDENTIFIERS,
82  CONF_JSON_ATTRS_TEMPLATE,
83  CONF_JSON_ATTRS_TOPIC,
84  CONF_MANUFACTURER,
85  CONF_OBJECT_ID,
86  CONF_PAYLOAD_AVAILABLE,
87  CONF_PAYLOAD_NOT_AVAILABLE,
88  CONF_QOS,
89  CONF_RETAIN,
90  CONF_SCHEMA,
91  CONF_SERIAL_NUMBER,
92  CONF_SUGGESTED_AREA,
93  CONF_SW_VERSION,
94  CONF_TOPIC,
95  CONF_VIA_DEVICE,
96  DEFAULT_ENCODING,
97  DOMAIN,
98  MQTT_CONNECTION_STATE,
99 )
100 from .debug_info import log_message
101 from .discovery import (
102  MQTT_DISCOVERY_DONE,
103  MQTT_DISCOVERY_NEW,
104  MQTT_DISCOVERY_UPDATED,
105  MQTTDiscoveryPayload,
106  clear_discovery_hash,
107  get_origin_log_string,
108  get_origin_support_url,
109  set_discovery_hash,
110 )
111 from .models import (
112  DATA_MQTT,
113  MessageCallbackType,
114  MqttValueTemplate,
115  MqttValueTemplateException,
116  PublishPayloadType,
117  ReceiveMessage,
118 )
119 from .subscription import (
120  EntitySubscription,
121  async_prepare_subscribe_topics,
122  async_subscribe_topics_internal,
123  async_unsubscribe_topics,
124 )
125 from .util import mqtt_config_entry_enabled
126 
127 _LOGGER = logging.getLogger(__name__)
128 
129 MQTT_ATTRIBUTES_BLOCKED = {
130  "assumed_state",
131  "available",
132  "device_class",
133  "device_info",
134  "entity_category",
135  "entity_picture",
136  "entity_registry_enabled_default",
137  "extra_state_attributes",
138  "force_update",
139  "icon",
140  "name",
141  "should_poll",
142  "state",
143  "supported_features",
144  "unique_id",
145  "unit_of_measurement",
146 }
147 
148 
149 @callback
151  discovery_payload: MQTTDiscoveryPayload, err: vol.Invalid
152 ) -> None:
153  """Help handling schema errors on MQTT discovery messages."""
154  discovery_topic: str = discovery_payload.discovery_data[ATTR_DISCOVERY_TOPIC]
155  _LOGGER.error(
156  "Error '%s' when processing MQTT discovery message topic: '%s', message: '%s'",
157  err,
158  discovery_topic,
159  discovery_payload,
160  )
161 
162 
164  hass: HomeAssistant,
165  discovery_payload: MQTTDiscoveryPayload,
166 ) -> None:
167  """Handle discovery failure."""
168  discovery_hash = discovery_payload.discovery_data[ATTR_DISCOVERY_HASH]
169  clear_discovery_hash(hass, discovery_hash)
170  async_dispatcher_send(hass, MQTT_DISCOVERY_DONE.format(*discovery_hash), None)
171 
172 
174  hass: HomeAssistant, domain: str, discovery_payload: MQTTDiscoveryPayload
175 ) -> bool:
176  """Verify MQTT config entry is enabled or log warning."""
177  if not mqtt_config_entry_enabled(hass):
178  _LOGGER.warning(
179  (
180  "MQTT integration is disabled, skipping setup of discovered item "
181  "MQTT %s, payload %s"
182  ),
183  domain,
184  discovery_payload,
185  )
186  return False
187  return True
188 
189 
190 class _SetupNonEntityHelperCallbackProtocol(Protocol): # pragma: no cover
191  """Callback protocol for async_setup in async_setup_non_entity_entry_helper."""
192 
193  async def __call__(
194  self, config: ConfigType, discovery_data: DiscoveryInfoType
195  ) -> None: ...
196 
197 
198 @callback
200  hass: HomeAssistant,
201  domain: str,
202  async_setup: _SetupNonEntityHelperCallbackProtocol,
203  discovery_schema: vol.Schema,
204 ) -> None:
205  """Set up automation or tag creation dynamically through MQTT discovery."""
206  mqtt_data = hass.data[DATA_MQTT]
207 
208  async def _async_setup_non_entity_entry_from_discovery(
209  discovery_payload: MQTTDiscoveryPayload,
210  ) -> None:
211  """Set up an MQTT entity, automation or tag from discovery."""
213  hass, domain, discovery_payload
214  ):
215  return
216  try:
217  config: ConfigType = discovery_schema(discovery_payload)
218  await async_setup(config, discovery_data=discovery_payload.discovery_data)
219  except vol.Invalid as err:
220  _handle_discovery_failure(hass, discovery_payload)
221  async_handle_schema_error(discovery_payload, err)
222  except Exception:
223  _handle_discovery_failure(hass, discovery_payload)
224  raise
225 
226  mqtt_data.reload_dispatchers.append(
228  hass,
229  MQTT_DISCOVERY_NEW.format(domain, "mqtt"),
230  _async_setup_non_entity_entry_from_discovery,
231  )
232  )
233 
234 
235 @callback
237  hass: HomeAssistant,
238  entry: ConfigEntry,
239  entity_class: type[MqttEntity] | None,
240  domain: str,
241  async_add_entities: AddEntitiesCallback,
242  discovery_schema: VolSchemaType,
243  platform_schema_modern: VolSchemaType,
244  schema_class_mapping: dict[str, type[MqttEntity]] | None = None,
245 ) -> None:
246  """Set up entity creation dynamically through MQTT discovery."""
247  mqtt_data = hass.data[DATA_MQTT]
248 
249  @callback
250  def _async_setup_entity_entry_from_discovery(
251  discovery_payload: MQTTDiscoveryPayload,
252  ) -> None:
253  """Set up an MQTT entity from discovery."""
254  nonlocal entity_class
256  hass, domain, discovery_payload
257  ):
258  return
259  try:
260  config: DiscoveryInfoType = discovery_schema(discovery_payload)
261  if schema_class_mapping is not None:
262  entity_class = schema_class_mapping[config[CONF_SCHEMA]]
263  if TYPE_CHECKING:
264  assert entity_class is not None
266  [entity_class(hass, config, entry, discovery_payload.discovery_data)]
267  )
268  except vol.Invalid as err:
269  _handle_discovery_failure(hass, discovery_payload)
270  async_handle_schema_error(discovery_payload, err)
271  except Exception:
272  _handle_discovery_failure(hass, discovery_payload)
273  raise
274 
275  mqtt_data.reload_dispatchers.append(
277  hass,
278  MQTT_DISCOVERY_NEW.format(domain, "mqtt"),
279  _async_setup_entity_entry_from_discovery,
280  )
281  )
282 
283  @callback
284  def _async_setup_entities() -> None:
285  """Set up MQTT items from configuration.yaml."""
286  nonlocal entity_class
287  mqtt_data = hass.data[DATA_MQTT]
288  if not (config_yaml := mqtt_data.config):
289  return
290  yaml_configs: list[ConfigType] = [
291  config
292  for config_item in config_yaml
293  for config_domain, configs in config_item.items()
294  for config in configs
295  if config_domain == domain
296  ]
297  entities: list[Entity] = []
298  for yaml_config in yaml_configs:
299  try:
300  config = platform_schema_modern(yaml_config)
301  if schema_class_mapping is not None:
302  entity_class = schema_class_mapping[config[CONF_SCHEMA]]
303  if TYPE_CHECKING:
304  assert entity_class is not None
305  entities.append(entity_class(hass, config, entry, None))
306  except vol.Invalid as exc:
307  error = str(exc)
308  config_file = getattr(yaml_config, "__config_file__", "?")
309  line = getattr(yaml_config, "__line__", "?")
310  issue_id = hex(hash(frozenset(yaml_config)))
311  yaml_config_str = yaml_dump(yaml_config)
312  learn_more_url = (
313  f"https://www.home-assistant.io/integrations/{domain}.mqtt/"
314  )
316  hass,
317  DOMAIN,
318  issue_id,
319  issue_domain=domain,
320  is_fixable=False,
321  severity=IssueSeverity.ERROR,
322  learn_more_url=learn_more_url,
323  translation_placeholders={
324  "domain": domain,
325  "config_file": config_file,
326  "line": line,
327  "config": yaml_config_str,
328  "error": error,
329  },
330  translation_key="invalid_platform_config",
331  )
332  _LOGGER.error(
333  "%s for manually configured MQTT %s item, in %s, line %s Got %s",
334  error,
335  domain,
336  config_file,
337  line,
338  yaml_config,
339  )
340 
341  async_add_entities(entities)
342 
343  # When reloading we check manual configured items against the schema
344  # before reloading
345  mqtt_data.reload_schema[domain] = platform_schema_modern
346  # discover manual configured MQTT items
347  mqtt_data.reload_handlers[domain] = _async_setup_entities
348  _async_setup_entities()
349 
350 
352  hass: HomeAssistant, entity: Entity, config: ConfigType, entity_id_format: str
353 ) -> None:
354  """Set entity_id from object_id if defined in config."""
355  if CONF_OBJECT_ID in config:
356  entity.entity_id = async_generate_entity_id(
357  entity_id_format, config[CONF_OBJECT_ID], None, hass
358  )
359 
360 
362  """Mixin used for platforms that support JSON attributes."""
363 
364  _attributes_extra_blocked: frozenset[str] = frozenset()
365  _attr_tpl: Callable[[ReceivePayloadType], ReceivePayloadType] | None = None
366 
367  def __init__(self, config: ConfigType) -> None:
368  """Initialize the JSON attributes mixin."""
369  self._attributes_sub_state_attributes_sub_state: dict[str, EntitySubscription] = {}
370  self._attributes_config_attributes_config = config
371 
372  async def async_added_to_hass(self) -> None:
373  """Subscribe MQTT events."""
374  await super().async_added_to_hass()
375  self._attributes_prepare_subscribe_topics_attributes_prepare_subscribe_topics()
376  self._attributes_subscribe_topics_attributes_subscribe_topics()
377 
378  def attributes_prepare_discovery_update(self, config: DiscoveryInfoType) -> None:
379  """Handle updated discovery message."""
380  self._attributes_config_attributes_config = config
381  self._attributes_prepare_subscribe_topics_attributes_prepare_subscribe_topics()
382 
383  async def attributes_discovery_update(self, config: DiscoveryInfoType) -> None:
384  """Handle updated discovery message."""
385  self._attributes_subscribe_topics_attributes_subscribe_topics()
386 
388  """(Re)Subscribe to topics."""
389  if template := self._attributes_config_attributes_config.get(CONF_JSON_ATTRS_TEMPLATE):
390  self._attr_tpl_attr_tpl = MqttValueTemplate(
391  template, entity=self
392  ).async_render_with_possible_json_value
394  self.hasshass,
395  self._attributes_sub_state_attributes_sub_state,
396  {
397  CONF_JSON_ATTRS_TOPIC: {
398  "topic": self._attributes_config_attributes_config.get(CONF_JSON_ATTRS_TOPIC),
399  "msg_callback": partial(
400  self._message_callback, # type: ignore[attr-defined]
401  self._attributes_message_received_attributes_message_received,
402  {"_attr_extra_state_attributes"},
403  ),
404  "entity_id": self.entity_identity_id,
405  "qos": self._attributes_config_attributes_config.get(CONF_QOS),
406  "encoding": self._attributes_config_attributes_config[CONF_ENCODING] or None,
407  "job_type": HassJobType.Callback,
408  }
409  },
410  )
411 
412  @callback
413  def _attributes_subscribe_topics(self) -> None:
414  """(Re)Subscribe to topics."""
415  async_subscribe_topics_internal(self.hasshass, self._attributes_sub_state_attributes_sub_state)
416 
417  async def async_will_remove_from_hass(self) -> None:
418  """Unsubscribe when removed."""
419  self._attributes_sub_state_attributes_sub_state = async_unsubscribe_topics(
420  self.hasshass, self._attributes_sub_state_attributes_sub_state
421  )
422 
423  @callback
424  def _attributes_message_received(self, msg: ReceiveMessage) -> None:
425  """Update extra state attributes."""
426  payload = (
427  self._attr_tpl_attr_tpl(msg.payload) if self._attr_tpl_attr_tpl is not None else msg.payload
428  )
429  try:
430  json_dict = json_loads(payload) if isinstance(payload, str) else None
431  except ValueError:
432  _LOGGER.warning("Erroneous JSON: %s", payload)
433  else:
434  if isinstance(json_dict, dict):
435  filtered_dict = {
436  k: v
437  for k, v in json_dict.items()
438  if k not in MQTT_ATTRIBUTES_BLOCKED
439  and k not in self._attributes_extra_blocked
440  }
441  self._attr_extra_state_attributes_attr_extra_state_attributes = filtered_dict
442  else:
443  _LOGGER.warning("JSON result was not a dictionary")
444 
445 
447  """Mixin used for platforms that report availability."""
448 
449  def __init__(self, config: ConfigType) -> None:
450  """Initialize the availability mixin."""
451  self._availability_sub_state_availability_sub_state: dict[str, EntitySubscription] = {}
452  self._available_available: dict[str, str | bool] = {}
453  self._available_latest_available_latest: bool = False
454  self._availability_setup_from_config_availability_setup_from_config(config)
455 
456  async def async_added_to_hass(self) -> None:
457  """Subscribe MQTT events."""
458  await super().async_added_to_hass()
459  self._availability_prepare_subscribe_topics_availability_prepare_subscribe_topics()
460  self._availability_subscribe_topics_availability_subscribe_topics()
461  self.async_on_removeasync_on_remove(
463  self.hasshass,
464  MQTT_CONNECTION_STATE,
465  self.async_mqtt_connection_state_changedasync_mqtt_connection_state_changed,
466  )
467  )
468 
469  def availability_prepare_discovery_update(self, config: DiscoveryInfoType) -> None:
470  """Handle updated discovery message."""
471  self._availability_setup_from_config_availability_setup_from_config(config)
472  self._availability_prepare_subscribe_topics_availability_prepare_subscribe_topics()
473 
474  async def availability_discovery_update(self, config: DiscoveryInfoType) -> None:
475  """Handle updated discovery message."""
476  self._availability_subscribe_topics_availability_subscribe_topics()
477 
478  def _availability_setup_from_config(self, config: ConfigType) -> None:
479  """(Re)Setup."""
480  self._avail_topics: dict[str, dict[str, Any]] = {}
481  if CONF_AVAILABILITY_TOPIC in config:
482  self._avail_topics[config[CONF_AVAILABILITY_TOPIC]] = {
483  CONF_PAYLOAD_AVAILABLE: config[CONF_PAYLOAD_AVAILABLE],
484  CONF_PAYLOAD_NOT_AVAILABLE: config[CONF_PAYLOAD_NOT_AVAILABLE],
485  CONF_AVAILABILITY_TEMPLATE: config.get(CONF_AVAILABILITY_TEMPLATE),
486  }
487 
488  if CONF_AVAILABILITY in config:
489  avail: dict[str, Any]
490  for avail in config[CONF_AVAILABILITY]:
491  self._avail_topics[avail[CONF_TOPIC]] = {
492  CONF_PAYLOAD_AVAILABLE: avail[CONF_PAYLOAD_AVAILABLE],
493  CONF_PAYLOAD_NOT_AVAILABLE: avail[CONF_PAYLOAD_NOT_AVAILABLE],
494  CONF_AVAILABILITY_TEMPLATE: avail.get(CONF_VALUE_TEMPLATE),
495  }
496 
497  for avail_topic_conf in self._avail_topics.values():
498  if template := avail_topic_conf[CONF_AVAILABILITY_TEMPLATE]:
499  avail_topic_conf[CONF_AVAILABILITY_TEMPLATE] = MqttValueTemplate(
500  template, entity=self
501  ).async_render_with_possible_json_value
502 
503  self._avail_config_avail_config = config
504 
506  """(Re)Subscribe to topics."""
507  self._available_available = {
508  topic: (self._available_available.get(topic, False)) for topic in self._avail_topics
509  }
510  topics: dict[str, dict[str, Any]] = {
511  f"availability_{topic}": {
512  "topic": topic,
513  "msg_callback": partial(
514  self._message_callback, # type: ignore[attr-defined]
515  self._availability_message_received_availability_message_received,
516  {"available"},
517  ),
518  "entity_id": self.entity_identity_id,
519  "qos": self._avail_config_avail_config[CONF_QOS],
520  "encoding": self._avail_config_avail_config[CONF_ENCODING] or None,
521  "job_type": HassJobType.Callback,
522  }
523  for topic in self._avail_topics
524  }
525 
527  self.hasshass,
528  self._availability_sub_state_availability_sub_state,
529  topics,
530  )
531 
532  @callback
533  def _availability_message_received(self, msg: ReceiveMessage) -> None:
534  """Handle a new received MQTT availability message."""
535  topic = msg.topic
536  avail_topic = self._avail_topics[topic]
537  template = avail_topic[CONF_AVAILABILITY_TEMPLATE]
538  payload = template(msg.payload) if template else msg.payload
539 
540  if payload == avail_topic[CONF_PAYLOAD_AVAILABLE]:
541  self._available_available[topic] = True
542  self._available_latest_available_latest = True
543  elif payload == avail_topic[CONF_PAYLOAD_NOT_AVAILABLE]:
544  self._available_available[topic] = False
545  self._available_latest_available_latest = False
546 
547  @callback
549  """(Re)Subscribe to topics."""
550  async_subscribe_topics_internal(self.hasshass, self._availability_sub_state_availability_sub_state)
551 
552  @callback
553  def async_mqtt_connection_state_changed(self, state: bool) -> None:
554  """Update state on connection/disconnection to MQTT broker."""
555  if not self.hasshass.is_stopping:
556  self.async_write_ha_stateasync_write_ha_state()
557 
558  async def async_will_remove_from_hass(self) -> None:
559  """Unsubscribe when removed."""
560  self._availability_sub_state_availability_sub_state = async_unsubscribe_topics(
561  self.hasshass, self._availability_sub_state_availability_sub_state
562  )
563 
564  @property
565  def available(self) -> bool:
566  """Return if the device is available."""
567  mqtt_data = self.hasshass.data[DATA_MQTT]
568  client = mqtt_data.client
569  if not client.connected and not self.hasshass.is_stopping:
570  return False
571  if not self._avail_topics:
572  return True
573  if self._avail_config_avail_config[CONF_AVAILABILITY_MODE] == AVAILABILITY_ALL:
574  return all(self._available_available.values())
575  if self._avail_config_avail_config[CONF_AVAILABILITY_MODE] == AVAILABILITY_ANY:
576  return any(self._available_available.values())
577  return self._available_latest_available_latest
578 
579 
581  hass: HomeAssistant, device_id: str | None, config_entry_id: str | None
582 ) -> None:
583  """Clean up the device registry after MQTT removal.
584 
585  Remove MQTT from the device registry entry if there are no remaining
586  entities, triggers or tags.
587  """
588  # Local import to avoid circular dependencies
589  # pylint: disable-next=import-outside-toplevel
590  from . import device_trigger, tag
591 
592  device_registry = dr.async_get(hass)
593  entity_registry = er.async_get(hass)
594  if (
595  device_id
596  and device_id not in device_registry.deleted_devices
597  and config_entry_id
598  and not er.async_entries_for_device(
599  entity_registry, device_id, include_disabled_entities=False
600  )
601  and not await device_trigger.async_get_triggers(hass, device_id)
602  and not tag.async_has_tags(hass, device_id)
603  ):
604  device_registry.async_update_device(
605  device_id, remove_config_entry_id=config_entry_id
606  )
607 
608 
609 def get_discovery_hash(discovery_data: DiscoveryInfoType) -> tuple[str, str]:
610  """Get the discovery hash from the discovery data."""
611  discovery_hash: tuple[str, str] = discovery_data[ATTR_DISCOVERY_HASH]
612  return discovery_hash
613 
614 
615 def send_discovery_done(hass: HomeAssistant, discovery_data: DiscoveryInfoType) -> None:
616  """Acknowledge a discovery message has been handled."""
617  discovery_hash = get_discovery_hash(discovery_data)
618  async_dispatcher_send(hass, MQTT_DISCOVERY_DONE.format(*discovery_hash), None)
619 
620 
622  hass: HomeAssistant,
623  discovery_data: DiscoveryInfoType,
624  remove_discovery_updated: Callable[[], None] | None = None,
625 ) -> None:
626  """Stop discovery updates of being sent."""
627  if remove_discovery_updated:
628  remove_discovery_updated()
629  remove_discovery_updated = None
630  discovery_hash = get_discovery_hash(discovery_data)
631  clear_discovery_hash(hass, discovery_hash)
632 
633 
635  hass: HomeAssistant, discovery_data: DiscoveryInfoType
636 ) -> None:
637  """Clear retained discovery payload.
638 
639  Remove discovery topic in broker to avoid rediscovery
640  after a restart of Home Assistant.
641  """
642  discovery_topic = discovery_data[ATTR_DISCOVERY_TOPIC]
643  await async_publish(hass, discovery_topic, None, retain=True)
644 
645 
647  hass: HomeAssistant,
648  discovery_data: DiscoveryInfoType,
649  event: Event[er.EventEntityRegistryUpdatedData],
650 ) -> None:
651  """Clear the discovery topic if the entity is removed."""
652  if event.data["action"] == "remove":
653  # publish empty payload to config topic to avoid re-adding
654  await async_remove_discovery_payload(hass, discovery_data)
655 
656 
658  """Add support for auto discovery for platforms without an entity."""
659 
660  def __init__(
661  self,
662  hass: HomeAssistant,
663  discovery_data: DiscoveryInfoType,
664  device_id: str | None,
665  config_entry: ConfigEntry,
666  log_name: str,
667  ) -> None:
668  """Initialize the update service."""
669 
670  self.hasshass = hass
671  self.log_namelog_name = log_name
672 
673  self._discovery_data_discovery_data = discovery_data
674  self._device_id_device_id = device_id
675  self._config_entry_config_entry = config_entry
676  self._config_entry_id_config_entry_id = config_entry.entry_id
677  self._skip_device_removal_skip_device_removal: bool = False
678  self._migrate_discovery_migrate_discovery: str | None = None
679 
680  discovery_hash = get_discovery_hash(discovery_data)
681  self._remove_discovery_updated_remove_discovery_updated = async_dispatcher_connect(
682  hass,
683  MQTT_DISCOVERY_UPDATED.format(*discovery_hash),
684  self.async_discovery_updateasync_discovery_update,
685  )
686  config_entry.async_on_unload(self._entry_unload_entry_unload)
687  if device_id is not None:
689  hass, device_id, self._async_device_removed_async_device_removed
690  )
691  _LOGGER.debug(
692  "%s %s has been initialized",
693  self.log_namelog_name,
694  discovery_hash,
695  )
696 
697  @callback
698  def _entry_unload(self, *_: Any) -> None:
699  """Handle cleanup when the config entry is unloaded."""
701  self.hasshass, self._discovery_data_discovery_data, self._remove_discovery_updated_remove_discovery_updated
702  )
703  self._config_entry_config_entry.async_create_task(self.hasshass, self.async_tear_downasync_tear_down())
704 
706  self,
707  discovery_payload: MQTTDiscoveryPayload,
708  ) -> None:
709  """Handle discovery update."""
710  discovery_hash = get_discovery_hash(self._discovery_data_discovery_data)
711  # Start discovery migration or rollback if migrate_discovery flag is set
712  # and the discovery topic is valid and not yet migrating
713  if (
714  discovery_payload.migrate_discovery
715  and self._migrate_discovery_migrate_discovery is None
716  and self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC]
717  == discovery_payload.discovery_data[ATTR_DISCOVERY_TOPIC]
718  ):
719  self._migrate_discovery_migrate_discovery = self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC]
720  discovery_hash = self._discovery_data_discovery_data[ATTR_DISCOVERY_HASH]
721  origin_info = get_origin_log_string(
722  self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD], include_url=False
723  )
724  action = "Rollback" if discovery_payload.device_discovery else "Migration"
725  schema_type = "platform" if discovery_payload.device_discovery else "device"
726  _LOGGER.info(
727  "%s to MQTT %s discovery schema started for %s '%s'"
728  "%s on topic %s. To complete %s, publish a %s discovery "
729  "message with %s '%s'. After completed %s, "
730  "publish an empty (retained) payload to %s",
731  action,
732  schema_type,
733  discovery_hash[0],
734  discovery_hash[1],
735  origin_info,
736  self._migrate_discovery_migrate_discovery,
737  action.lower(),
738  schema_type,
739  discovery_hash[0],
740  discovery_hash[1],
741  action.lower(),
742  self._migrate_discovery_migrate_discovery,
743  )
744 
745  # Cleanup platform resources
746  await self.async_tear_downasync_tear_down()
747  # Unregister and clean discovery
749  self.hasshass, self._discovery_data_discovery_data, self._remove_discovery_updated_remove_discovery_updated
750  )
751  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
752  return
753 
754  _LOGGER.debug(
755  "Got update for %s with hash: %s '%s'",
756  self.log_namelog_name,
757  discovery_hash,
758  discovery_payload,
759  )
760  new_discovery_topic = discovery_payload.discovery_data[ATTR_DISCOVERY_TOPIC]
761 
762  # Abort early if an update is not received via the registered discovery topic.
763  # This can happen if a device and single component discovery payload
764  # share the same discovery ID.
765  if self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC] != new_discovery_topic:
766  # Prevent illegal updates
767  old_origin_info = get_origin_log_string(
768  self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD], include_url=False
769  )
770  new_origin_info = get_origin_log_string(
771  discovery_payload.discovery_data[ATTR_DISCOVERY_PAYLOAD],
772  include_url=False,
773  )
774  new_origin_support_url = get_origin_support_url(
775  discovery_payload.discovery_data[ATTR_DISCOVERY_PAYLOAD]
776  )
777  if new_origin_support_url:
778  get_support = f"for support visit {new_origin_support_url}"
779  else:
780  get_support = (
781  "for documentation on migration to device schema or rollback to "
782  "discovery schema, visit https://www.home-assistant.io/integrations/"
783  "mqtt/#migration-from-single-component-to-device-based-discovery"
784  )
785  _LOGGER.warning(
786  "Received a conflicting MQTT discovery message for %s '%s' which was "
787  "previously discovered on topic %s%s; the conflicting discovery "
788  "message was received on topic %s%s; %s",
789  discovery_hash[0],
790  discovery_hash[1],
791  self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC],
792  old_origin_info,
793  new_discovery_topic,
794  new_origin_info,
795  get_support,
796  )
797  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
798  return
799 
800  if (
801  discovery_payload
802  and discovery_payload != self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD]
803  ):
804  _LOGGER.debug(
805  "Updating %s with hash %s",
806  self.log_namelog_name,
807  discovery_hash,
808  )
809  try:
810  await self.async_updateasync_update(discovery_payload)
811  finally:
812  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
813  self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD] = discovery_payload
814  elif not discovery_payload:
815  # Unregister and clean up the current discovery instance
817  self.hasshass, self._discovery_data_discovery_data, self._remove_discovery_updated_remove_discovery_updated
818  )
819  await self._async_tear_down_async_tear_down()
820  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
821  _LOGGER.debug(
822  "%s %s has been removed",
823  self.log_namelog_name,
824  discovery_hash,
825  )
826  else:
827  # Normal update without change
828  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
829  _LOGGER.debug(
830  "%s %s no changes",
831  self.log_namelog_name,
832  discovery_hash,
833  )
834  return
835 
837  self, event: Event[EventDeviceRegistryUpdatedData]
838  ) -> None:
839  """Handle the manual removal of a device."""
840  if self._skip_device_removal_skip_device_removal or not async_removed_from_device(
841  self.hasshass, event, cast(str, self._device_id_device_id), self._config_entry_id_config_entry_id
842  ):
843  return
844  # Prevent a second cleanup round after the device is removed
845  self._remove_device_updated_remove_device_updated()
846  self._skip_device_removal_skip_device_removal = True
847  # Unregister and clean up and publish an empty payload
848  # so the service is not rediscovered after a restart
850  self.hasshass, self._discovery_data_discovery_data, self._remove_discovery_updated_remove_discovery_updated
851  )
852  await self._async_tear_down_async_tear_down()
853  await async_remove_discovery_payload(self.hasshass, self._discovery_data_discovery_data)
854 
855  async def _async_tear_down(self) -> None:
856  """Handle the cleanup of the discovery service."""
857  # Cleanup platform resources
858  await self.async_tear_downasync_tear_down()
859  # remove the service for auto discovery updates and clean up the device registry
860  if not self._skip_device_removal_skip_device_removal:
861  # Prevent a second cleanup round after the device is removed
862  self._skip_device_removal_skip_device_removal = True
864  self.hasshass, self._device_id_device_id, self._config_entry_id_config_entry_id
865  )
866 
867  @abstractmethod
868  async def async_update(self, discovery_data: MQTTDiscoveryPayload) -> None:
869  """Handle the update of platform specific parts, extend to the platform."""
870 
871  @abstractmethod
872  async def async_tear_down(self) -> None:
873  """Handle the cleanup of platform specific parts, extend to the platform."""
874 
875 
876 class MqttDiscoveryUpdateMixin(Entity):
877  """Mixin used to handle updated discovery message for entity based platforms."""
878 
879  def __init__(
880  self,
881  hass: HomeAssistant,
882  discovery_data: DiscoveryInfoType | None,
883  discovery_update: Callable[[MQTTDiscoveryPayload], Coroutine[Any, Any, None]]
884  | None = None,
885  ) -> None:
886  """Initialize the discovery update mixin."""
887  self._discovery_data_discovery_data = discovery_data
888  self._discovery_update_discovery_update = discovery_update
889  self._remove_discovery_updated_remove_discovery_updated: Callable[[], None] | None = None
890  self._removed_from_hass_removed_from_hass = False
891  if discovery_data is None:
892  return
893  mqtt_data = hass.data[DATA_MQTT]
894  self._registry_hooks_registry_hooks = mqtt_data.discovery_registry_hooks
895  discovery_hash: tuple[str, str] = discovery_data[ATTR_DISCOVERY_HASH]
896  self._migrate_discovery_migrate_discovery: str | None = None
897  if discovery_hash in self._registry_hooks_registry_hooks:
898  self._registry_hooks_registry_hooks.pop(discovery_hash)()
899 
900  async def async_added_to_hass(self) -> None:
901  """Subscribe to discovery updates."""
902  await super().async_added_to_hass()
903  self._removed_from_hass_removed_from_hass = False
904  if not self._discovery_data_discovery_data:
905  return
906  discovery_hash: tuple[str, str] = self._discovery_data_discovery_data[ATTR_DISCOVERY_HASH]
907  debug_info.add_entity_discovery_data(
908  self.hasshass, self._discovery_data_discovery_data, self.entity_identity_id
909  )
910  # Set in case the entity has been removed and is re-added,
911  # for example when changing entity_id
912  set_discovery_hash(self.hasshass, discovery_hash)
913  self._remove_discovery_updated_remove_discovery_updated = async_dispatcher_connect(
914  self.hasshass,
915  MQTT_DISCOVERY_UPDATED.format(*discovery_hash),
916  self._async_discovery_callback_async_discovery_callback,
917  )
918 
920  self: MqttDiscoveryUpdateMixin,
921  ) -> None:
922  """Remove entity's state and entity registry entry.
923 
924  Remove entity from entity registry if it is registered,
925  this also removes the state. If the entity is not in the entity
926  registry, just remove the state.
927  """
928  entity_registry = er.async_get(self.hasshass)
929  if entity_entry := entity_registry.async_get(self.entity_identity_id):
930  entity_registry.async_remove(self.entity_identity_id)
932  self.hasshass, entity_entry.device_id, entity_entry.config_entry_id
933  )
934  else:
935  await self.async_removeasync_remove(force_remove=True)
936 
938  self,
939  payload: MQTTDiscoveryPayload,
940  discovery_update: Callable[[MQTTDiscoveryPayload], Coroutine[Any, Any, None]],
941  discovery_data: DiscoveryInfoType,
942  ) -> None:
943  """Process discovery update."""
944  try:
945  await discovery_update(payload)
946  finally:
947  send_discovery_done(self.hasshass, discovery_data)
948 
950  """Process discovery update and remove entity."""
951  if TYPE_CHECKING:
952  assert self._discovery_data_discovery_data
953  self._cleanup_discovery_on_remove_cleanup_discovery_on_remove()
954  if self._migrate_discovery_migrate_discovery is None:
955  # Unload and cleanup registry
956  await self._async_remove_state_and_registry_entry_async_remove_state_and_registry_entry()
957  else:
958  # Only unload the entity
959  await self.async_removeasync_remove(force_remove=True)
960  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
961 
962  @callback
963  def _async_discovery_callback(self, payload: MQTTDiscoveryPayload) -> None:
964  """Handle discovery update.
965 
966  If the payload has changed we will create a task to
967  do the discovery update.
968 
969  As this callback can fire when nothing has changed, this
970  is a normal function to avoid task creation until it is needed.
971  """
972  if TYPE_CHECKING:
973  assert self._discovery_data_discovery_data
974  discovery_hash = get_discovery_hash(self._discovery_data_discovery_data)
975  # Start discovery migration or rollback if migrate_discovery flag is set
976  # and the discovery topic is valid and not yet migrating
977  if (
978  payload.migrate_discovery
979  and self._migrate_discovery_migrate_discovery is None
980  and self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC]
981  == payload.discovery_data[ATTR_DISCOVERY_TOPIC]
982  ):
983  if self.unique_idunique_id is None or self.device_infodevice_info is None:
984  _LOGGER.error(
985  "Discovery migration is not possible for "
986  "for entity %s on topic %s. A unique_id "
987  "and device context is required, got unique_id: %s, device: %s",
988  self.entity_identity_id,
989  self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC],
990  self.unique_idunique_id,
991  self.device_infodevice_info,
992  )
993  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
994  return
995 
996  self._migrate_discovery_migrate_discovery = self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC]
997  discovery_hash = self._discovery_data_discovery_data[ATTR_DISCOVERY_HASH]
998  origin_info = get_origin_log_string(
999  self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD], include_url=False
1000  )
1001  action = "Rollback" if payload.device_discovery else "Migration"
1002  schema_type = "platform" if payload.device_discovery else "device"
1003  _LOGGER.info(
1004  "%s to MQTT %s discovery schema started for entity %s"
1005  "%s on topic %s. To complete %s, publish a %s discovery "
1006  "message with %s entity '%s'. After completed %s, "
1007  "publish an empty (retained) payload to %s",
1008  action,
1009  schema_type,
1010  self.entity_identity_id,
1011  origin_info,
1012  self._migrate_discovery_migrate_discovery,
1013  action.lower(),
1014  schema_type,
1015  discovery_hash[0],
1016  discovery_hash[1],
1017  action.lower(),
1018  self._migrate_discovery_migrate_discovery,
1019  )
1020  old_payload = self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD]
1021  _LOGGER.debug(
1022  "Got update for entity with hash: %s '%s'",
1023  discovery_hash,
1024  payload,
1025  )
1026  new_discovery_topic = payload.discovery_data[ATTR_DISCOVERY_TOPIC]
1027  # Abort early if an update is not received via the registered discovery topic.
1028  # This can happen if a device and single component discovery payload
1029  # share the same discovery ID.
1030  if self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC] != new_discovery_topic:
1031  # Prevent illegal updates
1032  old_origin_info = get_origin_log_string(
1033  self._discovery_data_discovery_data[ATTR_DISCOVERY_PAYLOAD], include_url=False
1034  )
1035  new_origin_info = get_origin_log_string(
1036  payload.discovery_data[ATTR_DISCOVERY_PAYLOAD], include_url=False
1037  )
1038  new_origin_support_url = get_origin_support_url(
1039  payload.discovery_data[ATTR_DISCOVERY_PAYLOAD]
1040  )
1041  if new_origin_support_url:
1042  get_support = f"for support visit {new_origin_support_url}"
1043  else:
1044  get_support = (
1045  "for documentation on migration to device schema or rollback to "
1046  "discovery schema, visit https://www.home-assistant.io/integrations/"
1047  "mqtt/#migration-from-single-component-to-device-based-discovery"
1048  )
1049  _LOGGER.warning(
1050  "Received a conflicting MQTT discovery message for entity %s; the "
1051  "entity was previously discovered on topic %s%s; the conflicting "
1052  "discovery message was received on topic %s%s; %s",
1053  self.entity_identity_id,
1054  self._discovery_data_discovery_data[ATTR_DISCOVERY_TOPIC],
1055  old_origin_info,
1056  new_discovery_topic,
1057  new_origin_info,
1058  get_support,
1059  )
1060  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
1061  return
1062 
1063  debug_info.update_entity_discovery_data(self.hasshass, payload, self.entity_identity_id)
1064  if not payload:
1065  # Empty payload: Remove component
1066  if self._migrate_discovery_migrate_discovery is None:
1067  _LOGGER.info("Removing component: %s", self.entity_identity_id)
1068  else:
1069  _LOGGER.info("Unloading component: %s", self.entity_identity_id)
1070  self.hasshass.async_create_task(
1071  self._async_process_discovery_update_and_remove_async_process_discovery_update_and_remove()
1072  )
1073  elif self._discovery_update_discovery_update:
1074  if old_payload != payload:
1075  # Non-empty, changed payload: Notify component
1076  _LOGGER.info("Updating component: %s", self.entity_identity_id)
1077  self.hasshass.async_create_task(
1078  self._async_process_discovery_update_async_process_discovery_update(
1079  payload, self._discovery_update_discovery_update, self._discovery_data_discovery_data
1080  )
1081  )
1082  else:
1083  # Non-empty, unchanged payload: Ignore to avoid changing states
1084  _LOGGER.debug("Ignoring unchanged update for: %s", self.entity_identity_id)
1085  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
1086 
1087  async def async_removed_from_registry(self) -> None:
1088  """Clear retained discovery topic in broker."""
1089  if not self._removed_from_hass_removed_from_hass and self._discovery_data_discovery_data is not None:
1090  # Stop subscribing to discovery updates to not trigger when we
1091  # clear the discovery topic
1092  self._cleanup_discovery_on_remove_cleanup_discovery_on_remove()
1093 
1094  # Clear the discovery topic so the entity is not
1095  # rediscovered after a restart
1096  await async_remove_discovery_payload(self.hasshass, self._discovery_data_discovery_data)
1097 
1098  @final
1099  async def add_to_platform_finish(self) -> None:
1100  """Finish adding entity to platform."""
1101  await super().add_to_platform_finish()
1102  # Only send the discovery done after the entity is fully added
1103  # and the state is written to the state machine.
1104  if self._discovery_data_discovery_data is not None:
1105  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
1106 
1107  @callback
1108  def add_to_platform_abort(self) -> None:
1109  """Abort adding an entity to a platform."""
1110  if self._discovery_data_discovery_data is not None:
1111  discovery_hash: tuple[str, str] = self._discovery_data_discovery_data[ATTR_DISCOVERY_HASH]
1112  if self.registry_entryregistry_entry is not None:
1113  self._registry_hooks_registry_hooks[discovery_hash] = (
1115  self.hasshass,
1116  self.entity_identity_id,
1117  partial(
1118  async_clear_discovery_topic_if_entity_removed,
1119  self.hasshass,
1120  self._discovery_data_discovery_data,
1121  ),
1122  )
1123  )
1124  stop_discovery_updates(self.hasshass, self._discovery_data_discovery_data)
1125  send_discovery_done(self.hasshass, self._discovery_data_discovery_data)
1126  super().add_to_platform_abort()
1127 
1128  async def async_will_remove_from_hass(self) -> None:
1129  """Stop listening to signal and cleanup discovery data.."""
1130  self._cleanup_discovery_on_remove_cleanup_discovery_on_remove()
1131 
1132  def _cleanup_discovery_on_remove(self) -> None:
1133  """Stop listening to signal and cleanup discovery data."""
1134  if self._discovery_data_discovery_data and not self._removed_from_hass_removed_from_hass:
1136  self.hasshass, self._discovery_data_discovery_data, self._remove_discovery_updated_remove_discovery_updated
1137  )
1138  self._removed_from_hass_removed_from_hass = True
1139 
1140 
1142  specifications: dict[str, Any] | None,
1143 ) -> DeviceInfo | None:
1144  """Return a device description for device registry."""
1145  if not specifications:
1146  return None
1147 
1148  info = DeviceInfo(
1149  identifiers={(DOMAIN, id_) for id_ in specifications[CONF_IDENTIFIERS]},
1150  connections={
1151  (conn_[0], conn_[1]) for conn_ in specifications[CONF_CONNECTIONS]
1152  },
1153  )
1154 
1155  if CONF_MANUFACTURER in specifications:
1156  info[ATTR_MANUFACTURER] = specifications[CONF_MANUFACTURER]
1157 
1158  if CONF_MODEL in specifications:
1159  info[ATTR_MODEL] = specifications[CONF_MODEL]
1160 
1161  if CONF_MODEL_ID in specifications:
1162  info[ATTR_MODEL_ID] = specifications[CONF_MODEL_ID]
1163 
1164  if CONF_NAME in specifications:
1165  info[ATTR_NAME] = specifications[CONF_NAME]
1166 
1167  if CONF_HW_VERSION in specifications:
1168  info[ATTR_HW_VERSION] = specifications[CONF_HW_VERSION]
1169 
1170  if CONF_SERIAL_NUMBER in specifications:
1171  info[ATTR_SERIAL_NUMBER] = specifications[CONF_SERIAL_NUMBER]
1172 
1173  if CONF_SW_VERSION in specifications:
1174  info[ATTR_SW_VERSION] = specifications[CONF_SW_VERSION]
1175 
1176  if CONF_VIA_DEVICE in specifications:
1177  info[ATTR_VIA_DEVICE] = (DOMAIN, specifications[CONF_VIA_DEVICE])
1178 
1179  if CONF_SUGGESTED_AREA in specifications:
1180  info[ATTR_SUGGESTED_AREA] = specifications[CONF_SUGGESTED_AREA]
1181 
1182  if CONF_CONFIGURATION_URL in specifications:
1183  info[ATTR_CONFIGURATION_URL] = specifications[CONF_CONFIGURATION_URL]
1184 
1185  return info
1186 
1187 
1188 @callback
1190  hass: HomeAssistant, device_info: DeviceInfo | None, config_entry: ConfigEntry
1191 ) -> None:
1192  """Ensure the via device is in the device registry."""
1193  if (
1194  device_info is None
1195  or CONF_VIA_DEVICE not in device_info
1196  or (device_registry := dr.async_get(hass)).async_get_device(
1197  identifiers={device_info["via_device"]}
1198  )
1199  ):
1200  return
1201 
1202  # Ensure the via device exists in the device registry
1203  _LOGGER.debug(
1204  "Device identifier %s via_device reference from device_info %s "
1205  "not found in the Device Registry, creating new entry",
1206  device_info["via_device"],
1207  device_info,
1208  )
1209  device_registry.async_get_or_create(
1210  config_entry_id=config_entry.entry_id,
1211  identifiers={device_info["via_device"]},
1212  )
1213 
1214 
1216  """Mixin used for mqtt platforms that support the device registry."""
1217 
1219  self, specifications: dict[str, Any] | None, config_entry: ConfigEntry
1220  ) -> None:
1221  """Initialize the device mixin."""
1222  self._device_specifications_device_specifications = specifications
1223  self._config_entry_config_entry = config_entry
1224 
1225  def device_info_discovery_update(self, config: DiscoveryInfoType) -> None:
1226  """Handle updated discovery message."""
1227  self._device_specifications_device_specifications = config.get(CONF_DEVICE)
1228  device_registry = dr.async_get(self.hasshass)
1229  config_entry_id = self._config_entry_config_entry.entry_id
1230  device_info = self.device_infodevice_infodevice_info
1231 
1232  if device_info is not None:
1233  ensure_via_device_exists(self.hasshass, device_info, self._config_entry_config_entry)
1234  device_registry.async_get_or_create(
1235  config_entry_id=config_entry_id, **device_info
1236  )
1237 
1238  @property
1239  def device_info(self) -> DeviceInfo | None:
1240  """Return a device description for device registry."""
1241  return device_info_from_specifications(self._device_specifications_device_specifications)
1242 
1243 
1245  MqttAttributesMixin,
1246  MqttAvailabilityMixin,
1247  MqttDiscoveryUpdateMixin,
1248  MqttEntityDeviceInfo,
1249 ):
1250  """Representation of an MQTT entity."""
1251 
1252  _attr_force_update = False
1253  _attr_has_entity_name = True
1254  _attr_should_poll = False
1255  _default_name: str | None
1256  _entity_id_format: str
1257 
1259  self,
1260  hass: HomeAssistant,
1261  config: ConfigType,
1262  config_entry: ConfigEntry,
1263  discovery_data: DiscoveryInfoType | None,
1264  ) -> None:
1265  """Init the MQTT Entity."""
1266  self.hasshasshass = hass
1267  self._config_config: ConfigType = config
1268  self._attr_unique_id_attr_unique_id = config.get(CONF_UNIQUE_ID)
1269  self._sub_state_sub_state: dict[str, EntitySubscription] = {}
1270  self._discovery_discovery = discovery_data is not None
1271  self._subscriptions_subscriptions: dict[str, dict[str, Any]]
1272 
1273  # Load config
1274  self._setup_from_config_setup_from_config(self._config_config)
1275  self._setup_common_attributes_from_config_setup_common_attributes_from_config(self._config_config)
1276 
1277  # Initialize entity_id from config
1278  self._init_entity_id_init_entity_id()
1279 
1280  # Initialize mixin classes
1281  MqttAttributesMixin.__init__(self, config)
1282  MqttAvailabilityMixin.__init__(self, config)
1283  MqttDiscoveryUpdateMixin.__init__(
1284  self, hass, discovery_data, self.discovery_updatediscovery_update
1285  )
1286  MqttEntityDeviceInfo.__init__(self, config.get(CONF_DEVICE), config_entry)
1287  ensure_via_device_exists(self.hasshasshass, self.device_infodevice_infodevice_info, self._config_entry_config_entry)
1288 
1289  def _init_entity_id(self) -> None:
1290  """Set entity_id from object_id if defined in config."""
1292  self.hasshasshass, self, self._config_config, self._entity_id_format
1293  )
1294 
1295  @final
1296  async def async_added_to_hass(self) -> None:
1297  """Subscribe to MQTT events."""
1298  await super().async_added_to_hass()
1299  self._subscriptions_subscriptions = {}
1300  self._prepare_subscribe_topics_prepare_subscribe_topics()
1301  if self._subscriptions_subscriptions:
1302  self._sub_state_sub_state = subscription.async_prepare_subscribe_topics(
1303  self.hasshasshass,
1304  self._sub_state_sub_state,
1305  self._subscriptions_subscriptions,
1306  )
1307  await self._subscribe_topics_subscribe_topics()
1308  await self.mqtt_async_added_to_hassmqtt_async_added_to_hass()
1309 
1310  async def mqtt_async_added_to_hass(self) -> None:
1311  """Call before the discovery message is acknowledged.
1312 
1313  To be extended by subclasses.
1314  """
1315 
1316  async def discovery_update(self, discovery_payload: MQTTDiscoveryPayload) -> None:
1317  """Handle updated discovery message."""
1318  try:
1319  config: DiscoveryInfoType = self.config_schema()(discovery_payload)
1320  except vol.Invalid as err:
1321  async_handle_schema_error(discovery_payload, err)
1322  return
1323  self._config_config = config
1324  self._setup_from_config_setup_from_config(self._config_config)
1325  self._setup_common_attributes_from_config_setup_common_attributes_from_config(self._config_config)
1326 
1327  # Prepare MQTT subscriptions
1328  self.attributes_prepare_discovery_updateattributes_prepare_discovery_update(config)
1329  self.availability_prepare_discovery_updateavailability_prepare_discovery_update(config)
1330  self.device_info_discovery_updatedevice_info_discovery_update(config)
1331  self._subscriptions_subscriptions = {}
1332  self._prepare_subscribe_topics_prepare_subscribe_topics()
1333  if self._subscriptions_subscriptions:
1334  self._sub_state_sub_state = subscription.async_prepare_subscribe_topics(
1335  self.hasshasshass,
1336  self._sub_state_sub_state,
1337  self._subscriptions_subscriptions,
1338  )
1339 
1340  # Finalize MQTT subscriptions
1341  await self.attributes_discovery_updateattributes_discovery_update(config)
1342  await self.availability_discovery_updateavailability_discovery_update(config)
1343  await self._subscribe_topics_subscribe_topics()
1344  self.async_write_ha_stateasync_write_ha_state()
1345 
1346  async def async_will_remove_from_hass(self) -> None:
1347  """Unsubscribe when removed."""
1348  self._sub_state_sub_state = subscription.async_unsubscribe_topics(
1349  self.hasshasshass, self._sub_state_sub_state
1350  )
1351  await MqttAttributesMixin.async_will_remove_from_hass(self)
1352  await MqttAvailabilityMixin.async_will_remove_from_hass(self)
1353  await MqttDiscoveryUpdateMixin.async_will_remove_from_hass(self)
1354  debug_info.remove_entity_data(self.hasshasshass, self.entity_identity_id)
1355 
1356  async def async_publish(
1357  self,
1358  topic: str,
1359  payload: PublishPayloadType,
1360  qos: int = 0,
1361  retain: bool = False,
1362  encoding: str | None = DEFAULT_ENCODING,
1363  ) -> None:
1364  """Publish message to an MQTT topic."""
1365  log_message(self.hasshasshass, self.entity_identity_id, topic, payload, qos, retain)
1366  await async_publish(
1367  self.hasshasshass,
1368  topic,
1369  payload,
1370  qos,
1371  retain,
1372  encoding,
1373  )
1374 
1376  self, topic: str, payload: PublishPayloadType
1377  ) -> None:
1378  """Publish payload to a topic using config."""
1379  await self.async_publishasync_publish(
1380  topic,
1381  payload,
1382  self._config_config[CONF_QOS],
1383  self._config_config[CONF_RETAIN],
1384  self._config_config[CONF_ENCODING],
1385  )
1386 
1387  @staticmethod
1388  @abstractmethod
1389  def config_schema() -> VolSchemaType:
1390  """Return the config schema."""
1391 
1392  def _set_entity_name(self, config: ConfigType) -> None:
1393  """Help setting the entity name if needed."""
1394  entity_name: str | None | UndefinedType = config.get(CONF_NAME, UNDEFINED)
1395  # Only set _attr_name if it is needed
1396  if entity_name is not UNDEFINED:
1397  self._attr_name_attr_name = entity_name
1398  elif not self._default_to_device_class_name_default_to_device_class_name():
1399  # Assign the default name
1400  self._attr_name_attr_name = self._default_name
1401  elif hasattr(self, "_attr_name"):
1402  # An entity name was not set in the config
1403  # don't set the name attribute and derive
1404  # the name from the device_class
1405  delattr(self, "_attr_name")
1406  if CONF_DEVICE in config and CONF_NAME not in config[CONF_DEVICE]:
1407  _LOGGER.info(
1408  "MQTT device information always needs to include a name, got %s, "
1409  "if device information is shared between multiple entities, the device "
1410  "name must be included in each entity's device configuration",
1411  config,
1412  )
1413 
1414  def _setup_common_attributes_from_config(self, config: ConfigType) -> None:
1415  """(Re)Setup the common attributes for the entity."""
1416  self._attr_entity_category_attr_entity_category = config.get(CONF_ENTITY_CATEGORY)
1417  self._attr_entity_registry_enabled_default_attr_entity_registry_enabled_default = bool(
1418  config.get(CONF_ENABLED_BY_DEFAULT)
1419  )
1420  self._attr_icon_attr_icon = config.get(CONF_ICON)
1421  self._attr_entity_picture_attr_entity_picture = config.get(CONF_ENTITY_PICTURE)
1422  # Set the entity name if needed
1423  self._set_entity_name_set_entity_name(config)
1424 
1425  def _setup_from_config(self, config: ConfigType) -> None:
1426  """(Re)Setup the entity."""
1427 
1428  @abstractmethod
1429  @callback
1430  def _prepare_subscribe_topics(self) -> None:
1431  """(Re)Subscribe to topics."""
1432 
1433  @abstractmethod
1434  async def _subscribe_topics(self) -> None:
1435  """(Re)Subscribe to topics."""
1436 
1437  @callback
1439  self, attrs_snapshot: tuple[tuple[str, Any | UndefinedType], ...]
1440  ) -> bool:
1441  """Return True if attributes on entity changed or if update is forced."""
1442  if self._attr_force_update_attr_force_update:
1443  return True
1444  for attribute, last_value in attrs_snapshot:
1445  if getattr(self, attribute, UNDEFINED) != last_value:
1446  return True
1447  return False
1448 
1449  @callback
1451  self,
1452  msg_callback: MessageCallbackType,
1453  attributes: set[str] | None,
1454  msg: ReceiveMessage,
1455  ) -> None:
1456  """Process the message callback."""
1457  if attributes is not None:
1458  attrs_snapshot: tuple[tuple[str, Any | UndefinedType], ...] = tuple(
1459  (attribute, getattr(self, attribute, UNDEFINED))
1460  for attribute in attributes
1461  )
1462  mqtt_data = self.hasshasshass.data[DATA_MQTT]
1463  messages = mqtt_data.debug_info_entities[self.entity_identity_id]["subscriptions"][
1464  msg.subscribed_topic
1465  ]["messages"]
1466  if msg not in messages:
1467  messages.append(msg)
1468 
1469  try:
1470  msg_callback(msg)
1471  except MqttValueTemplateException as exc:
1472  _LOGGER.warning(exc)
1473  return
1474 
1475  if attributes is not None and self._attrs_have_changed_attrs_have_changed(attrs_snapshot):
1476  mqtt_data.state_write_requests.write_state_request(self)
1477 
1479  self,
1480  state_topic_config_key: str,
1481  msg_callback: Callable[[ReceiveMessage], None],
1482  tracked_attributes: set[str] | None,
1483  disable_encoding: bool = False,
1484  ) -> bool:
1485  """Add a subscription."""
1486  qos: int = self._config_config[CONF_QOS]
1487  encoding: str | None = None
1488  if not disable_encoding:
1489  encoding = self._config_config[CONF_ENCODING] or None
1490  if (
1491  state_topic_config_key in self._config_config
1492  and self._config_config[state_topic_config_key] is not None
1493  ):
1494  self._subscriptions_subscriptions[state_topic_config_key] = {
1495  "topic": self._config_config[state_topic_config_key],
1496  "msg_callback": partial(
1497  self._message_callback_message_callback, msg_callback, tracked_attributes
1498  ),
1499  "entity_id": self.entity_identity_id,
1500  "qos": qos,
1501  "encoding": encoding,
1502  "job_type": HassJobType.Callback,
1503  }
1504  return True
1505  return False
1506 
1507 
1509  hass: HomeAssistant,
1510  config_entry: ConfigEntry,
1511  config: ConfigType,
1512 ) -> str | None:
1513  """Update device registry."""
1514  if CONF_DEVICE not in config:
1515  return None
1516 
1517  device: DeviceEntry | None = None
1518  device_registry = dr.async_get(hass)
1519  config_entry_id = config_entry.entry_id
1520  device_info = device_info_from_specifications(config[CONF_DEVICE])
1521 
1522  ensure_via_device_exists(hass, device_info, config_entry)
1523 
1524  if config_entry_id is not None and device_info is not None:
1525  update_device_info = cast(dict[str, Any], device_info)
1526  update_device_info["config_entry_id"] = config_entry_id
1527  device = device_registry.async_get_or_create(**update_device_info)
1528 
1529  return device.id if device else None
1530 
1531 
1532 @callback
1534  hass: HomeAssistant,
1535  event: Event[EventDeviceRegistryUpdatedData],
1536  mqtt_device_id: str,
1537  config_entry_id: str,
1538 ) -> bool:
1539  """Check if the passed event indicates MQTT was removed from a device."""
1540  if event.data["action"] == "update":
1541  if "config_entries" not in event.data["changes"]:
1542  return False
1543  device_registry = dr.async_get(hass)
1544  if (
1545  device_entry := device_registry.async_get(mqtt_device_id)
1546  ) and config_entry_id in device_entry.config_entries:
1547  # Not removed from device
1548  return False
1549 
1550  return True
None _attributes_message_received(self, ReceiveMessage msg)
Definition: entity.py:424
None attributes_prepare_discovery_update(self, DiscoveryInfoType config)
Definition: entity.py:378
None attributes_discovery_update(self, DiscoveryInfoType config)
Definition: entity.py:383
None _availability_message_received(self, ReceiveMessage msg)
Definition: entity.py:533
None availability_prepare_discovery_update(self, DiscoveryInfoType config)
Definition: entity.py:469
None _availability_setup_from_config(self, ConfigType config)
Definition: entity.py:478
None availability_discovery_update(self, DiscoveryInfoType config)
Definition: entity.py:474
None _async_device_removed(self, Event[EventDeviceRegistryUpdatedData] event)
Definition: entity.py:838
None __init__(self, HomeAssistant hass, DiscoveryInfoType discovery_data, str|None device_id, ConfigEntry config_entry, str log_name)
Definition: entity.py:667
None async_update(self, MQTTDiscoveryPayload discovery_data)
Definition: entity.py:868
None async_discovery_update(self, MQTTDiscoveryPayload discovery_payload)
Definition: entity.py:708
None _async_process_discovery_update(self, MQTTDiscoveryPayload payload, Callable[[MQTTDiscoveryPayload], Coroutine[Any, Any, None]] discovery_update, DiscoveryInfoType discovery_data)
Definition: entity.py:942
None _async_discovery_callback(self, MQTTDiscoveryPayload payload)
Definition: entity.py:963
None _async_remove_state_and_registry_entry(MqttDiscoveryUpdateMixin self)
Definition: entity.py:921
None __init__(self, HomeAssistant hass, DiscoveryInfoType|None discovery_data, Callable[[MQTTDiscoveryPayload], Coroutine[Any, Any, None]]|None discovery_update=None)
Definition: entity.py:885
None __init__(self, dict[str, Any]|None specifications, ConfigEntry config_entry)
Definition: entity.py:1220
None device_info_discovery_update(self, DiscoveryInfoType config)
Definition: entity.py:1225
None _setup_from_config(self, ConfigType config)
Definition: entity.py:1425
None __init__(self, HomeAssistant hass, ConfigType config, ConfigEntry config_entry, DiscoveryInfoType|None discovery_data)
Definition: entity.py:1264
bool _attrs_have_changed(self, tuple[tuple[str, Any|UndefinedType],...] attrs_snapshot)
Definition: entity.py:1440
None async_publish_with_config(self, str topic, PublishPayloadType payload)
Definition: entity.py:1377
None discovery_update(self, MQTTDiscoveryPayload discovery_payload)
Definition: entity.py:1316
None _setup_common_attributes_from_config(self, ConfigType config)
Definition: entity.py:1414
bool add_subscription(self, str state_topic_config_key, Callable[[ReceiveMessage], None] msg_callback, set[str]|None tracked_attributes, bool disable_encoding=False)
Definition: entity.py:1484
None _set_entity_name(self, ConfigType config)
Definition: entity.py:1392
None async_publish(self, str topic, PublishPayloadType payload, int qos=0, bool retain=False, str|None encoding=DEFAULT_ENCODING)
Definition: entity.py:1363
None _message_callback(self, MessageCallbackType msg_callback, set[str]|None attributes, ReceiveMessage msg)
Definition: entity.py:1455
None __call__(self, ConfigType config, DiscoveryInfoType discovery_data)
Definition: entity.py:195
bool _default_to_device_class_name(self)
Definition: entity.py:635
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
None async_remove(self, *bool force_remove=False)
Definition: entity.py:1387
DeviceInfo|None device_info(self)
Definition: entity.py:798
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:144
None log_message(HomeAssistant hass, str entity_id, str topic, PublishPayloadType payload, int qos, bool retain)
Definition: debug_info.py:40
None clear_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
Definition: discovery.py:117
None set_discovery_hash(HomeAssistant hass, tuple[str, str] discovery_hash)
Definition: discovery.py:122
str get_origin_log_string(MQTTDiscoveryPayload discovery_payload, *bool include_url)
Definition: discovery.py:130
str|None get_origin_support_url(MQTTDiscoveryPayload discovery_payload)
Definition: discovery.py:145
None cleanup_device_registry(HomeAssistant hass, str|None device_id, str|None config_entry_id)
Definition: entity.py:582
None async_clear_discovery_topic_if_entity_removed(HomeAssistant hass, DiscoveryInfoType discovery_data, Event[er.EventEntityRegistryUpdatedData] event)
Definition: entity.py:650
None stop_discovery_updates(HomeAssistant hass, DiscoveryInfoType discovery_data, Callable[[], None]|None remove_discovery_updated=None)
Definition: entity.py:625
None _handle_discovery_failure(HomeAssistant hass, MQTTDiscoveryPayload discovery_payload)
Definition: entity.py:166
None ensure_via_device_exists(HomeAssistant hass, DeviceInfo|None device_info, ConfigEntry config_entry)
Definition: entity.py:1191
str|None update_device(HomeAssistant hass, ConfigEntry config_entry, ConfigType config)
Definition: entity.py:1512
tuple[str, str] get_discovery_hash(DiscoveryInfoType discovery_data)
Definition: entity.py:609
DeviceInfo|None device_info_from_specifications(dict[str, Any]|None specifications)
Definition: entity.py:1143
None async_setup_entity_entry_helper(HomeAssistant hass, ConfigEntry entry, type[MqttEntity]|None entity_class, str domain, AddEntitiesCallback async_add_entities, VolSchemaType discovery_schema, VolSchemaType platform_schema_modern, dict[str, type[MqttEntity]]|None schema_class_mapping=None)
Definition: entity.py:245
bool async_removed_from_device(HomeAssistant hass, Event[EventDeviceRegistryUpdatedData] event, str mqtt_device_id, str config_entry_id)
Definition: entity.py:1538
bool _verify_mqtt_config_entry_enabled_for_discovery(HomeAssistant hass, str domain, MQTTDiscoveryPayload discovery_payload)
Definition: entity.py:175
None send_discovery_done(HomeAssistant hass, DiscoveryInfoType discovery_data)
Definition: entity.py:615
None async_setup_non_entity_entry_helper(HomeAssistant hass, str domain, _SetupNonEntityHelperCallbackProtocol async_setup, vol.Schema discovery_schema)
Definition: entity.py:204
None async_remove_discovery_payload(HomeAssistant hass, DiscoveryInfoType discovery_data)
Definition: entity.py:636
None init_entity_id_from_config(HomeAssistant hass, Entity entity, ConfigType config, str entity_id_format)
Definition: entity.py:353
None async_handle_schema_error(MQTTDiscoveryPayload discovery_payload, vol.Invalid err)
Definition: entity.py:152
None async_subscribe_topics_internal(HomeAssistant hass, dict[str, EntitySubscription] sub_state)
dict[str, EntitySubscription] async_prepare_subscribe_topics(HomeAssistant hass, dict[str, EntitySubscription]|None sub_state, dict[str, dict[str, Any]] topics)
Definition: subscription.py:91
bool|None mqtt_config_entry_enabled(HomeAssistant hass)
Definition: util.py:192
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:228
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
bool template(HomeAssistant hass, Template value_template, TemplateVarsType variables=None)
Definition: condition.py:759
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
str async_generate_entity_id(str entity_id_format, str|None name, Iterable[str]|None current_ids=None, HomeAssistant|None hass=None)
Definition: entity.py:119
CALLBACK_TYPE async_track_device_registry_updated_event(HomeAssistant hass, str|Iterable[str] device_ids, Callable[[Event[EventDeviceRegistryUpdatedData]], Any] action, HassJobType|None job_type=None)
Definition: event.py:599
CALLBACK_TYPE async_track_entity_registry_updated_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventEntityRegistryUpdatedData]], Any] action, HassJobType|None job_type=None)
Definition: event.py:543