Home Assistant Unofficial Reference 2024.12.1
client.py
Go to the documentation of this file.
1 """Support for MQTT message handling."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import defaultdict
7 from collections.abc import AsyncGenerator, Callable, Coroutine, Iterable
8 import contextlib
9 from dataclasses import dataclass
10 from functools import lru_cache, partial
11 from itertools import chain, groupby
12 import logging
13 from operator import attrgetter
14 import socket
15 import ssl
16 import time
17 from typing import TYPE_CHECKING, Any
18 import uuid
19 
20 import certifi
21 
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.const import (
24  CONF_CLIENT_ID,
25  CONF_PASSWORD,
26  CONF_PORT,
27  CONF_PROTOCOL,
28  CONF_USERNAME,
29  EVENT_HOMEASSISTANT_STOP,
30 )
31 from homeassistant.core import (
32  CALLBACK_TYPE,
33  Event,
34  HassJob,
35  HassJobType,
36  HomeAssistant,
37  callback,
38  get_hassjob_callable_job_type,
39 )
40 from homeassistant.exceptions import HomeAssistantError
41 from homeassistant.helpers.dispatcher import async_dispatcher_send
42 from homeassistant.helpers.importlib import async_import_module
43 from homeassistant.helpers.start import async_at_started
44 from homeassistant.helpers.typing import ConfigType
45 from homeassistant.loader import bind_hass
46 from homeassistant.setup import SetupPhases, async_pause_setup
47 from homeassistant.util.collection import chunked_or_all
48 from homeassistant.util.logging import catch_log_exception, log_exception
49 
50 from .const import (
51  CONF_BIRTH_MESSAGE,
52  CONF_BROKER,
53  CONF_CERTIFICATE,
54  CONF_CLIENT_CERT,
55  CONF_CLIENT_KEY,
56  CONF_KEEPALIVE,
57  CONF_TLS_INSECURE,
58  CONF_TRANSPORT,
59  CONF_WILL_MESSAGE,
60  CONF_WS_HEADERS,
61  CONF_WS_PATH,
62  DEFAULT_BIRTH,
63  DEFAULT_ENCODING,
64  DEFAULT_KEEPALIVE,
65  DEFAULT_PORT,
66  DEFAULT_PROTOCOL,
67  DEFAULT_QOS,
68  DEFAULT_TRANSPORT,
69  DEFAULT_WILL,
70  DEFAULT_WS_HEADERS,
71  DEFAULT_WS_PATH,
72  DOMAIN,
73  MQTT_CONNECTION_STATE,
74  PROTOCOL_5,
75  PROTOCOL_31,
76  TRANSPORT_WEBSOCKETS,
77 )
78 from .models import (
79  DATA_MQTT,
80  MessageCallbackType,
81  MqttData,
82  PublishMessage,
83  PublishPayloadType,
84  ReceiveMessage,
85 )
86 from .util import EnsureJobAfterCooldown, get_file_path, mqtt_config_entry_enabled
87 
88 if TYPE_CHECKING:
89  # Only import for paho-mqtt type checking here, imports are done locally
90  # because integrations should be able to optionally rely on MQTT.
91  import paho.mqtt.client as mqtt
92 
93  from .async_client import AsyncMQTTClient
94 
95 _LOGGER = logging.getLogger(__name__)
96 
97 MIN_BUFFER_SIZE = 131072 # Minimum buffer size to use if preferred size fails
98 PREFERRED_BUFFER_SIZE = 8 * 1024 * 1024 # Set receive buffer size to 8MiB
99 
100 DISCOVERY_COOLDOWN = 5
101 # The initial subscribe cooldown controls how long to wait to group
102 # subscriptions together. This is to avoid making too many subscribe
103 # requests in a short period of time. If the number is too low, the
104 # system will be flooded with subscribe requests. If the number is too
105 # high, we risk being flooded with responses to the subscribe requests
106 # which can exceed the receive buffer size of the socket. To mitigate
107 # this, we increase the receive buffer size of the socket as well.
108 INITIAL_SUBSCRIBE_COOLDOWN = 0.5
109 SUBSCRIBE_COOLDOWN = 0.1
110 UNSUBSCRIBE_COOLDOWN = 0.1
111 TIMEOUT_ACK = 10
112 RECONNECT_INTERVAL_SECONDS = 10
113 
114 MAX_WILDCARD_SUBSCRIBES_PER_CALL = 1
115 MAX_SUBSCRIBES_PER_CALL = 500
116 MAX_UNSUBSCRIBES_PER_CALL = 500
117 
118 MAX_PACKETS_TO_READ = 500
119 
120 type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any
121 
122 type SubscribePayloadType = str | bytes # Only bytes if encoding is None
123 
124 
def publish(
    hass: HomeAssistant,
    topic: str,
    payload: PublishPayloadType,
    qos: int | None = 0,
    retain: bool | None = False,
    encoding: str | None = DEFAULT_ENCODING,
) -> None:
    """Publish message to a MQTT topic.

    Fire-and-forget wrapper: the actual work is done by async_publish,
    which is scheduled as a task on hass. Nothing is returned, so the
    caller gets no handle to the outcome of the publish.
    """
    hass.create_task(async_publish(hass, topic, payload, qos, retain, encoding))
135 
136 
async def async_publish(
    hass: HomeAssistant,
    topic: str,
    payload: PublishPayloadType,
    qos: int | None = 0,
    retain: bool | None = False,
    encoding: str | None = DEFAULT_ENCODING,
) -> None:
    """Publish message to a MQTT topic.

    Payloads that are `bytes` or `None` are passed through untouched. Any
    other payload is converted with `str()` and, when `encoding` differs from
    DEFAULT_ENCODING, encoded to bytes with that encoding. Problems with the
    payload/encoding are logged and the message is dropped (no exception).

    Raises HomeAssistantError when the MQTT config entry is not enabled.
    """
    if not mqtt_config_entry_enabled(hass):
        raise HomeAssistantError(
            f"Cannot publish to topic '{topic}', MQTT is not enabled",
            translation_key="mqtt_not_setup_cannot_publish",
            translation_domain=DOMAIN,
            translation_placeholders={"topic": topic},
        )
    mqtt_data = hass.data[DATA_MQTT]
    outgoing_payload = payload
    if not isinstance(payload, bytes) and payload is not None:
        if not encoding:
            # Without an encoding only raw bytes can be passed through.
            _LOGGER.error(
                (
                    "Can't pass-through payload for publishing %s on %s with no"
                    " encoding set, need 'bytes' got %s"
                ),
                payload,
                topic,
                type(payload),
            )
            return
        outgoing_payload = str(payload)
        if encoding != DEFAULT_ENCODING:
            # A string is encoded as utf-8 by default, other encoding
            # requires bytes as payload
            try:
                outgoing_payload = outgoing_payload.encode(encoding)
            except (AttributeError, LookupError, UnicodeEncodeError):
                _LOGGER.error(
                    "Can't encode payload for publishing %s on %s with encoding %s",
                    payload,
                    topic,
                    encoding,
                )
                return

    # `None` for qos/retain falls back to 0/False.
    await mqtt_data.client.async_publish(
        topic, outgoing_payload, qos or 0, retain or False
    )
185 
186 
@bind_hass
async def async_subscribe(
    hass: HomeAssistant,
    topic: str,
    msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any, None] | None],
    qos: int = DEFAULT_QOS,
    encoding: str | None = DEFAULT_ENCODING,
) -> CALLBACK_TYPE:
    """Subscribe to an MQTT topic.

    Thin coroutine wrapper around async_subscribe_internal; it contains no
    await itself.

    Call the return value to unsubscribe.
    """
    return async_subscribe_internal(hass, topic, msg_callback, qos, encoding)
200 
201 
@callback
def async_subscribe_internal(
    hass: HomeAssistant,
    topic: str,
    msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any, None] | None],
    qos: int = DEFAULT_QOS,
    encoding: str | None = DEFAULT_ENCODING,
    job_type: HassJobType | None = None,
) -> CALLBACK_TYPE:
    """Subscribe to an MQTT topic.

    This function is internal to the MQTT integration
    and may change at any time. It should not be considered
    a stable API.

    Raises HomeAssistantError when MQTT data is missing from hass.data or
    the MQTT config entry is not enabled.

    Call the return value to unsubscribe.
    """
    try:
        mqtt_data = hass.data[DATA_MQTT]
    except KeyError as exc:
        raise HomeAssistantError(
            f"Cannot subscribe to topic '{topic}', "
            "make sure MQTT is set up correctly",
            translation_key="mqtt_not_setup_cannot_subscribe",
            translation_domain=DOMAIN,
            translation_placeholders={"topic": topic},
        ) from exc
    client = mqtt_data.client
    if not mqtt_config_entry_enabled(hass):
        raise HomeAssistantError(
            f"Cannot subscribe to topic '{topic}', MQTT is not enabled",
            translation_key="mqtt_not_setup_cannot_subscribe",
            translation_domain=DOMAIN,
            translation_placeholders={"topic": topic},
        )
    return client.async_subscribe(topic, msg_callback, qos, encoding, job_type)
238 
239 
@bind_hass
def subscribe(
    hass: HomeAssistant,
    topic: str,
    msg_callback: MessageCallbackType,
    qos: int = DEFAULT_QOS,
    encoding: str = "utf-8",
) -> Callable[[], None]:
    """Subscribe to an MQTT topic.

    Synchronous variant: runs async_subscribe on hass.loop and blocks on
    `.result()` until the subscription has been set up, so it must not be
    called from within the event loop itself.

    Call the return value to unsubscribe.
    """
    async_remove = asyncio.run_coroutine_threadsafe(
        async_subscribe(hass, topic, msg_callback, qos, encoding), hass.loop
    ).result()

    def remove() -> None:
        """Remove listener convert."""
        # MQTT messages tend to be high volume,
        # and since they come in via a thread and need to be processed in the event loop,
        # we want to avoid hass.add_job since most of the time is spent calling
        # inspect to figure out how to run the callback.
        hass.loop.call_soon_threadsafe(async_remove)

    return remove
262 
263 
264 @dataclass(slots=True, frozen=True)
266  """Class to hold data about an active subscription."""
267 
268  topic: str
269  is_simple_match: bool
270  complex_matcher: Callable[[str], bool] | None
271  job: HassJob[[ReceiveMessage], Coroutine[Any, Any, None] | None]
272  qos: int = 0
273  encoding: str | None = "utf-8"
274 
275 
class MqttClientSetup:
    """Helper class to setup the paho mqtt client from config."""

    _client: AsyncMQTTClient

    def __init__(self, config: ConfigType) -> None:
        """Initialize the MQTT client setup helper.

        self.setup must be run in an executor job.
        """

        self._config = config

    def setup(self) -> None:
        """Set up the MQTT client.

        The setup of the MQTT client should be run in an executor job,
        because it accesses files, so it does IO.
        """
        # We don't import on the top because some integrations
        # should be able to optionally rely on MQTT.
        import paho.mqtt.client as mqtt  # pylint: disable=import-outside-toplevel

        # pylint: disable-next=import-outside-toplevel
        from .async_client import AsyncMQTTClient

        config = self._config
        if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31:
            proto = mqtt.MQTTv31
        elif protocol == PROTOCOL_5:
            proto = mqtt.MQTTv5
        else:
            proto = mqtt.MQTTv311

        if (client_id := config.get(CONF_CLIENT_ID)) is None:
            # PAHO MQTT relies on the MQTT server to generate random client IDs.
            # However, that feature is not mandatory so we generate our own.
            client_id = mqtt.base62(uuid.uuid4().int, padding=22)
        transport: str = config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
        self._client = AsyncMQTTClient(
            client_id,
            protocol=proto,
            transport=transport,
            reconnect_on_failure=False,
        )
        self._client.setup()

        # Enable logging
        self._client.enable_logger()

        username: str | None = config.get(CONF_USERNAME)
        password: str | None = config.get(CONF_PASSWORD)
        if username is not None:
            self._client.username_pw_set(username, password)

        # "auto" selects the CA bundle shipped with certifi.
        if (
            certificate := get_file_path(CONF_CERTIFICATE, config.get(CONF_CERTIFICATE))
        ) == "auto":
            certificate = certifi.where()

        client_key = get_file_path(CONF_CLIENT_KEY, config.get(CONF_CLIENT_KEY))
        client_cert = get_file_path(CONF_CLIENT_CERT, config.get(CONF_CLIENT_CERT))
        tls_insecure = config.get(CONF_TLS_INSECURE)
        if transport == TRANSPORT_WEBSOCKETS:
            ws_path: str = config.get(CONF_WS_PATH, DEFAULT_WS_PATH)
            ws_headers: dict[str, str] = config.get(CONF_WS_HEADERS, DEFAULT_WS_HEADERS)
            self._client.ws_set_options(ws_path, ws_headers)
        if certificate is not None:
            self._client.tls_set(
                certificate,
                certfile=client_cert,
                keyfile=client_key,
                tls_version=ssl.PROTOCOL_TLS_CLIENT,
            )

            # Only meaningful once TLS has been configured via tls_set above.
            if tls_insecure is not None:
                self._client.tls_insecure_set(tls_insecure)

    @property
    def client(self) -> AsyncMQTTClient:
        """Return the paho MQTT client.

        Only available after setup() has run.
        """
        return self._client
358 
359 
360 class MQTT:
361  """Home Assistant MQTT client."""
362 
363  _mqttc: AsyncMQTTClient
364  _last_subscribe: float
365  _mqtt_data: MqttData
366 
def __init__(
    self, hass: HomeAssistant, config_entry: ConfigEntry, conf: ConfigType
) -> None:
    """Initialize Home Assistant MQTT client.

    Only sets up bookkeeping state and registers the started/stop listeners;
    the paho client (self._mqttc) is created later by async_init_client.
    """
    self.hass = hass
    self.loop = hass.loop
    self.config_entry = config_entry
    self.conf = conf

    self._simple_subscriptions: defaultdict[str, set[Subscription]] = defaultdict(
        set
    )
    # To ensure the wildcard subscriptions order is preserved, we use a dict
    # with `None` values instead of a set.
    self._wildcard_subscriptions: dict[Subscription, None] = {}
    # _retained_topics prevents a Subscription from receiving a
    # retained message more than once per topic. This prevents flooding
    # already active subscribers when new subscribers subscribe to a topic
    # which has subscribed messages.
    self._retained_topics: defaultdict[Subscription, set[str]] = defaultdict(set)
    self.connected = False
    self._ha_started = asyncio.Event()
    self._cleanup_on_unload: list[Callable[[], None]] = []

    self._connection_lock = asyncio.Lock()
    self._pending_operations: dict[int, asyncio.Future[None]] = {}
    self._subscribe_debouncer = EnsureJobAfterCooldown(
        INITIAL_SUBSCRIBE_COOLDOWN, self._async_perform_subscriptions
    )
    self._misc_timer: asyncio.TimerHandle | None = None
    self._reconnect_task: asyncio.Task | None = None
    self._should_reconnect: bool = True
    self._available_future: asyncio.Future[bool] | None = None

    self._max_qos: defaultdict[str, int] = defaultdict(int)  # topic, max qos
    self._pending_subscriptions: dict[str, int] = {}  # topic, qos
    self._unsubscribe_debouncer = EnsureJobAfterCooldown(
        UNSUBSCRIBE_COOLDOWN, self._async_perform_unsubscribes
    )
    self._pending_unsubscribes: set[str] = set()  # topic
    # Both registrations return a callable that is kept so cleanup() can
    # undo them again.
    self._cleanup_on_unload.extend(
        (
            async_at_started(hass, self._async_ha_started),
            hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self._async_ha_stop),
        )
    )
    self._socket_buffersize: int | None = None
414 
@callback
def _async_ha_started(self, _hass: HomeAssistant) -> None:
    """Handle HA started.

    Sets the _ha_started event so code waiting on it can continue.
    """
    self._ha_started.set()
419 
async def _async_ha_stop(self, _event: Event) -> None:
    """Handle HA stop.

    Calls async_disconnect with its default arguments; the stop event
    itself is not inspected.
    """
    await self.async_disconnect()
423 
async def async_start(
    self,
    mqtt_data: MqttData,
) -> None:
    """Start Home Assistant MQTT client.

    Stores the shared MQTT data and then initializes the paho client.
    """
    self._mqtt_data = mqtt_data
    await self.async_init_client()
431 
@property
def subscriptions(self) -> set[Subscription]:
    """Return the tracked subscriptions.

    A new set is built on every access, combining all simple (per topic)
    subscriptions with the wildcard subscriptions.
    """
    return {
        *chain.from_iterable(self._simple_subscriptions.values()),
        *self._wildcard_subscriptions,
    }
439 
def cleanup(self) -> None:
    """Clean up listeners.

    Pops and calls every registered unload callback, last registered first,
    leaving the list empty.
    """
    while self._cleanup_on_unload:
        self._cleanup_on_unload.pop()()
444 
@contextlib.asynccontextmanager
async def _async_connect_in_executor(self) -> AsyncGenerator[None]:
    """Temporarily route socket open/register-write callbacks for executor use."""
    # While we are connecting in the executor we need to
    # handle on_socket_open and on_socket_register_write
    # in the executor as well.
    mqttc = self._mqttc
    try:
        mqttc.on_socket_open = self._on_socket_open
        mqttc.on_socket_register_write = self._on_socket_register_write
        yield
    finally:
        # Once the executor job is done, we can switch back to
        # handling these in the event loop.
        mqttc.on_socket_open = self._async_on_socket_open
        mqttc.on_socket_register_write = self._async_on_socket_register_write
460 
async def async_init_client(self) -> None:
    """Initialize paho client.

    Imports the async client module, builds the paho client in an executor
    job, wires up all callbacks, optionally sets the will message, and
    finally stores the client in self._mqttc.
    """
    with async_pause_setup(self.hass, SetupPhases.WAIT_IMPORT_PACKAGES):
        await async_import_module(
            self.hass, "homeassistant.components.mqtt.async_client"
        )

    mqttc_setup = MqttClientSetup(self.conf)
    await self.hass.async_add_executor_job(mqttc_setup.setup)
    mqttc = mqttc_setup.client
    # on_socket_unregister_write and _async_on_socket_close
    # are only ever called in the event loop
    mqttc.on_socket_close = self._async_on_socket_close
    mqttc.on_socket_unregister_write = self._async_on_socket_unregister_write

    # These will be called in the event loop
    mqttc.on_connect = self._async_mqtt_on_connect
    mqttc.on_disconnect = self._async_mqtt_on_disconnect
    mqttc.on_message = self._async_mqtt_on_message
    mqttc.on_publish = self._async_mqtt_on_callback
    mqttc.on_subscribe = self._async_mqtt_on_callback
    mqttc.on_unsubscribe = self._async_mqtt_on_callback

    # suppress exceptions at callback
    mqttc.suppress_exceptions = True

    # A falsy (e.g. empty) will configuration disables the will message.
    if will := self.conf.get(CONF_WILL_MESSAGE, DEFAULT_WILL):
        will_message = PublishMessage(**will)
        mqttc.will_set(
            topic=will_message.topic,
            payload=will_message.payload,
            qos=will_message.qos,
            retain=will_message.retain,
        )

    self._mqttc = mqttc
497 
@callback
def _async_reader_callback(self, client: mqtt.Client) -> None:
    """Handle reading data from the socket.

    A non-zero status from loop_read is forwarded to _async_on_disconnect.
    """
    if (status := client.loop_read(MAX_PACKETS_TO_READ)) != 0:
        self._async_on_disconnect(status)
503 
@callback
def _async_start_misc_periodic(self) -> None:
    """Start the misc periodic.

    Schedules paho's loop_misc() once per second on the event loop. The
    timer re-arms itself only while loop_misc() reports success.
    """
    assert self._misc_timer is None, "Misc periodic already started"
    _LOGGER.debug("%s: Starting client misc loop", self.config_entry.title)
    # pylint: disable=import-outside-toplevel
    import paho.mqtt.client as mqtt

    # Inner function to avoid having to check late import
    # each time the function is called.
    @callback
    def _async_misc() -> None:
        """Start the MQTT client misc loop."""
        if self._mqttc.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
            self._misc_timer = self.loop.call_at(self.loop.time() + 1, _async_misc)

    self._misc_timer = self.loop.call_at(self.loop.time() + 1, _async_misc)
521 
def _increase_socket_buffer_size(self, sock: SocketType) -> None:
    """Increase the socket buffer size.

    Tries PREFERRED_BUFFER_SIZE first and halves the requested size after
    every OSError. Gives up with a warning once a size at or below
    MIN_BUFFER_SIZE has been rejected as well.
    """
    if not hasattr(sock, "setsockopt") and hasattr(sock, "_socket"):
        # The WebsocketWrapper does not wrap setsockopt
        # so we need to get the underlying socket
        # Remove this once
        # https://github.com/eclipse/paho.mqtt.python/pull/843
        # is available.
        sock = sock._socket  # noqa: SLF001

    new_buffer_size = PREFERRED_BUFFER_SIZE
    while True:
        try:
            # Some operating systems do not allow us to set the preferred
            # buffer size. In that case we try some other size options.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, new_buffer_size)
        except OSError as err:
            if new_buffer_size <= MIN_BUFFER_SIZE:
                _LOGGER.warning(
                    "Unable to increase the socket buffer size to %s; "
                    "The connection may be unstable if the MQTT broker "
                    "sends data at volume or a large amount of subscriptions "
                    "need to be processed: %s",
                    new_buffer_size,
                    err,
                )
                return
            new_buffer_size //= 2
        else:
            return
552 
def _on_socket_open(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Handle socket open.

    Thread-safe variant: hands the call over to _async_on_socket_open on
    the event loop.
    """
    self.loop.call_soon_threadsafe(
        self._async_on_socket_open, client, userdata, sock
    )
560 
@callback
def _async_on_socket_open(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Handle socket open.

    Registers the socket with the event loop for reading, makes sure the
    misc timer is running and performs an immediate first read.
    """
    fileno = sock.fileno()
    _LOGGER.debug("%s: connection opened %s", self.config_entry.title, fileno)
    if fileno > -1:
        self._increase_socket_buffer_size(sock)
        self.loop.add_reader(sock, partial(self._async_reader_callback, client))
    if not self._misc_timer:
        self._async_start_misc_periodic()
    # Try to consume the buffer right away so it doesn't fill up
    # since add_reader will wait for the next loop iteration
    self._async_reader_callback(client)
576 
@callback
def _async_on_socket_close(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Handle socket close.

    Reports a failed connection result, removes the socket reader and
    cancels the misc timer.
    """
    fileno = sock.fileno()
    _LOGGER.debug("%s: connection closed %s", self.config_entry.title, fileno)
    # If socket close is called before the connect
    # result is set make sure the first connection result is set
    self._async_connection_result(False)
    if fileno > -1:
        self.loop.remove_reader(sock)
    if self._misc_timer:
        self._misc_timer.cancel()
        self._misc_timer = None
592 
@callback
def _async_writer_callback(self, client: mqtt.Client) -> None:
    """Handle writing data to the socket.

    A non-zero status from loop_write is forwarded to _async_on_disconnect.
    """
    if (status := client.loop_write()) != 0:
        self._async_on_disconnect(status)
598 
def _on_socket_register_write(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Register the socket for writing.

    Thread-safe variant: hands the call over to
    _async_on_socket_register_write on the event loop.
    """
    # userdata is deliberately not forwarded (None is passed instead).
    self.loop.call_soon_threadsafe(
        self._async_on_socket_register_write, client, None, sock
    )
606 
@callback
def _async_on_socket_register_write(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Register the socket for writing.

    Adds a writer for the socket to the event loop; sockets reporting a
    negative file descriptor are ignored.
    """
    fileno = sock.fileno()
    _LOGGER.debug("%s: register write %s", self.config_entry.title, fileno)
    if fileno > -1:
        self.loop.add_writer(sock, partial(self._async_writer_callback, client))
616 
@callback
def _async_on_socket_unregister_write(
    self, client: mqtt.Client, userdata: Any, sock: SocketType
) -> None:
    """Unregister the socket for writing.

    Removes the socket's writer from the event loop; sockets reporting a
    negative file descriptor are ignored.
    """
    fileno = sock.fileno()
    _LOGGER.debug("%s: unregister write %s", self.config_entry.title, fileno)
    if fileno > -1:
        self.loop.remove_writer(sock)
626 
def _is_active_subscription(self, topic: str) -> bool:
    """Check if a topic has an active subscription.

    The topic is compared literally against the tracked subscription
    topics; no wildcard matching is performed here.
    """
    return topic in self._simple_subscriptions or any(
        other.topic == topic for other in self._wildcard_subscriptions
    )
632 
async def async_publish(
    self, topic: str, payload: PublishPayloadType, qos: int, retain: bool
) -> None:
    """Publish a MQTT message.

    Hands the message to the paho client and then awaits
    _async_wait_for_mid_or_raise with the returned message id and result
    code.
    """
    msg_info = self._mqttc.publish(topic, payload, qos, retain)
    _LOGGER.debug(
        "Transmitting%s message on %s: '%s', mid: %s, qos: %s",
        " retained" if retain else "",
        topic,
        payload,
        msg_info.mid,
        qos,
    )
    await self._async_wait_for_mid_or_raise(msg_info.mid, msg_info.rc)
647 
async def async_connect(self, client_available: asyncio.Future[bool]) -> None:
    """Connect to the host. Does not process messages yet.

    `client_available` is stored and resolved through
    _async_connection_result. This method only reports failures (an OSError
    or a non-zero paho result code); it never reports success itself.
    """
    # pylint: disable-next=import-outside-toplevel
    import paho.mqtt.client as mqtt

    self._available_future = client_available
    self._should_reconnect = True
    try:
        async with self._connection_lock, self._async_connect_in_executor():
            result: int | None = await self.hass.async_add_executor_job(
                self._mqttc.connect,
                self.conf[CONF_BROKER],
                self.conf.get(CONF_PORT, DEFAULT_PORT),
                self.conf.get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE),
            )
    except OSError as err:
        _LOGGER.error("Failed to connect to MQTT server due to exception: %s", err)
        self._async_connection_result(False)
    else:
        # A non-zero return code means paho could not start the connection.
        if result is not None and result != 0:
            _LOGGER.error(
                "Failed to connect to MQTT server: %s",
                mqtt.error_string(result),
            )
            self._async_connection_result(False)
675 
@callback
def _async_connection_result(self, connected: bool) -> None:
    """Handle a connection result.

    Resolves the pending availability future (once), then either cancels
    the reconnect task on success or starts one on failure when
    reconnecting is allowed and none is running yet.
    """
    if self._available_future and not self._available_future.done():
        self._available_future.set_result(connected)

    if connected:
        self._async_cancel_reconnect()
    elif self._should_reconnect and not self._reconnect_task:
        self._reconnect_task = self.config_entry.async_create_background_task(
            self.hass, self._reconnect_loop(), "mqtt reconnect loop"
        )
688 
@callback
def _async_cancel_reconnect(self) -> None:
    """Cancel the reconnect task.

    Does nothing when no reconnect task is active.
    """
    if self._reconnect_task:
        self._reconnect_task.cancel()
        self._reconnect_task = None
695 
async def _reconnect_loop(self) -> None:
    """Reconnect to the MQTT server.

    Loops forever: while not connected a reconnect is attempted in the
    executor, followed by a sleep of RECONNECT_INTERVAL_SECONDS. The loop
    only ends when the task running it is cancelled.
    """
    while True:
        if not self.connected:
            try:
                async with self._connection_lock, self._async_connect_in_executor():
                    await self.hass.async_add_executor_job(self._mqttc.reconnect)
            except OSError as err:
                _LOGGER.debug(
                    "Error re-connecting to MQTT server due to exception: %s", err
                )

        await asyncio.sleep(RECONNECT_INTERVAL_SECONDS)
709 
async def async_disconnect(self, disconnect_paho_client: bool = False) -> None:
    """Stop the MQTT client.

    We only disconnect gracefully if disconnect_paho_client is set, but not
    when Home Assistant is shut down.
    """

    # stop waiting for any pending subscriptions
    await self._subscribe_debouncer.async_cleanup()
    # reset timeout to initial subscribe cooldown
    self._subscribe_debouncer.set_timeout(INITIAL_SUBSCRIBE_COOLDOWN)
    # stop the unsubscribe debouncer
    await self._unsubscribe_debouncer.async_cleanup()
    # make sure the unsubscribes are processed
    await self._async_perform_unsubscribes()

    # wait for ACKs to be processed
    if pending := self._pending_operations.values():
        await asyncio.wait(pending)

    # stop the MQTT loop
    async with self._connection_lock:
        self._should_reconnect = False
        self._async_cancel_reconnect()
        # We do not gracefully disconnect to ensure
        # the broker publishes the will message unless the entry is reloaded
        if disconnect_paho_client:
            self._mqttc.disconnect()
738 
@callback
def async_restore_tracked_subscriptions(
    self, subscriptions: set[Subscription]
) -> None:
    """Restore tracked subscriptions after reload.

    Only re-registers the subscriptions for tracking; nothing is queued
    for sending to the broker here.
    """
    for subscription in subscriptions:
        self._async_track_subscription(subscription)
    # The set of tracked subscriptions changed, so cached matches are stale.
    self._matching_subscriptions.cache_clear()
747 
@callback
def _async_track_subscription(self, subscription: Subscription) -> None:
    """Track a subscription.

    This method does not send a SUBSCRIBE message to the broker.

    The caller is responsible for clearing the cache of
    _matching_subscriptions.
    """
    if subscription.is_simple_match:
        self._simple_subscriptions[subscription.topic].add(subscription)
    else:
        self._wildcard_subscriptions[subscription] = None
760 
@callback
def _async_untrack_subscription(self, subscription: Subscription) -> None:
    """Untrack a subscription.

    This method does not send an UNSUBSCRIBE message to the broker.

    The caller is responsible for clearing the cache of
    _matching_subscriptions.

    Raises HomeAssistantError when the subscription is not tracked.
    """
    topic = subscription.topic
    try:
        if subscription.is_simple_match:
            simple_subscriptions = self._simple_subscriptions
            # Look the topic up with .get(): indexing the defaultdict with an
            # unknown topic would silently create an empty set that makes the
            # topic look like it still has an active subscription.
            if (topic_subscriptions := simple_subscriptions.get(topic)) is None:
                raise KeyError(topic)
            topic_subscriptions.remove(subscription)
            if not topic_subscriptions:
                del simple_subscriptions[topic]
        else:
            del self._wildcard_subscriptions[subscription]
    except (KeyError, ValueError) as exc:
        raise HomeAssistantError("Can't remove subscription twice") from exc
780 
@callback
def _async_queue_subscriptions(
    self, subscriptions: Iterable[tuple[str, int]], queue_only: bool = False
) -> None:
    """Queue requested subscriptions.

    Each item is a (topic, qos) pair. The highest qos requested so far for
    a topic is remembered and used for the pending subscription. Unless
    `queue_only` is set, the subscribe debouncer is scheduled afterwards.
    """
    for topic, qos in subscriptions:
        if (max_qos := self._max_qos[topic]) < qos:
            self._max_qos[topic] = (max_qos := qos)
        self._pending_subscriptions[topic] = max_qos
        # Cancel any pending unsubscribe since we are subscribing now
        self._pending_unsubscribes.discard(topic)
    if queue_only:
        return
    self._subscribe_debouncer.async_schedule()
797 
def _exception_message(
    self,
    msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any, None] | None],
    msg: ReceiveMessage,
) -> str:
    """Return a string with the exception message.

    The message names the callback and includes the topic and payload of
    the message that was being handled.
    """
    target = msg_callback
    if isinstance(msg_callback, partial):
        # if msg_callback is a partial we return the name of the first
        # argument; a partial without positional arguments falls back to the
        # wrapped function instead of failing with an IndexError.
        target = msg_callback.args[0] if msg_callback.args else msg_callback.func
    # Callable objects do not necessarily have a __name__; never let building
    # the error message raise itself.
    call_back_name = getattr(target, "__name__", None) or repr(target)
    return (
        f"Exception in {call_back_name} when handling msg on "
        f"'{msg.topic}': '{msg.payload}'"  # type: ignore[str-bytes-safe]
    )
813 
@callback
def async_subscribe(
    self,
    topic: str,
    msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any, None] | None],
    qos: int,
    encoding: str | None = None,
    job_type: HassJobType | None = None,
) -> Callable[[], None]:
    """Set up a subscription to a topic with the provided qos.

    The subscription is always tracked locally; it is only queued for
    sending to the broker when the client is currently connected.

    Raises HomeAssistantError when `topic` is not a string.

    Call the return value to remove the subscription.
    """
    if not isinstance(topic, str):
        raise HomeAssistantError("Topic needs to be a string!")

    if job_type is None:
        job_type = get_hassjob_callable_job_type(msg_callback)
    if job_type is not HassJobType.Callback:
        # Only wrap the callback with catch_log_exception
        # if it is not a simple callback since we catch
        # exceptions for simple callbacks inline for
        # performance reasons.
        msg_callback = catch_log_exception(
            msg_callback, partial(self._exception_message, msg_callback)
        )

    job = HassJob(msg_callback, job_type=job_type)
    # Topics without MQTT wildcards can be matched with a plain lookup.
    is_simple_match = not ("+" in topic or "#" in topic)
    matcher = None if is_simple_match else _matcher_for_topic(topic)

    subscription = Subscription(topic, is_simple_match, matcher, job, qos, encoding)
    self._async_track_subscription(subscription)
    self._matching_subscriptions.cache_clear()

    # Only subscribe if currently connected.
    if self.connected:
        self._async_queue_subscriptions(((topic, qos),))

    return partial(self._async_remove, subscription)
851 
@callback
def _async_remove(self, subscription: Subscription) -> None:
    """Remove subscription.

    Untracks the subscription, drops its retained-topic bookkeeping and,
    when connected, requests an unsubscribe for its topic.
    """
    self._async_untrack_subscription(subscription)
    self._matching_subscriptions.cache_clear()
    # pop() with a default does not create an entry in the defaultdict.
    self._retained_topics.pop(subscription, None)
    # Only unsubscribe if currently connected
    if self.connected:
        self._async_unsubscribe(subscription.topic)
862 
@callback
def _async_unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a topic.

    While other subscriptions for the topic remain, only the tracked
    maximum qos is refreshed. Otherwise the topic is removed from the
    pending subscriptions and queued for unsubscribing.
    """
    if self._is_active_subscription(topic):
        if self._max_qos[topic] == 0:
            return
        subs = self._matching_subscriptions(topic)
        # default=0 keeps this from raising ValueError if nothing matches.
        self._max_qos[topic] = max((sub.qos for sub in subs), default=0)
        # Other subscriptions on topic remaining - don't unsubscribe.
        return
    if topic in self._max_qos:
        del self._max_qos[topic]
    if topic in self._pending_subscriptions:
        # Avoid any pending subscription to be executed
        del self._pending_subscriptions[topic]

    self._pending_unsubscribes.add(topic)
    self._unsubscribe_debouncer.async_schedule()
881 
882  async def _async_perform_subscriptions(self) -> None:
883  """Perform MQTT client subscriptions."""
884  # Section 3.3.1.3 in the specification:
885  # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
886  # When sending a PUBLISH Packet to a Client the Server MUST
887  # set the RETAIN flag to 1 if a message is sent as a result of a
888  # new subscription being made by a Client [MQTT-3.3.1-8].
889  # It MUST set the RETAIN flag to 0 when a PUBLISH Packet is sent to
890  # a Client because it matches an established subscription regardless
891  # of how the flag was set in the message it received [MQTT-3.3.1-9].
892  #
893  # Since we do not know if a published value is retained we need to
894  # (re)subscribe, to ensure retained messages are replayed
895 
896  if not self._pending_subscriptions_pending_subscriptions:
897  return
898 
899  # Split out the wildcard subscriptions, we subscribe to them one by one
900  pending_subscriptions: dict[str, int] = self._pending_subscriptions_pending_subscriptions
901  pending_wildcard_subscriptions = {
902  subscription.topic: pending_subscriptions.pop(subscription.topic)
903  for subscription in self._wildcard_subscriptions
904  if subscription.topic in pending_subscriptions
905  }
906 
907  self._pending_subscriptions_pending_subscriptions = {}
908 
909  debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
910 
911  for chunk in chain(
913  pending_wildcard_subscriptions.items(), MAX_WILDCARD_SUBSCRIBES_PER_CALL
914  ),
915  chunked_or_all(pending_subscriptions.items(), MAX_SUBSCRIBES_PER_CALL),
916  ):
917  chunk_list = list(chunk)
918  if not chunk_list:
919  continue
920 
921  result, mid = self._mqttc_mqttc.subscribe(chunk_list)
922 
923  if debug_enabled:
924  _LOGGER.debug(
925  "Subscribing with mid: %s to topics with qos: %s", mid, chunk_list
926  )
927  self._last_subscribe_last_subscribe = time.monotonic()
928 
929  await self._async_wait_for_mid_or_raise_async_wait_for_mid_or_raise(mid, result)
930 
931  async def _async_perform_unsubscribes(self) -> None:
932  """Perform pending MQTT client unsubscribes."""
933  if not self._pending_unsubscribes_pending_unsubscribes:
934  return
935 
936  topics = list(self._pending_unsubscribes_pending_unsubscribes)
937  self._pending_unsubscribes_pending_unsubscribes = set()
938  debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
939 
940  for chunk in chunked_or_all(topics, MAX_UNSUBSCRIBES_PER_CALL):
941  chunk_list = list(chunk)
942 
943  result, mid = self._mqttc_mqttc.unsubscribe(chunk_list)
944  if debug_enabled:
945  _LOGGER.debug(
946  "Unsubscribing with mid: %s to topics: %s", mid, chunk_list
947  )
948 
949  await self._async_wait_for_mid_or_raise_async_wait_for_mid_or_raise(mid, result)
950 
async def _async_resubscribe_and_publish_birth_message(
    self, birth_message: PublishMessage
) -> None:
    """Resubscribe to all topics and publish birth message."""
    # Queue every tracked topic again and let the debouncer pick it up.
    self._async_queue_resubscribe()
    self._subscribe_debouncer.async_schedule()

    # Hold the birth message back until Home Assistant has started and
    # MQTT discovery has cooled down.
    await self._ha_started.wait()
    await self._discovery_cooldown()

    # Flush whatever the debouncer still holds, then switch it to the
    # shorter subscribe cooldown period.
    await self._subscribe_debouncer.async_execute()
    self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)

    await self.async_publish(
        topic=birth_message.topic,
        payload=birth_message.payload,
        qos=birth_message.qos,
        retain=birth_message.retain,
    )
    _LOGGER.info("MQTT client initialized, birth message sent")
970 
@callback
def _async_mqtt_on_connect(
    self,
    _mqttc: mqtt.Client,
    _userdata: None,
    _flags: dict[str, int],
    result_code: int,
    properties: mqtt.Properties | None = None,
) -> None:
    """On connect callback.

    Resubscribe to all topics we were subscribed to and publish birth
    message.
    """
    # pylint: disable-next=import-outside-toplevel
    import paho.mqtt.client as mqtt

    if result_code != mqtt.CONNACK_ACCEPTED:
        credentials_rejected = result_code in (
            mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD,
            mqtt.CONNACK_REFUSED_NOT_AUTHORIZED,
        )
        if credentials_rejected:
            # Stop reconnecting, disconnect and start a reauth flow on
            # the config entry.
            self._should_reconnect = False
            self.hass.async_create_task(self.async_disconnect())
            self.config_entry.async_start_reauth(self.hass)
        _LOGGER.error(
            "Unable to connect to the MQTT broker: %s",
            mqtt.connack_string(result_code),
        )
        self._async_connection_result(False)
        return

    self.connected = True
    async_dispatcher_send(self.hass, MQTT_CONNECTION_STATE, True)
    _LOGGER.debug(
        "Connected to MQTT server %s:%s (%s)",
        self.conf[CONF_BROKER],
        self.conf.get(CONF_PORT, DEFAULT_PORT),
        result_code,
    )

    birth: dict[str, Any] = self.conf.get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH)
    if not birth:
        # No birth message configured: only queue and schedule the
        # resubscribe.
        self._async_queue_resubscribe()
        self._subscribe_debouncer.async_schedule()
    else:
        # Resubscribing and publishing the birth message runs as a
        # background task of the config entry.
        birth_message = PublishMessage(**birth)
        self.config_entry.async_create_background_task(
            self.hass,
            self._async_resubscribe_and_publish_birth_message(birth_message),
            name="mqtt re-subscribe and birth",
        )

    self._async_connection_result(True)
1026 
@callback
def _async_queue_resubscribe(self) -> None:
    """Queue subscriptions on reconnect.

    self._async_perform_subscriptions must be called
    after this method to actually subscribe.
    """
    self._max_qos.clear()
    self._retained_topics.clear()

    # Collapse the tracked subscriptions to one entry per topic, keeping
    # the highest requested qos.
    highest_qos: dict[str, int] = {}
    for subscription in self.subscriptions:
        topic = subscription.topic
        qos = subscription.qos
        highest_qos[topic] = max(highest_qos.get(topic, qos), qos)

    # Topics are unique keys, so sorting the pairs orders them by topic.
    self._async_queue_subscriptions(sorted(highest_qos.items()), queue_only=True)
1048 
1049  @lru_cache(None) # pylint: disable=method-cache-max-size-none
1050  def _matching_subscriptions(self, topic: str) -> list[Subscription]:
1051  subscriptions: list[Subscription] = []
1052  if topic in self._simple_subscriptions:
1053  subscriptions.extend(self._simple_subscriptions[topic])
1054  subscriptions.extend(
1055  subscription
1056  for subscription in self._wildcard_subscriptions
1057  # mypy doesn't know that complex_matcher is always set when
1058  # is_simple_match is False
1059  if subscription.complex_matcher(topic) # type: ignore[misc]
1060  )
1061  return subscriptions
1062 
@callback
def _async_mqtt_on_message(
    self, _mqttc: mqtt.Client, _userdata: None, msg: mqtt.MQTTMessage
) -> None:
    """Dispatch a received MQTT message to every matching subscription.

    Messages whose topic cannot be decoded are logged and dropped. For a
    retained message, a subscription that already received a retained
    message on the same topic is skipped. The payload is decoded with
    each subscription's encoding; a subscription whose encoding cannot
    decode the payload is skipped with a warning.
    """
    try:
        # msg.topic is a property that decodes the topic to a string
        # every time it is accessed. Save the result to avoid
        # decoding the same topic multiple times.
        topic = msg.topic
    except UnicodeDecodeError:
        bare_topic: bytes = getattr(msg, "_topic")
        _LOGGER.warning(
            "Skipping received%s message on invalid topic %s (qos=%s): %s",
            " retained" if msg.retain else "",
            bare_topic,
            msg.qos,
            msg.payload[0:8192],
        )
        return
    # Only build the log arguments (including the payload slice) when
    # debug logging is enabled; this runs for every received message.
    if _LOGGER.isEnabledFor(logging.DEBUG):
        _LOGGER.debug(
            "Received%s message on %s (qos=%s): %s",
            " retained" if msg.retain else "",
            topic,
            msg.qos,
            msg.payload[0:8192],
        )
    subscriptions = self._matching_subscriptions(topic)
    # Keyed by (subscription topic, encoding): subscribers that share a
    # topic but use different encodings must not share a message, because
    # the payload inside it is encoding specific.
    msg_cache: dict[tuple[str, str | None], ReceiveMessage] = {}

    for subscription in subscriptions:
        if msg.retain:
            retained_topics = self._retained_topics[subscription]
            # Skip if the subscription already received a retained message
            if topic in retained_topics:
                continue
            # Remember the subscription had an initial retained message
            retained_topics.add(topic)

        encoding = subscription.encoding
        payload: SubscribePayloadType = msg.payload
        if encoding is not None:
            try:
                payload = msg.payload.decode(encoding)
            except (AttributeError, UnicodeDecodeError):
                _LOGGER.warning(
                    "Can't decode payload %s on %s with encoding %s (for %s)",
                    msg.payload[0:8192],
                    topic,
                    encoding,
                    subscription.job,
                )
                continue

        subscription_topic = subscription.topic
        cache_key = (subscription_topic, encoding)
        receive_msg = msg_cache.get(cache_key)
        if receive_msg is None:
            # Only make one copy of the message per subscription topic
            # and encoding so we avoid storing a separate dataclass in
            # memory for each subscriber to the same topic for retained
            # messages
            receive_msg = ReceiveMessage(
                topic,
                payload,
                msg.qos,
                msg.retain,
                subscription_topic,
                msg.timestamp,
            )
            msg_cache[cache_key] = receive_msg

        job = subscription.job
        if job.job_type is HassJobType.Callback:
            # We do not wrap Callback jobs in catch_log_exception since
            # its expensive and we have to do it 2x for every entity
            try:
                job.target(receive_msg)
            except Exception:  # noqa: BLE001
                log_exception(
                    partial(self._exception_message, job.target, receive_msg)
                )
        else:
            self.hass.async_run_hass_job(job, receive_msg)
    self._mqtt_data.state_write_requests.process_write_state_requests(msg)
1144 
@callback
def _async_mqtt_on_callback(
    self,
    _mqttc: mqtt.Client,
    _userdata: None,
    mid: int,
    _granted_qos_reason: tuple[int, ...] | mqtt.ReasonCodes | None = None,
    _properties_reason: mqtt.ReasonCodes | None = None,
) -> None:
    """Publish / Subscribe / Unsubscribe callback."""
    # The callback signature for on_unsubscribe is different from on_subscribe
    # see https://github.com/eclipse/paho.mqtt.python/issues/687
    # properties and reason codes are not used in Home Assistant
    pending = self._async_get_mid_future(mid)
    # A future that already finished by cancellation or with an exception
    # (timed out or cancelled) must not be resolved again.
    timed_out_or_cancelled = pending.done() and (
        pending.cancelled() or pending.exception()
    )
    if not timed_out_or_cancelled:
        pending.set_result(None)
1163 
@callback
def _async_get_mid_future(self, mid: int) -> asyncio.Future[None]:
    """Return the pending future for a mid, creating and storing it if needed."""
    future = self._pending_operations.get(mid)
    if not future:
        future = self.hass.loop.create_future()
        self._pending_operations[mid] = future
    return future
1172 
@callback
def _async_mqtt_on_disconnect(
    self,
    _mqttc: mqtt.Client,
    _userdata: None,
    result_code: int,
    properties: mqtt.Properties | None = None,
) -> None:
    """Disconnected callback."""
    # Only the result code is used; the rest of the callback arguments
    # are ignored.
    self._async_on_disconnect(result_code)
1183 
@callback
def _async_on_disconnect(self, result_code: int) -> None:
    """Record the disconnect, notify listeners and log it."""
    if not self.connected:
        # This function is re-entrant and may be called multiple times
        # when there is a broken pipe error.
        return

    # If disconnect is called before the connect
    # result is set make sure the first connection result is set
    self._async_connection_result(False)
    self.connected = False
    async_dispatcher_send(self.hass, MQTT_CONNECTION_STATE, False)

    # A result code of 0 is logged at INFO, anything else at DEBUG.
    log_level = logging.INFO if result_code == 0 else logging.DEBUG
    _LOGGER.log(
        log_level,
        "Disconnected from MQTT server %s:%s (%s)",
        self.conf[CONF_BROKER],
        self.conf.get(CONF_PORT, DEFAULT_PORT),
        result_code,
    )
1202 
@callback
def _async_timeout_mid(self, future: asyncio.Future[None]) -> None:
    """Fail a still-pending mid future with a timeout error."""
    if future.done():
        # Already resolved, cancelled or failed; leave it alone.
        return
    future.set_exception(asyncio.TimeoutError)
1208 
async def _async_wait_for_mid_or_raise(self, mid: int, result_code: int) -> None:
    """Wait for ACK from broker or raise on error.

    Raises HomeAssistantError when result_code is non-zero. A missing ACK
    within TIMEOUT_ACK seconds is logged as a warning and not raised.
    """
    if result_code != 0:
        # pylint: disable-next=import-outside-toplevel
        import paho.mqtt.client as mqtt

        raise HomeAssistantError(
            f"Error talking to MQTT: {mqtt.error_string(result_code)}"
        )

    # Create the mid event if not created, either _mqtt_handle_mid or
    # _async_wait_for_mid_or_raise may be executed first.
    ack_future = self._async_get_mid_future(mid)
    # Timer that fails the future if no ACK arrives in time.
    timeout_handle = self.hass.loop.call_later(
        TIMEOUT_ACK, self._async_timeout_mid, ack_future
    )
    try:
        await ack_future
    except TimeoutError:
        _LOGGER.warning(
            "No ACK from MQTT server in %s seconds (mid: %s)", TIMEOUT_ACK, mid
        )
    finally:
        # Always stop the timer and drop the bookkeeping entry for this mid.
        timeout_handle.cancel()
        del self._pending_operations[mid]
1233 
async def _discovery_cooldown(self) -> None:
    """Wait until all discovery and subscriptions are processed."""
    now = time.monotonic()
    # Reset discovery and subscribe cooldowns
    self._mqtt_data.last_discovery = now
    self._last_subscribe = now

    while True:
        last_discovery = self._mqtt_data.last_discovery
        # While subscriptions are still queued the subscribe cooldown is
        # measured from the current time.
        if self._pending_subscriptions:
            last_subscribe = now
        else:
            last_subscribe = self._last_subscribe
        wait_until = max(last_discovery, last_subscribe) + DISCOVERY_COOLDOWN
        if now >= wait_until:
            return
        await asyncio.sleep(wait_until - now)
        now = time.monotonic()
1252 
1253 
def _matcher_for_topic(subscription: str) -> Callable[[str], bool]:
    """Build a predicate telling whether a topic matches the subscription."""
    # pylint: disable-next=import-outside-toplevel
    from paho.mqtt.matcher import MQTTMatcher

    topic_matcher = MQTTMatcher()
    topic_matcher[subscription] = True

    def _matches(topic: str) -> bool:
        # First match stored in the matcher, or False when nothing matches.
        return next(topic_matcher.iter_match(topic), False)

    return _matches
None _async_mqtt_on_callback(self, mqtt.Client _mqttc, None _userdata, int mid, tuple[int,...]|mqtt.ReasonCodes|None _granted_qos_reason=None, mqtt.ReasonCodes|None _properties_reason=None)
Definition: client.py:1153
None _async_mqtt_on_disconnect(self, mqtt.Client _mqttc, None _userdata, int result_code, mqtt.Properties|None properties=None)
Definition: client.py:1180
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, ConfigType conf)
Definition: client.py:369
None _on_socket_open(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:555
str _exception_message(self, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, ReceiveMessage msg)
Definition: client.py:802
AsyncGenerator[None] _async_connect_in_executor(self)
Definition: client.py:446
None _async_on_socket_register_write(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:610
None _async_mqtt_on_connect(self, mqtt.Client _mqttc, None _userdata, dict[str, int] _flags, int result_code, mqtt.Properties|None properties=None)
Definition: client.py:979
None async_connect(self, asyncio.Future[bool] client_available)
Definition: client.py:648
None _async_on_socket_open(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:564
asyncio.Future[None] _async_get_mid_future(self, int mid)
Definition: client.py:1165
None _async_on_socket_unregister_write(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:620
None _async_reader_callback(self, mqtt.Client client)
Definition: client.py:499
None _async_untrack_subscription(self, Subscription subscription)
Definition: client.py:762
None _async_ha_stop(self, Event _event)
Definition: client.py:420
None _async_resubscribe_and_publish_birth_message(self, PublishMessage birth_message)
Definition: client.py:953
None _async_on_socket_close(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:580
list[Subscription] _matching_subscriptions(self, str topic)
Definition: client.py:1050
None _on_socket_register_write(self, mqtt.Client client, Any userdata, SocketType sock)
Definition: client.py:601
None _async_on_disconnect(self, int result_code)
Definition: client.py:1185
None _async_track_subscription(self, Subscription subscription)
Definition: client.py:749
None _async_mqtt_on_message(self, mqtt.Client _mqttc, None _userdata, mqtt.MQTTMessage msg)
Definition: client.py:1066
Callable[[], None] async_subscribe(self, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos, str|None encoding=None, HassJobType|None job_type=None)
Definition: client.py:822
None _async_ha_started(self, HomeAssistant _hass)
Definition: client.py:416
None _async_connection_result(self, bool connected)
Definition: client.py:677
None async_disconnect(self, bool disconnect_paho_client=False)
Definition: client.py:710
None _async_timeout_mid(self, asyncio.Future[None] future)
Definition: client.py:1204
None _async_queue_subscriptions(self, Iterable[tuple[str, int]] subscriptions, bool queue_only=False)
Definition: client.py:784
set[Subscription] subscriptions(self)
Definition: client.py:433
None _async_remove(self, Subscription subscription)
Definition: client.py:853
bool _is_active_subscription(self, str topic)
Definition: client.py:627
None async_start(self, MqttData mqtt_data)
Definition: client.py:427
None async_restore_tracked_subscriptions(self, set[Subscription] subscriptions)
Definition: client.py:742
None _async_writer_callback(self, mqtt.Client client)
Definition: client.py:594
None _increase_socket_buffer_size(self, SocketType sock)
Definition: client.py:522
None _async_wait_for_mid_or_raise(self, int mid, int result_code)
Definition: client.py:1209
None _async_unsubscribe(self, str topic)
Definition: client.py:864
None async_publish(self, str topic, PublishPayloadType payload, int qos, bool retain)
Definition: client.py:635
None __init__(self, ConfigType config)
Definition: client.py:281
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
CALLBACK_TYPE async_subscribe(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:194
None async_publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:144
Callable[[str], bool] _matcher_for_topic(str subscription)
Definition: client.py:1254
Callable[[], None] subscribe(HomeAssistant hass, str topic, MessageCallbackType msg_callback, int qos=DEFAULT_QOS, str encoding="utf-8")
Definition: client.py:247
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
Definition: client.py:210
None publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:132
str|None get_file_path(str option, str|None default=None)
Definition: util.py:392
bool|None mqtt_config_entry_enabled(HomeAssistant hass)
Definition: util.py:192
HassJobType get_hassjob_callable_job_type(Callable[..., Any] target)
Definition: core.py:387
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
None async_cleanup(HomeAssistant hass, DeviceRegistry dev_reg, entity_registry.EntityRegistry ent_reg)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
ModuleType async_import_module(HomeAssistant hass, str name)
Definition: importlib.py:30
CALLBACK_TYPE async_at_started(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)
Definition: start.py:80
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)
Definition: setup.py:691
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)
Definition: collection.py:25