Home Assistant Unofficial Reference 2024.12.1
subscription.py
Go to the documentation of this file.
1 """Helper to handle a set of topics to subscribe to."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from dataclasses import dataclass
7 from functools import partial
8 from typing import TYPE_CHECKING, Any
9 
10 from homeassistant.core import HassJobType, HomeAssistant, callback
11 
12 from . import debug_info
13 from .client import async_subscribe_internal
14 from .const import DEFAULT_QOS
15 from .models import MessageCallbackType
16 
17 
18 @dataclass(slots=True, kw_only=True)
20  """Class to hold data about an active entity topic subscription."""
21 
22  hass: HomeAssistant
23  topic: str | None
24  message_callback: MessageCallbackType
25  should_subscribe: bool | None
26  unsubscribe_callback: Callable[[], None] | None
27  qos: int = 0
28  encoding: str = "utf-8"
29  entity_id: str | None
30  job_type: HassJobType | None
31 
33  self, hass: HomeAssistant, other: EntitySubscription | None
34  ) -> None:
35  """Re-subscribe to the new topic if necessary."""
36  if not self._should_resubscribe_should_resubscribe(other):
37  if TYPE_CHECKING:
38  assert other
39  self.unsubscribe_callbackunsubscribe_callback = other.unsubscribe_callback
40  return
41 
42  if other is not None and other.unsubscribe_callback is not None:
43  other.unsubscribe_callback()
44  # Clear debug data if it exists
45  debug_info.remove_subscription(self.hass, str(other.topic), other.entity_id)
46 
47  if self.topic is None:
48  # We were asked to remove the subscription or not to create it
49  return
50 
51  # Prepare debug data
52  debug_info.add_subscription(self.hass, self.topic, self.entity_id)
53 
54  self.should_subscribeshould_subscribe = True
55 
56  @callback
57  def subscribe(self) -> None:
58  """Subscribe to a topic."""
59  if not self.should_subscribeshould_subscribe or not self.topic:
60  return
61  self.unsubscribe_callbackunsubscribe_callback = async_subscribe_internal(
62  self.hass,
63  self.topic,
64  self.message_callback,
65  self.qos,
66  self.encoding,
67  self.job_type,
68  )
69 
70  def _should_resubscribe(self, other: EntitySubscription | None) -> bool:
71  """Check if we should re-subscribe to the topic using the old state."""
72  if other is None:
73  return True
74 
75  return (
76  self.topic,
77  self.qos,
78  self.encoding,
79  ) != (
80  other.topic,
81  other.qos,
82  other.encoding,
83  )
84 
85 
86 @callback
88  hass: HomeAssistant,
89  sub_state: dict[str, EntitySubscription] | None,
90  topics: dict[str, dict[str, Any]],
91 ) -> dict[str, EntitySubscription]:
92  """Prepare (re)subscribe to a set of MQTT topics.
93 
94  State is kept in sub_state and a dictionary mapping from the subscription
95  key to the subscription state.
96 
97  After this function has been called, async_subscribe_topics must be called to
98  finalize any new subscriptions.
99 
100  Please note that the sub state must not be shared between multiple
101  sets of topics. Every call to async_subscribe_topics must always
102  contain _all_ the topics the subscription state should manage.
103  """
104  current_subscriptions: dict[str, EntitySubscription]
105  current_subscriptions = sub_state if sub_state is not None else {}
106  sub_state = {}
107  for key, value in topics.items():
108  # Extract the new requested subscription
109  requested = EntitySubscription(
110  topic=value.get("topic"),
111  message_callback=value["msg_callback"],
112  unsubscribe_callback=None,
113  qos=value.get("qos", DEFAULT_QOS),
114  encoding=value.get("encoding", "utf-8"),
115  hass=hass,
116  should_subscribe=None,
117  entity_id=value.get("entity_id"),
118  job_type=value.get("job_type"),
119  )
120  # Get the current subscription state
121  current = current_subscriptions.pop(key, None)
122  requested.resubscribe_if_necessary(hass, current)
123  sub_state[key] = requested
124 
125  # Go through all remaining subscriptions and unsubscribe them
126  for remaining in current_subscriptions.values():
127  if remaining.unsubscribe_callback is not None:
128  remaining.unsubscribe_callback()
129  # Clear debug data if it exists
130  debug_info.remove_subscription(
131  hass,
132  str(remaining.topic),
133  remaining.entity_id,
134  )
135 
136  return sub_state
137 
138 
140  hass: HomeAssistant,
141  sub_state: dict[str, EntitySubscription],
142 ) -> None:
143  """(Re)Subscribe to a set of MQTT topics."""
144  async_subscribe_topics_internal(hass, sub_state)
145 
146 
147 @callback
149  hass: HomeAssistant,
150  sub_state: dict[str, EntitySubscription],
151 ) -> None:
152  """(Re)Subscribe to a set of MQTT topics.
153 
154  This function is internal to the MQTT integration and should not be called
155  from outside the integration.
156  """
157  for sub in sub_state.values():
158  sub.subscribe()
159 
160 
161 if TYPE_CHECKING:
162 
164  hass: HomeAssistant, sub_state: dict[str, EntitySubscription] | None
165  ) -> dict[str, EntitySubscription]:
166  """Unsubscribe from all MQTT topics managed by async_subscribe_topics."""
167 
168 
169 async_unsubscribe_topics = partial(async_prepare_subscribe_topics, topics={})
bool _should_resubscribe(self, EntitySubscription|None other)
Definition: subscription.py:70
None resubscribe_if_necessary(self, HomeAssistant hass, EntitySubscription|None other)
Definition: subscription.py:34
CALLBACK_TYPE async_subscribe_internal(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING, HassJobType|None job_type=None)
Definition: client.py:210
None async_subscribe_topics_internal(HomeAssistant hass, dict[str, EntitySubscription] sub_state)
dict[str, EntitySubscription] async_prepare_subscribe_topics(HomeAssistant hass, dict[str, EntitySubscription]|None sub_state, dict[str, dict[str, Any]] topics)
Definition: subscription.py:91
None async_subscribe_topics(HomeAssistant hass, dict[str, EntitySubscription] sub_state)