1 """Support for MQTT message handling."""
3 from __future__
import annotations
6 from collections
import defaultdict
7 from collections.abc
import AsyncGenerator, Callable, Coroutine, Iterable
9 from dataclasses
import dataclass
10 from functools
import lru_cache, partial
11 from itertools
import chain, groupby
13 from operator
import attrgetter
17 from typing
import TYPE_CHECKING, Any
29 EVENT_HOMEASSISTANT_STOP,
38 get_hassjob_callable_job_type,
73 MQTT_CONNECTION_STATE,
86 from .util
import EnsureJobAfterCooldown, get_file_path, mqtt_config_entry_enabled
91 import paho.mqtt.client
as mqtt
93 from .async_client
import AsyncMQTTClient
95 _LOGGER = logging.getLogger(__name__)
97 MIN_BUFFER_SIZE = 131072
98 PREFERRED_BUFFER_SIZE = 8 * 1024 * 1024
100 DISCOVERY_COOLDOWN = 5
108 INITIAL_SUBSCRIBE_COOLDOWN = 0.5
109 SUBSCRIBE_COOLDOWN = 0.1
110 UNSUBSCRIBE_COOLDOWN = 0.1
112 RECONNECT_INTERVAL_SECONDS = 10
114 MAX_WILDCARD_SUBSCRIBES_PER_CALL = 1
115 MAX_SUBSCRIBES_PER_CALL = 500
116 MAX_UNSUBSCRIBES_PER_CALL = 500
118 MAX_PACKETS_TO_READ = 500
120 type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any
122 type SubscribePayloadType = str | bytes
128 payload: PublishPayloadType,
130 retain: bool |
None =
False,
131 encoding: str |
None = DEFAULT_ENCODING,
133 """Publish message to a MQTT topic."""
134 hass.create_task(
async_publish(hass, topic, payload, qos, retain, encoding))
140 payload: PublishPayloadType,
142 retain: bool |
None =
False,
143 encoding: str |
None = DEFAULT_ENCODING,
145 """Publish message to a MQTT topic."""
148 f
"Cannot publish to topic '{topic}', MQTT is not enabled",
149 translation_key=
"mqtt_not_setup_cannot_publish",
150 translation_domain=DOMAIN,
151 translation_placeholders={
"topic": topic},
153 mqtt_data = hass.data[DATA_MQTT]
154 outgoing_payload = payload
155 if not isinstance(payload, bytes)
and payload
is not None:
159 "Can't pass-through payload for publishing %s on %s with no"
160 " encoding set, need 'bytes' got %s"
167 outgoing_payload =
str(payload)
168 if encoding != DEFAULT_ENCODING:
172 outgoing_payload = outgoing_payload.encode(encoding)
173 except (AttributeError, LookupError, UnicodeEncodeError):
175 "Can't encode payload for publishing %s on %s with encoding %s",
182 await mqtt_data.client.async_publish(
183 topic, outgoing_payload, qos
or 0, retain
or False
191 msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any,
None] |
None],
192 qos: int = DEFAULT_QOS,
193 encoding: str |
None = DEFAULT_ENCODING,
195 """Subscribe to an MQTT topic.
197 Call the return value to unsubscribe.
206 msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any,
None] |
None],
207 qos: int = DEFAULT_QOS,
208 encoding: str |
None = DEFAULT_ENCODING,
209 job_type: HassJobType |
None =
None,
211 """Subscribe to an MQTT topic.
213 This function is internal to the MQTT integration
214 and may change at any time. It should not be considered
217 Call the return value to unsubscribe.
220 mqtt_data = hass.data[DATA_MQTT]
221 except KeyError
as exc:
223 f
"Cannot subscribe to topic '{topic}', "
224 "make sure MQTT is set up correctly",
225 translation_key=
"mqtt_not_setup_cannot_subscribe",
226 translation_domain=DOMAIN,
227 translation_placeholders={
"topic": topic},
229 client = mqtt_data.client
232 f
"Cannot subscribe to topic '{topic}', MQTT is not enabled",
233 translation_key=
"mqtt_not_setup_cannot_subscribe",
234 translation_domain=DOMAIN,
235 translation_placeholders={
"topic": topic},
237 return client.async_subscribe(topic, msg_callback, qos, encoding, job_type)
244 msg_callback: MessageCallbackType,
245 qos: int = DEFAULT_QOS,
246 encoding: str =
"utf-8",
247 ) -> Callable[[],
None]:
248 """Subscribe to an MQTT topic."""
249 async_remove = asyncio.run_coroutine_threadsafe(
254 """Remove listener convert."""
259 hass.loop.call_soon_threadsafe(async_remove)
264 @dataclass(slots=True, frozen=True)
266 """Class to hold data about an active subscription."""
269 is_simple_match: bool
270 complex_matcher: Callable[[str], bool] |
None
271 job: HassJob[[ReceiveMessage], Coroutine[Any, Any,
None] |
None]
273 encoding: str |
None =
"utf-8"
277 """Helper class to setup the paho mqtt client from config."""
279 _client: AsyncMQTTClient
282 """Initialize the MQTT client setup helper.
284 self.setup must be run in an executor job.
290 """Set up the MQTT client.
292 The setup of the MQTT client should be run in an executor job,
293 because it accesses files, so it does IO.
297 import paho.mqtt.client
as mqtt
300 from .async_client
import AsyncMQTTClient
303 if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31:
305 elif protocol == PROTOCOL_5:
308 proto = mqtt.MQTTv311
310 if (client_id := config.get(CONF_CLIENT_ID))
is None:
313 client_id = mqtt.base62(uuid.uuid4().int, padding=22)
314 transport: str = config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
319 reconnect_on_failure=
False,
324 self.
_client_client.enable_logger()
326 username: str |
None = config.get(CONF_USERNAME)
327 password: str |
None = config.get(CONF_PASSWORD)
328 if username
is not None:
329 self.
_client_client.username_pw_set(username, password)
332 certificate :=
get_file_path(CONF_CERTIFICATE, config.get(CONF_CERTIFICATE))
334 certificate = certifi.where()
336 client_key =
get_file_path(CONF_CLIENT_KEY, config.get(CONF_CLIENT_KEY))
337 client_cert =
get_file_path(CONF_CLIENT_CERT, config.get(CONF_CLIENT_CERT))
338 tls_insecure = config.get(CONF_TLS_INSECURE)
339 if transport == TRANSPORT_WEBSOCKETS:
340 ws_path: str = config.get(CONF_WS_PATH, DEFAULT_WS_PATH)
341 ws_headers: dict[str, str] = config.get(CONF_WS_HEADERS, DEFAULT_WS_HEADERS)
342 self.
_client_client.ws_set_options(ws_path, ws_headers)
343 if certificate
is not None:
346 certfile=client_cert,
348 tls_version=ssl.PROTOCOL_TLS_CLIENT,
351 if tls_insecure
is not None:
352 self.
_client_client.tls_insecure_set(tls_insecure)
356 """Return the paho MQTT client."""
361 """Home Assistant MQTT client."""
363 _mqttc: AsyncMQTTClient
364 _last_subscribe: float
368 self, hass: HomeAssistant, config_entry: ConfigEntry, conf: ConfigType
370 """Initialize Home Assistant MQTT client."""
376 self._simple_subscriptions: defaultdict[str, set[Subscription]] = defaultdict(
381 self._wildcard_subscriptions: dict[Subscription,
None] = {}
386 self._retained_topics: defaultdict[Subscription, set[str]] = defaultdict(set)
389 self._cleanup_on_unload: list[Callable[[],
None]] = []
392 self._pending_operations: dict[int, asyncio.Future[
None]] = {}
396 self.
_misc_timer_misc_timer: asyncio.TimerHandle |
None =
None
401 self._max_qos: defaultdict[str, int] = defaultdict(int)
407 self._cleanup_on_unload.extend(
410 hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.
_async_ha_stop_async_ha_stop),
413 self._socket_buffersize: int |
None =
None
417 """Handle HA started."""
421 """Handle HA stop."""
428 """Start Home Assistant MQTT client."""
434 """Return the tracked subscriptions."""
436 *chain.from_iterable(self._simple_subscriptions.values()),
437 *self._wildcard_subscriptions,
441 """Clean up listeners."""
442 while self._cleanup_on_unload:
443 self._cleanup_on_unload.pop()()
445 @contextlib.asynccontextmanager
462 """Initialize paho client."""
465 self.
hasshass,
"homeassistant.components.mqtt.async_client"
469 await self.
hasshass.async_add_executor_job(mqttc_setup.setup)
470 mqttc = mqttc_setup.client
485 mqttc.suppress_exceptions =
True
487 if will := self.
confconf.
get(CONF_WILL_MESSAGE, DEFAULT_WILL):
490 topic=will_message.topic,
491 payload=will_message.payload,
492 qos=will_message.qos,
493 retain=will_message.retain,
500 """Handle reading data from the socket."""
501 if (status := client.loop_read(MAX_PACKETS_TO_READ)) != 0:
506 """Start the misc periodic."""
507 assert self.
_misc_timer_misc_timer
is None,
"Misc periodic already started"
508 _LOGGER.debug(
"%s: Starting client misc loop", self.
config_entryconfig_entry.title)
510 import paho.mqtt.client
as mqtt
515 def _async_misc() -> None:
516 """Start the MQTT client misc loop."""
517 if self.
_mqttc_mqttc.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
523 """Increase the socket buffer size."""
524 if not hasattr(sock,
"setsockopt")
and hasattr(sock,
"_socket"):
532 new_buffer_size = PREFERRED_BUFFER_SIZE
537 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, new_buffer_size)
538 except OSError
as err:
539 if new_buffer_size <= MIN_BUFFER_SIZE:
541 "Unable to increase the socket buffer size to %s; "
542 "The connection may be unstable if the MQTT broker "
543 "sends data at volume or a large amount of subscriptions "
544 "need to be processed: %s",
549 new_buffer_size //= 2
554 self, client: mqtt.Client, userdata: Any, sock: SocketType
556 """Handle socket open."""
557 self.
looploop.call_soon_threadsafe(
563 self, client: mqtt.Client, userdata: Any, sock: SocketType
565 """Handle socket open."""
566 fileno = sock.fileno()
567 _LOGGER.debug(
"%s: connection opened %s", self.
config_entryconfig_entry.title, fileno)
579 self, client: mqtt.Client, userdata: Any, sock: SocketType
581 """Handle socket close."""
582 fileno = sock.fileno()
583 _LOGGER.debug(
"%s: connection closed %s", self.
config_entryconfig_entry.title, fileno)
588 self.
looploop.remove_reader(sock)
595 """Handle writing data to the socket."""
596 if (status := client.loop_write()) != 0:
600 self, client: mqtt.Client, userdata: Any, sock: SocketType
602 """Register the socket for writing."""
603 self.
looploop.call_soon_threadsafe(
609 self, client: mqtt.Client, userdata: Any, sock: SocketType
611 """Register the socket for writing."""
612 fileno = sock.fileno()
613 _LOGGER.debug(
"%s: register write %s", self.
config_entryconfig_entry.title, fileno)
619 self, client: mqtt.Client, userdata: Any, sock: SocketType
621 """Unregister the socket for writing."""
622 fileno = sock.fileno()
623 _LOGGER.debug(
"%s: unregister write %s", self.
config_entryconfig_entry.title, fileno)
625 self.
looploop.remove_writer(sock)
628 """Check if a topic has an active subscription."""
629 return topic
in self._simple_subscriptions
or any(
630 other.topic == topic
for other
in self._wildcard_subscriptions
634 self, topic: str, payload: PublishPayloadType, qos: int, retain: bool
636 """Publish a MQTT message."""
637 msg_info = self.
_mqttc_mqttc.
publish(topic, payload, qos, retain)
639 "Transmitting%s message on %s: '%s', mid: %s, qos: %s",
640 " retained" if retain
else "",
648 async
def async_connect(self, client_available: asyncio.Future[bool]) ->
None:
649 """Connect to the host. Does not process messages yet."""
651 import paho.mqtt.client
as mqtt
653 result: int |
None =
None
658 result = await self.
hasshass.async_add_executor_job(
659 self.
_mqttc_mqttc.connect,
660 self.
confconf[CONF_BROKER],
661 self.
confconf.
get(CONF_PORT, DEFAULT_PORT),
662 self.
confconf.
get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE),
664 except OSError
as err:
665 _LOGGER.error(
"Failed to connect to MQTT server due to exception: %s", err)
668 if result
is not None and result != 0:
669 if result
is not None:
671 "Failed to connect to MQTT server: %s",
672 mqtt.error_string(result),
678 """Handle a connection result."""
691 """Cancel the reconnect task."""
697 """Reconnect to the MQTT server."""
702 await self.
hasshass.async_add_executor_job(self.
_mqttc_mqttc.reconnect)
703 except OSError
as err:
705 "Error re-connecting to MQTT server due to exception: %s", err
708 await asyncio.sleep(RECONNECT_INTERVAL_SECONDS)
711 """Stop the MQTT client.
713 We only disconnect grafully if disconnect_paho_client is set, but not
714 when Home Assistant is shut down.
727 if pending := self._pending_operations.values():
728 await asyncio.wait(pending)
736 if disconnect_paho_client:
737 self.
_mqttc_mqttc.disconnect()
741 self, subscriptions: set[Subscription]
743 """Restore tracked subscriptions after reload."""
744 for subscription
in subscriptions:
750 """Track a subscription.
752 This method does not send a SUBSCRIBE message to the broker.
754 The caller is responsible clearing the cache of _matching_subscriptions.
756 if subscription.is_simple_match:
757 self._simple_subscriptions[subscription.topic].
add(subscription)
759 self._wildcard_subscriptions[subscription] =
None
763 """Untrack a subscription.
765 This method does not send an UNSUBSCRIBE message to the broker.
767 The caller is responsible clearing the cache of _matching_subscriptions.
769 topic = subscription.topic
771 if subscription.is_simple_match:
772 simple_subscriptions = self._simple_subscriptions
773 simple_subscriptions[topic].
remove(subscription)
774 if not simple_subscriptions[topic]:
775 del simple_subscriptions[topic]
777 del self._wildcard_subscriptions[subscription]
778 except (KeyError, ValueError)
as exc:
783 self, subscriptions: Iterable[tuple[str, int]], queue_only: bool =
False
785 """Queue requested subscriptions."""
786 for subscription
in subscriptions:
787 topic, qos = subscription
788 if (max_qos := self._max_qos[topic]) < qos:
789 self._max_qos[topic] = (max_qos := qos)
800 msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any,
None] |
None],
803 """Return a string with the exception message."""
805 if isinstance(msg_callback, partial):
806 call_back_name = getattr(msg_callback.args[0],
"__name__")
808 call_back_name = getattr(msg_callback,
"__name__")
810 f
"Exception in {call_back_name} when handling msg on "
811 f
"'{msg.topic}': '{msg.payload}'"
818 msg_callback: Callable[[ReceiveMessage], Coroutine[Any, Any,
None] |
None],
820 encoding: str |
None =
None,
821 job_type: HassJobType |
None =
None,
822 ) -> Callable[[],
None]:
823 """Set up a subscription to a topic with the provided qos."""
824 if not isinstance(topic, str):
829 if job_type
is not HassJobType.Callback:
834 msg_callback = catch_log_exception(
838 job =
HassJob(msg_callback, job_type=job_type)
839 is_simple_match =
not (
"+" in topic
or "#" in topic)
842 subscription =
Subscription(topic, is_simple_match, matcher, job, qos, encoding)
850 return partial(self.
_async_remove_async_remove, subscription)
854 """Remove subscription."""
857 if subscription
in self._retained_topics:
858 del self._retained_topics[subscription]
865 """Unsubscribe from a topic."""
867 if self._max_qos[topic] == 0:
870 self._max_qos[topic] =
max(sub.qos
for sub
in subs)
873 if topic
in self._max_qos:
874 del self._max_qos[topic]
883 """Perform MQTT client subscriptions."""
901 pending_wildcard_subscriptions = {
902 subscription.topic: pending_subscriptions.pop(subscription.topic)
903 for subscription
in self._wildcard_subscriptions
904 if subscription.topic
in pending_subscriptions
909 debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
913 pending_wildcard_subscriptions.items(), MAX_WILDCARD_SUBSCRIBES_PER_CALL
915 chunked_or_all(pending_subscriptions.items(), MAX_SUBSCRIBES_PER_CALL),
917 chunk_list =
list(chunk)
925 "Subscribing with mid: %s to topics with qos: %s", mid, chunk_list
932 """Perform pending MQTT client unsubscribes."""
938 debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
941 chunk_list =
list(chunk)
943 result, mid = self.
_mqttc_mqttc.unsubscribe(chunk_list)
946 "Unsubscribing with mid: %s to topics: %s", mid, chunk_list
952 self, birth_message: PublishMessage
954 """Resubscribe to all topics and publish birth message."""
964 topic=birth_message.topic,
965 payload=birth_message.payload,
966 qos=birth_message.qos,
967 retain=birth_message.retain,
969 _LOGGER.info(
"MQTT client initialized, birth message sent")
976 _flags: dict[str, int],
978 properties: mqtt.Properties |
None =
None,
980 """On connect callback.
982 Resubscribe to all topics we were subscribed to and publish birth
986 import paho.mqtt.client
as mqtt
988 if result_code != mqtt.CONNACK_ACCEPTED:
990 mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD,
991 mqtt.CONNACK_REFUSED_NOT_AUTHORIZED,
997 "Unable to connect to the MQTT broker: %s",
998 mqtt.connack_string(result_code),
1006 "Connected to MQTT server %s:%s (%s)",
1007 self.
confconf[CONF_BROKER],
1008 self.
confconf.
get(CONF_PORT, DEFAULT_PORT),
1012 birth: dict[str, Any]
1013 if birth := self.
confconf.
get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH):
1015 self.
config_entryconfig_entry.async_create_background_task(
1018 name=
"mqtt re-subscribe and birth",
1029 """Queue subscriptions on reconnect.
1031 self._async_perform_subscriptions must be called
1032 after this method to actually subscribe.
1034 self._max_qos.clear()
1035 self._retained_topics.clear()
1037 keyfunc = attrgetter(
"topic")
1041 (topic,
max(subscription.qos
for subscription
in subs))
1042 for topic, subs
in groupby(
1043 sorted(self.
subscriptionssubscriptions, key=keyfunc), keyfunc
1051 subscriptions: list[Subscription] = []
1052 if topic
in self._simple_subscriptions:
1053 subscriptions.extend(self._simple_subscriptions[topic])
1054 subscriptions.extend(
1056 for subscription
in self._wildcard_subscriptions
1059 if subscription.complex_matcher(topic)
1061 return subscriptions
1065 self, _mqttc: mqtt.Client, _userdata:
None, msg: mqtt.MQTTMessage
1072 except UnicodeDecodeError:
1073 bare_topic: bytes = getattr(msg,
"_topic")
1075 "Skipping received%s message on invalid topic %s (qos=%s): %s",
1076 " retained" if msg.retain
else "",
1079 msg.payload[0:8192],
1083 "Received%s message on %s (qos=%s): %s",
1084 " retained" if msg.retain
else "",
1087 msg.payload[0:8192],
1090 msg_cache_by_subscription_topic: dict[str, ReceiveMessage] = {}
1092 for subscription
in subscriptions:
1094 retained_topics = self._retained_topics[subscription]
1096 if topic
in retained_topics:
1099 self._retained_topics[subscription].
add(topic)
1101 payload: SubscribePayloadType = msg.payload
1102 if subscription.encoding
is not None:
1104 payload = msg.payload.decode(subscription.encoding)
1105 except (AttributeError, UnicodeDecodeError):
1107 "Can't decode payload %s on %s with encoding %s (for %s)",
1108 msg.payload[0:8192],
1110 subscription.encoding,
1114 subscription_topic = subscription.topic
1115 if subscription_topic
not in msg_cache_by_subscription_topic:
1128 msg_cache_by_subscription_topic[subscription_topic] = receive_msg
1130 receive_msg = msg_cache_by_subscription_topic[subscription_topic]
1131 job = subscription.job
1132 if job.job_type
is HassJobType.Callback:
1136 job.target(receive_msg)
1142 self.
hasshass.async_run_hass_job(job, receive_msg)
1143 self.
_mqtt_data_mqtt_data.state_write_requests.process_write_state_requests(msg)
1148 _mqttc: mqtt.Client,
1151 _granted_qos_reason: tuple[int, ...] | mqtt.ReasonCodes |
None =
None,
1152 _properties_reason: mqtt.ReasonCodes |
None =
None,
1154 """Publish / Subscribe / Unsubscribe callback."""
1159 if future.done()
and (future.cancelled()
or future.exception()):
1162 future.set_result(
None)
1166 """Get the future for a mid."""
1167 if future := self._pending_operations.
get(mid):
1169 future = self.
hasshass.loop.create_future()
1170 self._pending_operations[mid] = future
1176 _mqttc: mqtt.Client,
1179 properties: mqtt.Properties |
None =
None,
1181 """Disconnected callback."""
1196 logging.INFO
if result_code == 0
else logging.DEBUG,
1197 "Disconnected from MQTT server %s:%s (%s)",
1198 self.
confconf[CONF_BROKER],
1199 self.
confconf.
get(CONF_PORT, DEFAULT_PORT),
1205 """Timeout waiting for a mid."""
1206 if not future.done():
1207 future.set_exception(asyncio.TimeoutError)
1210 """Wait for ACK from broker or raise on error."""
1211 if result_code != 0:
1213 import paho.mqtt.client
as mqtt
1216 f
"Error talking to MQTT: {mqtt.error_string(result_code)}"
1222 loop = self.
hasshass.loop
1223 timer_handle = loop.call_later(TIMEOUT_ACK, self.
_async_timeout_mid_async_timeout_mid, future)
1226 except TimeoutError:
1228 "No ACK from MQTT server in %s seconds (mid: %s)", TIMEOUT_ACK, mid
1231 timer_handle.cancel()
1232 del self._pending_operations[mid]
1235 """Wait until all discovery and subscriptions are processed."""
1236 now = time.monotonic()
1238 self.
_mqtt_data_mqtt_data.last_discovery = now
1241 last_discovery = self.
_mqtt_data_mqtt_data.last_discovery
1243 wait_until =
max(last_discovery, last_subscribe) + DISCOVERY_COOLDOWN
1244 while now < wait_until:
1245 await asyncio.sleep(wait_until - now)
1246 now = time.monotonic()
1247 last_discovery = self.
_mqtt_data_mqtt_data.last_discovery
1251 wait_until =
max(last_discovery, last_subscribe) + DISCOVERY_COOLDOWN
1256 from paho.mqtt.matcher
import MQTTMatcher
1258 matcher = MQTTMatcher()
1259 matcher[subscription] =
True
1261 return lambda topic: next(matcher.iter_match(topic),
False)
None _async_mqtt_on_callback(self, mqtt.Client _mqttc, None _userdata, int mid, tuple[int,...]|mqtt.ReasonCodes|None _granted_qos_reason=None, mqtt.ReasonCodes|None _properties_reason=None)
None _async_mqtt_on_disconnect(self, mqtt.Client _mqttc, None _userdata, int result_code, mqtt.Properties|None properties=None)
None async_init_client(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, ConfigType conf)
None _on_socket_open(self, mqtt.Client client, Any userdata, SocketType sock)
str _exception_message(self, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, ReceiveMessage msg)
AsyncGenerator[None] _async_connect_in_executor(self)
None _async_on_socket_register_write(self, mqtt.Client client, Any userdata, SocketType sock)
None _async_mqtt_on_connect(self, mqtt.Client _mqttc, None _userdata, dict[str, int] _flags, int result_code, mqtt.Properties|None properties=None)
None async_connect(self, asyncio.Future[bool] client_available)
None _async_on_socket_open(self, mqtt.Client client, Any userdata, SocketType sock)
None _discovery_cooldown(self)
asyncio.Future[None] _async_get_mid_future(self, int mid)
None _async_on_socket_unregister_write(self, mqtt.Client client, Any userdata, SocketType sock)
None _async_reader_callback(self, mqtt.Client client)
None _async_untrack_subscription(self, Subscription subscription)
None _async_ha_stop(self, Event _event)
None _async_resubscribe_and_publish_birth_message(self, PublishMessage birth_message)
None _async_perform_unsubscribes(self)
None _async_on_socket_close(self, mqtt.Client client, Any userdata, SocketType sock)
list[Subscription] _matching_subscriptions(self, str topic)
None _async_queue_resubscribe(self)
None _on_socket_register_write(self, mqtt.Client client, Any userdata, SocketType sock)
None _async_on_disconnect(self, int result_code)
None _async_track_subscription(self, Subscription subscription)
None _async_mqtt_on_message(self, mqtt.Client _mqttc, None _userdata, mqtt.MQTTMessage msg)
None _async_cancel_reconnect(self)
Callable[[], None] async_subscribe(self, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos, str|None encoding=None, HassJobType|None job_type=None)
None _async_ha_started(self, HomeAssistant _hass)
None _async_perform_subscriptions(self)
None _async_connection_result(self, bool connected)
None async_disconnect(self, bool disconnect_paho_client=False)
None _async_timeout_mid(self, asyncio.Future[None] future)
None _async_queue_subscriptions(self, Iterable[tuple[str, int]] subscriptions, bool queue_only=False)
None _async_start_misc_periodic(self)
set[Subscription] subscriptions(self)
None _async_remove(self, Subscription subscription)
bool _is_active_subscription(self, str topic)
None async_start(self, MqttData mqtt_data)
None async_restore_tracked_subscriptions(self, set[Subscription] subscriptions)
None _async_writer_callback(self, mqtt.Client client)
None _reconnect_loop(self)
None _increase_socket_buffer_size(self, SocketType sock)
None _async_wait_for_mid_or_raise(self, int mid, int result_code)
None _async_unsubscribe(self, str topic)
None async_publish(self, str topic, PublishPayloadType payload, int qos, bool retain)
AsyncMQTTClient client(self)
None __init__(self, ConfigType config)
bool add(self, _T matcher)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
CALLBACK_TYPE async_subscribe(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING)
None async_publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
Callable[[str], bool] _matcher_for_topic(str subscription)
Callable[[], None] subscribe(HomeAssistant hass, str topic, MessageCallbackType msg_callback, int qos=DEFAULT_QOS, str encoding="utf-8")
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
None publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
str|None get_file_path(str option, str|None default=None)
bool|None mqtt_config_entry_enabled(HomeAssistant hass)
HassJobType get_hassjob_callable_job_type(Callable[..., Any] target)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
None async_cleanup(HomeAssistant hass, DeviceRegistry dev_reg, entity_registry.EntityRegistry ent_reg)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
ModuleType async_import_module(HomeAssistant hass, str name)
CALLBACK_TYPE async_at_started(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)