1 """Helper to handle a set of topics to subscribe to."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from dataclasses
import dataclass
7 from functools
import partial
8 from typing
import TYPE_CHECKING, Any
12 from .
import debug_info
13 from .client
import async_subscribe_internal
14 from .const
import DEFAULT_QOS
15 from .models
import MessageCallbackType
18 @dataclass(slots=True, kw_only=True)
20 """Class to hold data about an active entity topic subscription."""
24 message_callback: MessageCallbackType
25 should_subscribe: bool |
None
26 unsubscribe_callback: Callable[[],
None] |
None
28 encoding: str =
"utf-8"
30 job_type: HassJobType |
None
33 self, hass: HomeAssistant, other: EntitySubscription |
None
35 """Re-subscribe to the new topic if necessary."""
42 if other
is not None and other.unsubscribe_callback
is not None:
43 other.unsubscribe_callback()
45 debug_info.remove_subscription(self.hass,
str(other.topic), other.entity_id)
47 if self.topic
is None:
52 debug_info.add_subscription(self.hass, self.topic, self.entity_id)
58 """Subscribe to a topic."""
64 self.message_callback,
71 """Check if we should re-subscribe to the topic using the old state."""
89 sub_state: dict[str, EntitySubscription] |
None,
90 topics: dict[str, dict[str, Any]],
91 ) -> dict[str, EntitySubscription]:
92 """Prepare (re)subscribe to a set of MQTT topics.
94 State is kept in sub_state and a dictionary mapping from the subscription
95 key to the subscription state.
97 After this function has been called, async_subscribe_topics must be called to
98 finalize any new subscriptions.
100 Please note that the sub state must not be shared between multiple
101 sets of topics. Every call to async_subscribe_topics must always
102 contain _all_ the topics the subscription state should manage.
104 current_subscriptions: dict[str, EntitySubscription]
105 current_subscriptions = sub_state
if sub_state
is not None else {}
107 for key, value
in topics.items():
110 topic=value.get(
"topic"),
111 message_callback=value[
"msg_callback"],
112 unsubscribe_callback=
None,
113 qos=value.get(
"qos", DEFAULT_QOS),
114 encoding=value.get(
"encoding",
"utf-8"),
116 should_subscribe=
None,
117 entity_id=value.get(
"entity_id"),
118 job_type=value.get(
"job_type"),
121 current = current_subscriptions.pop(key,
None)
122 requested.resubscribe_if_necessary(hass, current)
123 sub_state[key] = requested
126 for remaining
in current_subscriptions.values():
127 if remaining.unsubscribe_callback
is not None:
128 remaining.unsubscribe_callback()
130 debug_info.remove_subscription(
132 str(remaining.topic),
141 sub_state: dict[str, EntitySubscription],
143 """(Re)Subscribe to a set of MQTT topics."""
150 sub_state: dict[str, EntitySubscription],
152 """(Re)Subscribe to a set of MQTT topics.
154 This function is internal to the MQTT integration and should not be called
155 from outside the integration.
157 for sub
in sub_state.values():
164 hass: HomeAssistant, sub_state: dict[str, EntitySubscription] |
None
165 ) -> dict[str, EntitySubscription]:
166 """Unsubscribe from all MQTT topics managed by async_subscribe_topics."""
169 async_unsubscribe_topics = partial(async_prepare_subscribe_topics, topics={})
bool _should_resubscribe(self, EntitySubscription|None other)
None resubscribe_if_necessary(self, HomeAssistant hass, EntitySubscription|None other)
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
None async_subscribe_topics_internal(HomeAssistant hass, dict[str, EntitySubscription] sub_state)
dict[str, EntitySubscription] async_prepare_subscribe_topics(HomeAssistant hass, dict[str, EntitySubscription]|None sub_state, dict[str, dict[str, Any]] topics)
None async_subscribe_topics(HomeAssistant hass, dict[str, EntitySubscription] sub_state)