1 """Utility functions for the MQTT integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from functools
import lru_cache
10 from pathlib
import Path
12 from typing
import Any
14 import voluptuous
as vol
37 from .models
import DATA_MQTT, DATA_MQTT_AVAILABLE, ReceiveMessage
# Timeout, in seconds, applied while waiting for the MQTT client to
# become available (passed to asyncio.timeout).
AVAILABILITY_TIMEOUT = 50.0

# Directory name, created under the system temp directory, that holds
# the certificate files for the MQTT client.
TEMP_DIR_NAME = f"home-assistant-{DOMAIN}"

# Coerce the value to int and accept only 0, 1 or 2.
_VALID_QOS_SCHEMA = vol.All(vol.Coerce(int), vol.In([0, 1, 2]))

_LOGGER = logging.getLogger(__name__)
49 """Ensure a cool down period before executing a job.
51 When a new execute request arrives we cancel the current request
54 We allow patching this util, as we generally have exceptions
55 for sleeps/waits/debouncers/timers causing long run times in tests.
59 self, timeout: float, callback_job: Callable[[], Coroutine[Any,
None,
None]]
61 """Initialize the timer."""
62 self.
_loop_loop = asyncio.get_running_loop()
65 self.
_task_task: asyncio.Task |
None =
None
66 self.
_timer_timer: asyncio.TimerHandle |
None =
None
70 """Set a new timeout period."""
74 """Execute after a cooldown period."""
77 except HomeAssistantError
as ha_error:
78 _LOGGER.error(
"%s", ha_error)
82 """Handle task done."""
87 """Execute the job."""
92 return self.
_task_task
97 return self.
_task_task
101 """Cancel any pending task."""
103 self.
_timer_timer.cancel()
108 """Ensure we execute after a cooldown period."""
116 if self.
_timer_timer.when() < next_when:
123 """Handle timer fire."""
135 """Cleanup any pending task."""
137 if not self.
_task_task:
139 self.
_task_task.cancel()
141 await self.
_task_task
142 except asyncio.CancelledError:
145 _LOGGER.exception(
"Error cleaning up task")
def platforms_from_config(config: list[ConfigType]) -> set[Platform | str]:
    """Return the platforms to be set up.

    Every item in ``config`` is iterated for its keys and the keys of all
    items are collected into a single set. An empty ``config`` yields an
    empty set.
    """
    return {platform for item in config for platform in item}
155 config_entry: ConfigEntry,
156 platforms: set[Platform | str],
159 """Forward the config entry setup to the platforms and set up discovery."""
160 mqtt_data = hass.data[DATA_MQTT]
161 platforms_loaded = mqtt_data.platforms_loaded
162 new_platforms: set[Platform | str] = platforms - platforms_loaded
163 tasks: list[asyncio.Task] = []
164 if "device_automation" in new_platforms:
167 from .
import device_automation
170 create_eager_task(device_automation.async_setup_entry(hass, config_entry))
172 if "tag" in new_platforms:
177 tasks.append(create_eager_task(tag.async_setup_entry(hass, config_entry)))
178 if new_entity_platforms := (new_platforms - {
"tag",
"device_automation"}):
181 hass.config_entries.async_forward_entry_setups(
182 config_entry, new_entity_platforms
188 await asyncio.gather(*tasks)
189 platforms_loaded.update(new_platforms)
def mqtt_config_entry_enabled(hass: HomeAssistant) -> bool | None:
    """Return true when the MQTT config entry is enabled.

    The connected-client check is evaluated first; the config entries
    lookup (excluding disabled and ignored entries) only runs when that
    check is falsy.
    """
    # NOTE(review): the statement opener was not visible in the reviewed
    # extract; `return (` is inferred from the closing parenthesis and the
    # declared return type - confirm against the full file.
    return (
        DATA_MQTT in hass.data and hass.data[DATA_MQTT].client.connected
    ) or hass.config_entries.async_has_entries(
        DOMAIN, include_disabled=False, include_ignore=False
    )
204 """Wait for the MQTT client to become available.
206 Waits when mqtt set up is in progress,
207 It is not needed that the client is connected.
208 Returns True if the mqtt client is available.
209 Returns False when the client is not available.
214 entry = hass.config_entries.async_entries(DOMAIN)[0]
215 if entry.state == ConfigEntryState.LOADED:
218 state_reached_future: asyncio.Future[bool]
219 if DATA_MQTT_AVAILABLE
not in hass.data:
220 state_reached_future = hass.loop.create_future()
221 hass.data[DATA_MQTT_AVAILABLE] = state_reached_future
223 state_reached_future = hass.data[DATA_MQTT_AVAILABLE]
226 async
with asyncio.timeout(AVAILABILITY_TIMEOUT):
228 return await state_reached_future
def valid_topic(topic: Any) -> str:
    """Validate that this is a valid topic name/filter.

    This function is not cached and is not expected to be called
    directly outside of this module. It is not marked as protected
    only because it is tested directly in test_util.py.

    If it gets used outside of valid_subscribe_topic and
    valid_publish_topic, it may need an lru_cache decorator or
    an lru_cache decorator on the function where it is used.

    Raises vol.Invalid when the topic cannot be encoded as UTF-8, is
    empty, is longer than 65535 encoded bytes, or contains a null
    character, a control character or a Unicode non-character.
    """
    validated_topic = cv.string(topic)
    try:
        raw_validated_topic = validated_topic.encode("utf-8")
    except UnicodeError as err:
        raise vol.Invalid("MQTT topic name/filter must be valid UTF-8 string.") from err
    if not raw_validated_topic:
        raise vol.Invalid("MQTT topic name/filter must not be empty.")
    # The length limit applies to the encoded bytes, not to the characters.
    if len(raw_validated_topic) > 65535:
        raise vol.Invalid(
            "MQTT topic name/filter must not be longer than 65535 encoded bytes."
        )

    for char in validated_topic:
        # NOTE(review): the condition guarding the null-character error was
        # not visible in the reviewed extract; it is inferred from the
        # error message - confirm against the full file.
        if char == "\u0000":
            raise vol.Invalid("MQTT topic name/filter must not contain null character.")
        if char <= "\u001f" or "\u007f" <= char <= "\u009f":
            raise vol.Invalid(
                "MQTT topic name/filter must not contain control characters."
            )
        # U+FDD0..U+FDEF, plus any code point whose low 16 bits are
        # 0xFFFE or 0xFFFF.
        if "\ufdd0" <= char <= "\ufdef" or (ord(char) & 0xFFFF) in (0xFFFE, 0xFFFF):
            raise vol.Invalid("MQTT topic name/filter must not contain non-characters.")

    return validated_topic
# NOTE(review): the module imports lru_cache and valid_topic's docstring
# implies this validator is cached, but no decorator line was visible in
# the reviewed extract - confirm against the full file.
def valid_subscribe_topic(topic: Any) -> str:
    """Validate that we can subscribe using this MQTT topic.

    On top of the checks done by valid_topic, a "+" must have a "/" (or
    the start/end of the filter) on both sides, and a "#" must be the last
    character and, unless it is the whole filter, be preceded by "/".

    Raises vol.Invalid when one of these rules is violated.
    """
    # NOTE(review): this call and the "# found" guard further down were not
    # visible in the reviewed extract; both are inferred from the
    # surrounding code - confirm against the full file.
    validated_topic = valid_topic(topic)
    last_index = len(validated_topic) - 1

    for i, char in enumerate(validated_topic):
        if char != "+":
            continue
        if (i > 0 and validated_topic[i - 1] != "/") or (
            i < last_index and validated_topic[i + 1] != "/"
        ):
            raise vol.Invalid(
                "Single-level wildcard must occupy an entire level of the filter"
            )

    index = validated_topic.find("#")
    if index != -1:
        # find() returns the first "#", so a second "#" later in the
        # filter also fails this check.
        if index != last_index:
            raise vol.Invalid(
                "Multi-level wildcard must be the last character in the topic filter."
            )
        if len(validated_topic) > 1 and validated_topic[index - 1] != "/":
            raise vol.Invalid(
                "Multi-level wildcard must be after a topic level separator."
            )

    return validated_topic
298 """Validate either a jinja2 template or a valid MQTT subscription topic."""
299 tpl = cv.template(value)
# NOTE(review): the module imports lru_cache and valid_topic's docstring
# implies this validator is cached, but no decorator line was visible in
# the reviewed extract - confirm against the full file.
def valid_publish_topic(topic: Any) -> str:
    """Validate that we can publish using this MQTT topic.

    Raises vol.Invalid when the topic contains a "+" or "#" wildcard.
    """
    # NOTE(review): this call was not visible in the reviewed extract; it
    # is inferred from the use of validated_topic below - confirm.
    validated_topic = valid_topic(topic)
    if "+" in validated_topic or "#" in validated_topic:
        raise vol.Invalid("Wildcards cannot be used in topic names")
    return validated_topic
317 """Validate that QOS value is valid."""
322 _MQTT_WILL_BIRTH_SCHEMA = vol.Schema(
324 vol.Required(ATTR_TOPIC): valid_publish_topic,
325 vol.Required(ATTR_PAYLOAD): cv.string,
326 vol.Optional(ATTR_QOS, default=DEFAULT_QOS): valid_qos_schema,
327 vol.Optional(ATTR_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
334 """Validate a birth or will configuration and required topic/payload."""
341 hass: HomeAssistant, config: ConfigType
343 """Create certificate temporary files for the MQTT client."""
346 if data
is None or data ==
"auto":
347 if temp_file.exists():
348 os.remove(Path(temp_file))
350 temp_file.write_text(data)
352 def _create_temp_dir_and_files() -> None:
353 """Create temporary directory."""
354 temp_dir = Path(tempfile.gettempdir()) / TEMP_DIR_NAME
357 config.get(CONF_CERTIFICATE)
358 or config.get(CONF_CLIENT_CERT)
359 or config.get(CONF_CLIENT_KEY)
360 )
and not temp_dir.exists():
361 temp_dir.mkdir(0o700)
367 await hass.async_add_executor_job(_create_temp_dir_and_files)
371 logger: logging.Logger, proposed_state: str, entity_id: str, msg: ReceiveMessage
373 """Check if the processed state is too long and log warning."""
374 if (state_length := len(proposed_state)) > MAX_LENGTH_STATE_STATE:
376 "Cannot update state for entity %s after processing "
377 "payload on topic %s. The requested state (%s) exceeds "
378 "the maximum allowed length (%s). Fall back to "
379 "%s, failed state: %s",
383 MAX_LENGTH_STATE_STATE,
385 proposed_state[:8192],
def get_file_path(option: str, default: str | None = None) -> str | None:
    """Get file path of a certificate file.

    The file is looked up as ``option`` inside the TEMP_DIR_NAME directory
    under the system temp directory. The path is returned as a string
    when the file exists.
    """
    # NOTE(review): the bodies of the two existence checks were not visible
    # in the reviewed extract; returning ``default`` is inferred from the
    # signature - confirm against the full file.
    temp_dir = Path(tempfile.gettempdir()) / TEMP_DIR_NAME
    if not temp_dir.exists():
        return default

    file_path: Path = temp_dir / option
    if not file_path.exists():
        return default

    return str(file_path)
406 """Convert certificate file or setting to config entry setting."""
407 if file_name_or_auto ==
"auto":
410 with open(file_name_or_auto, encoding=DEFAULT_ENCODING)
as certificate_file:
411 return certificate_file.read()
None _async_timer_reached(self)
asyncio.Task async_execute(self)
None _async_task_done(self, asyncio.Task task)
None _async_cancel_timer(self)
None set_timeout(self, float timeout)
None __init__(self, float timeout, Callable[[], Coroutine[Any, None, None]] callback_job)
None async_schedule(self)
str _create_temp_file(str api_ip)
ConfigType valid_birth_will(ConfigType config)
None async_create_certificate_temp_files(HomeAssistant hass, ConfigType config)
int valid_qos_schema(Any qos)
bool check_state_too_long(logging.Logger logger, str proposed_state, str entity_id, ReceiveMessage msg)
str|None migrate_certificate_file_to_content(str file_name_or_auto)
bool async_wait_for_mqtt_client(HomeAssistant hass)
str|None get_file_path(str option, str|None default=None)
str valid_subscribe_topic(Any topic)
bool|None mqtt_config_entry_enabled(HomeAssistant hass)
str valid_topic(Any topic)
set[Platform|str] platforms_from_config(list[ConfigType] config)
template.Template valid_subscribe_topic_template(Any value)
str valid_publish_topic(Any topic)
None async_forward_entry_setup_and_setup_discovery(HomeAssistant hass, ConfigEntry config_entry, set[Platform|str] platforms, bool late=False)
None open(self, **Any kwargs)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)