Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for MQTT."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import OrderedDict
7 from collections.abc import Callable, Mapping
8 import logging
9 import queue
10 from ssl import PROTOCOL_TLS_CLIENT, SSLContext, SSLError
11 from types import MappingProxyType
12 from typing import TYPE_CHECKING, Any
13 
14 from cryptography.hazmat.primitives.serialization import load_pem_private_key
15 from cryptography.x509 import load_pem_x509_certificate
16 import voluptuous as vol
17 
18 from homeassistant.components.file_upload import process_uploaded_file
19 from homeassistant.components.hassio import AddonError, AddonManager, AddonState
20 from homeassistant.config_entries import (
21  ConfigEntry,
22  ConfigFlow,
23  ConfigFlowResult,
24  OptionsFlow,
25 )
26 from homeassistant.const import (
27  CONF_CLIENT_ID,
28  CONF_DISCOVERY,
29  CONF_HOST,
30  CONF_PASSWORD,
31  CONF_PAYLOAD,
32  CONF_PORT,
33  CONF_PROTOCOL,
34  CONF_USERNAME,
35 )
36 from homeassistant.core import HomeAssistant, callback
37 from homeassistant.data_entry_flow import AbortFlow
38 from homeassistant.helpers import config_validation as cv
39 from homeassistant.helpers.hassio import is_hassio
40 from homeassistant.helpers.json import json_dumps
42  BooleanSelector,
43  FileSelector,
44  FileSelectorConfig,
45  NumberSelector,
46  NumberSelectorConfig,
47  NumberSelectorMode,
48  SelectOptionDict,
49  SelectSelector,
50  SelectSelectorConfig,
51  SelectSelectorMode,
52  TextSelector,
53  TextSelectorConfig,
54  TextSelectorType,
55 )
56 from homeassistant.helpers.service_info.hassio import HassioServiceInfo
57 from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
58 
59 from .addon import get_addon_manager
60 from .client import MqttClientSetup
61 from .const import (
62  ATTR_PAYLOAD,
63  ATTR_QOS,
64  ATTR_RETAIN,
65  ATTR_TOPIC,
66  CONF_BIRTH_MESSAGE,
67  CONF_BROKER,
68  CONF_CERTIFICATE,
69  CONF_CLIENT_CERT,
70  CONF_CLIENT_KEY,
71  CONF_DISCOVERY_PREFIX,
72  CONF_KEEPALIVE,
73  CONF_TLS_INSECURE,
74  CONF_TRANSPORT,
75  CONF_WILL_MESSAGE,
76  CONF_WS_HEADERS,
77  CONF_WS_PATH,
78  DEFAULT_BIRTH,
79  DEFAULT_DISCOVERY,
80  DEFAULT_ENCODING,
81  DEFAULT_KEEPALIVE,
82  DEFAULT_PORT,
83  DEFAULT_PREFIX,
84  DEFAULT_PROTOCOL,
85  DEFAULT_TRANSPORT,
86  DEFAULT_WILL,
87  DEFAULT_WS_PATH,
88  DOMAIN,
89  SUPPORTED_PROTOCOLS,
90  TRANSPORT_TCP,
91  TRANSPORT_WEBSOCKETS,
92 )
93 from .util import (
94  async_create_certificate_temp_files,
95  get_file_path,
96  valid_birth_will,
97  valid_publish_topic,
98 )
99 
100 _LOGGER = logging.getLogger(__name__)
101 
102 ADDON_SETUP_TIMEOUT = 5
103 ADDON_SETUP_TIMEOUT_ROUNDS = 5
104 
105 MQTT_TIMEOUT = 5
106 
107 ADVANCED_OPTIONS = "advanced_options"
108 SET_CA_CERT = "set_ca_cert"
109 SET_CLIENT_CERT = "set_client_cert"
110 
111 BOOLEAN_SELECTOR = BooleanSelector()
112 TEXT_SELECTOR = TextSelector(TextSelectorConfig(type=TextSelectorType.TEXT))
113 PUBLISH_TOPIC_SELECTOR = TextSelector(TextSelectorConfig(type=TextSelectorType.TEXT))
114 PORT_SELECTOR = vol.All(
115  NumberSelector(NumberSelectorConfig(mode=NumberSelectorMode.BOX, min=1, max=65535)),
116  vol.Coerce(int),
117 )
118 PASSWORD_SELECTOR = TextSelector(TextSelectorConfig(type=TextSelectorType.PASSWORD))
119 QOS_SELECTOR = vol.All(
120  NumberSelector(NumberSelectorConfig(mode=NumberSelectorMode.BOX, min=0, max=2)),
121  vol.Coerce(int),
122 )
123 KEEPALIVE_SELECTOR = vol.All(
126  mode=NumberSelectorMode.BOX, min=15, step="any", unit_of_measurement="sec"
127  )
128  ),
129  vol.Coerce(int),
130 )
131 PROTOCOL_SELECTOR = SelectSelector(
133  options=SUPPORTED_PROTOCOLS,
134  mode=SelectSelectorMode.DROPDOWN,
135  )
136 )
137 SUPPORTED_TRANSPORTS = [
138  SelectOptionDict(value=TRANSPORT_TCP, label="TCP"),
139  SelectOptionDict(value=TRANSPORT_WEBSOCKETS, label="WebSocket"),
140 ]
141 TRANSPORT_SELECTOR = SelectSelector(
143  options=SUPPORTED_TRANSPORTS,
144  mode=SelectSelectorMode.DROPDOWN,
145  )
146 )
147 WS_HEADERS_SELECTOR = TextSelector(
148  TextSelectorConfig(type=TextSelectorType.TEXT, multiline=True)
149 )
150 CA_VERIFICATION_MODES = [
151  "off",
152  "auto",
153  "custom",
154 ]
155 BROKER_VERIFICATION_SELECTOR = SelectSelector(
157  options=CA_VERIFICATION_MODES,
158  mode=SelectSelectorMode.DROPDOWN,
159  translation_key=SET_CA_CERT,
160  )
161 )
162 
163 # mime configuration from https://pki-tutorial.readthedocs.io/en/latest/mime.html
164 CA_CERT_UPLOAD_SELECTOR = FileSelector(
165  FileSelectorConfig(accept=".crt,application/x-x509-ca-cert")
166 )
167 CERT_UPLOAD_SELECTOR = FileSelector(
168  FileSelectorConfig(accept=".crt,application/x-x509-user-cert")
169 )
170 KEY_UPLOAD_SELECTOR = FileSelector(FileSelectorConfig(accept=".key,application/pkcs8"))
171 
172 REAUTH_SCHEMA = vol.Schema(
173  {
174  vol.Required(CONF_USERNAME): TEXT_SELECTOR,
175  vol.Required(CONF_PASSWORD): PASSWORD_SELECTOR,
176  }
177 )
178 PWD_NOT_CHANGED = "__**password_not_changed**__"
179 
180 
181 @callback
183  entry_password: str | None, user_input: dict[str, Any]
184 ) -> dict[str, Any]:
185  """Update the password if the entry has been updated.
186 
187  As we want to avoid reflecting the stored password in the UI,
188  we replace the suggested value in the UI with a sentitel,
189  and we change it back here if it was changed.
190  """
191  substituted_used_data = dict(user_input)
192  # Take out the password submitted
193  user_password: str | None = substituted_used_data.pop(CONF_PASSWORD, None)
194  # Only add the password if it has changed.
195  # If the sentinel password is submitted, we replace that with our current
196  # password from the config entry data.
197  password_changed = user_password is not None and user_password != PWD_NOT_CHANGED
198  password = user_password if password_changed else entry_password
199  if password is not None:
200  substituted_used_data[CONF_PASSWORD] = password
201  return substituted_used_data
202 
203 
204 class FlowHandler(ConfigFlow, domain=DOMAIN):
205  """Handle a config flow."""
206 
207  VERSION = 1
208 
209  _hassio_discovery: dict[str, Any] | None = None
210  _addon_manager: AddonManager
211 
212  def __init__(self) -> None:
213  """Set up flow instance."""
214  self.install_taskinstall_task: asyncio.Task | None = None
215  self.start_taskstart_task: asyncio.Task | None = None
216 
217  @staticmethod
218  @callback
220  config_entry: ConfigEntry,
221  ) -> MQTTOptionsFlowHandler:
222  """Get the options flow for this handler."""
223  return MQTTOptionsFlowHandler()
224 
225  async def _async_install_addon(self) -> None:
226  """Install the Mosquitto Mqtt broker add-on."""
227  addon_manager: AddonManager = get_addon_manager(self.hass)
228  await addon_manager.async_schedule_install_addon()
229 
231  self, user_input: dict[str, Any] | None = None
232  ) -> ConfigFlowResult:
233  """Add-on installation failed."""
234  return self.async_abortasync_abortasync_abort(
235  reason="addon_install_failed",
236  description_placeholders={"addon": self._addon_manager_addon_manager.addon_name},
237  )
238 
240  self, user_input: dict[str, Any] | None = None
241  ) -> ConfigFlowResult:
242  """Install Mosquitto Broker add-on."""
243  if self.install_taskinstall_task is None:
244  self.install_taskinstall_task = self.hass.async_create_task(self._async_install_addon_async_install_addon())
245 
246  if not self.install_taskinstall_task.done():
247  return self.async_show_progressasync_show_progress(
248  step_id="install_addon",
249  progress_action="install_addon",
250  progress_task=self.install_taskinstall_task,
251  )
252 
253  try:
254  await self.install_taskinstall_task
255  except AddonError as err:
256  _LOGGER.error(err)
257  return self.async_show_progress_doneasync_show_progress_done(next_step_id="install_failed")
258  finally:
259  self.install_taskinstall_task = None
260 
261  return self.async_show_progress_doneasync_show_progress_done(next_step_id="start_addon")
262 
264  self, user_input: dict[str, Any] | None = None
265  ) -> ConfigFlowResult:
266  """Add-on start failed."""
267  return self.async_abortasync_abortasync_abort(
268  reason="addon_start_failed",
269  description_placeholders={"addon": self._addon_manager_addon_manager.addon_name},
270  )
271 
273  self, user_input: dict[str, Any] | None = None
274  ) -> ConfigFlowResult:
275  """Start Mosquitto Broker add-on."""
276  if not self.start_taskstart_task:
277  self.start_taskstart_task = self.hass.async_create_task(self._async_start_addon_async_start_addon())
278  if not self.start_taskstart_task.done():
279  return self.async_show_progressasync_show_progress(
280  step_id="start_addon",
281  progress_action="start_addon",
282  progress_task=self.start_taskstart_task,
283  )
284  try:
285  await self.start_taskstart_task
286  except AddonError as err:
287  _LOGGER.error(err)
288  return self.async_show_progress_doneasync_show_progress_done(next_step_id="start_failed")
289  finally:
290  self.start_taskstart_task = None
291 
292  return self.async_show_progress_doneasync_show_progress_done(next_step_id="setup_entry_from_discovery")
293 
294  async def _async_get_config_and_try(self) -> dict[str, Any] | None:
295  """Get the MQTT add-on discovery info and try the connection."""
296  if self._hassio_discovery_hassio_discovery is not None:
297  return self._hassio_discovery_hassio_discovery
298  addon_manager: AddonManager = get_addon_manager(self.hass)
299  try:
300  addon_discovery_config = (
301  await addon_manager.async_get_addon_discovery_info()
302  )
303  config: dict[str, Any] = {
304  CONF_BROKER: addon_discovery_config[CONF_HOST],
305  CONF_PORT: addon_discovery_config[CONF_PORT],
306  CONF_USERNAME: addon_discovery_config.get(CONF_USERNAME),
307  CONF_PASSWORD: addon_discovery_config.get(CONF_PASSWORD),
308  CONF_DISCOVERY: DEFAULT_DISCOVERY,
309  }
310  except AddonError:
311  # We do not have discovery information yet
312  return None
313  if await self.hass.async_add_executor_job(
314  try_connection,
315  config,
316  ):
317  self._hassio_discovery_hassio_discovery = config
318  return config
319  return None
320 
321  async def _async_start_addon(self) -> None:
322  """Start the Mosquitto Broker add-on."""
323  addon_manager: AddonManager = get_addon_manager(self.hass)
324  await addon_manager.async_schedule_start_addon()
325 
326  # Sleep some seconds to let the add-on start properly before connecting.
327  for _ in range(ADDON_SETUP_TIMEOUT_ROUNDS):
328  await asyncio.sleep(ADDON_SETUP_TIMEOUT)
329  # Finish setup using discovery info to test the connection
330  if await self._async_get_config_and_try_async_get_config_and_try():
331  break
332  else:
333  raise AddonError(
334  f"Failed to correctly start {addon_manager.addon_name} add-on"
335  )
336 
337  async def async_step_user(
338  self, user_input: dict[str, Any] | None = None
339  ) -> ConfigFlowResult:
340  """Handle a flow initialized by the user."""
341  if is_hassio(self.hass):
342  # Offer to set up broker add-on if supervisor is available
343  self._addon_manager_addon_manager = get_addon_manager(self.hass)
344  return self.async_show_menuasync_show_menu(
345  step_id="user",
346  menu_options=["addon", "broker"],
347  description_placeholders={"addon": self._addon_manager_addon_manager.addon_name},
348  )
349 
350  # Start up a flow for manual setup
351  return await self.async_step_brokerasync_step_broker()
352 
354  self, user_input: dict[str, Any] | None = None
355  ) -> ConfigFlowResult:
356  """Set up mqtt entry from discovery info."""
357  if (config := await self._async_get_config_and_try_async_get_config_and_try()) is not None:
358  return self.async_create_entryasync_create_entryasync_create_entry(
359  title=self._addon_manager_addon_manager.addon_name,
360  data=config,
361  )
362 
363  raise AbortFlow(
364  "addon_connection_failed",
365  description_placeholders={"addon": self._addon_manager_addon_manager.addon_name},
366  )
367 
368  async def async_step_addon(
369  self, user_input: dict[str, Any] | None = None
370  ) -> ConfigFlowResult:
371  """Install and start MQTT broker add-on."""
372  addon_manager = self._addon_manager_addon_manager
373 
374  try:
375  addon_info = await addon_manager.async_get_addon_info()
376  except AddonError as err:
377  raise AbortFlow(
378  "addon_info_failed",
379  description_placeholders={"addon": self._addon_manager_addon_manager.addon_name},
380  ) from err
381 
382  if addon_info.state == AddonState.RUNNING:
383  # Finish setup using discovery info
384  return await self.async_step_setup_entry_from_discoveryasync_step_setup_entry_from_discovery()
385 
386  if addon_info.state == AddonState.NOT_RUNNING:
387  return await self.async_step_start_addonasync_step_start_addon()
388 
389  # Install the add-on and start it
390  return await self.async_step_install_addonasync_step_install_addon()
391 
392  async def async_step_reauth(
393  self, entry_data: Mapping[str, Any]
394  ) -> ConfigFlowResult:
395  """Handle re-authentication with MQTT broker."""
396  if is_hassio(self.hass):
397  # Check if entry setup matches the add-on discovery config
398  addon_manager = get_addon_manager(self.hass)
399  try:
400  addon_discovery_config = (
401  await addon_manager.async_get_addon_discovery_info()
402  )
403  except AddonError:
404  # Follow manual flow if we have an error
405  pass
406  else:
407  # Check if the addon secrets need to be renewed.
408  # This will repair the config entry,
409  # in case the official Mosquitto Broker addon was re-installed.
410  if (
411  entry_data[CONF_BROKER] == addon_discovery_config[CONF_HOST]
412  and entry_data[CONF_PORT] == addon_discovery_config[CONF_PORT]
413  and entry_data.get(CONF_USERNAME)
414  == (username := addon_discovery_config.get(CONF_USERNAME))
415  and entry_data.get(CONF_PASSWORD)
416  != (password := addon_discovery_config.get(CONF_PASSWORD))
417  ):
418  _LOGGER.info(
419  "Executing autorecovery %s add-on secrets",
420  addon_manager.addon_name,
421  )
422  return await self.async_step_reauth_confirmasync_step_reauth_confirm(
423  user_input={CONF_USERNAME: username, CONF_PASSWORD: password}
424  )
425 
426  return await self.async_step_reauth_confirmasync_step_reauth_confirm()
427 
429  self, user_input: dict[str, Any] | None = None
430  ) -> ConfigFlowResult:
431  """Confirm re-authentication with MQTT broker."""
432  errors: dict[str, str] = {}
433 
434  reauth_entry = self._get_reauth_entry_get_reauth_entry()
435  if user_input:
436  substituted_used_data = update_password_from_user_input(
437  reauth_entry.data.get(CONF_PASSWORD), user_input
438  )
439  new_entry_data = {**reauth_entry.data, **substituted_used_data}
440  if await self.hass.async_add_executor_job(
441  try_connection,
442  new_entry_data,
443  ):
444  return self.async_update_reload_and_abortasync_update_reload_and_abort(
445  reauth_entry, data=new_entry_data
446  )
447 
448  errors["base"] = "invalid_auth"
449 
450  schema = self.add_suggested_values_to_schemaadd_suggested_values_to_schema(
451  REAUTH_SCHEMA,
452  {
453  CONF_USERNAME: reauth_entry.data.get(CONF_USERNAME),
454  CONF_PASSWORD: PWD_NOT_CHANGED,
455  },
456  )
457  return self.async_show_formasync_show_formasync_show_form(
458  step_id="reauth_confirm",
459  data_schema=schema,
460  errors=errors,
461  )
462 
463  async def async_step_broker(
464  self, user_input: dict[str, Any] | None = None
465  ) -> ConfigFlowResult:
466  """Confirm the setup."""
467  errors: dict[str, str] = {}
468  fields: OrderedDict[Any, Any] = OrderedDict()
469  validated_user_input: dict[str, Any] = {}
470  if await async_get_broker_settings(
471  self,
472  fields,
473  None,
474  user_input,
475  validated_user_input,
476  errors,
477  ):
478  can_connect = await self.hass.async_add_executor_job(
479  try_connection,
480  validated_user_input,
481  )
482 
483  if can_connect:
484  validated_user_input[CONF_DISCOVERY] = DEFAULT_DISCOVERY
485  return self.async_create_entryasync_create_entryasync_create_entry(
486  title=validated_user_input[CONF_BROKER],
487  data=validated_user_input,
488  )
489 
490  errors["base"] = "cannot_connect"
491 
492  return self.async_show_formasync_show_formasync_show_form(
493  step_id="broker", data_schema=vol.Schema(fields), errors=errors
494  )
495 
496  async def async_step_hassio(
497  self, discovery_info: HassioServiceInfo
498  ) -> ConfigFlowResult:
499  """Receive a Hass.io discovery or process setup after addon install."""
500  await self._async_handle_discovery_without_unique_id_async_handle_discovery_without_unique_id()
501 
502  self._hassio_discovery_hassio_discovery = discovery_info.config
503 
504  return await self.async_step_hassio_confirmasync_step_hassio_confirm()
505 
507  self, user_input: dict[str, Any] | None = None
508  ) -> ConfigFlowResult:
509  """Confirm a Hass.io discovery."""
510  errors: dict[str, str] = {}
511  if TYPE_CHECKING:
512  assert self._hassio_discovery_hassio_discovery
513 
514  if user_input is not None:
515  data: dict[str, Any] = self._hassio_discovery_hassio_discovery.copy()
516  data[CONF_BROKER] = data.pop(CONF_HOST)
517  can_connect = await self.hass.async_add_executor_job(
518  try_connection,
519  data,
520  )
521 
522  if can_connect:
523  return self.async_create_entryasync_create_entryasync_create_entry(
524  title=data["addon"],
525  data={
526  CONF_BROKER: data[CONF_BROKER],
527  CONF_PORT: data[CONF_PORT],
528  CONF_USERNAME: data.get(CONF_USERNAME),
529  CONF_PASSWORD: data.get(CONF_PASSWORD),
530  CONF_DISCOVERY: DEFAULT_DISCOVERY,
531  },
532  )
533 
534  errors["base"] = "cannot_connect"
535 
536  return self.async_show_formasync_show_formasync_show_form(
537  step_id="hassio_confirm",
538  description_placeholders={"addon": self._hassio_discovery_hassio_discovery["addon"]},
539  errors=errors,
540  )
541 
542 
544  """Handle MQTT options."""
545 
546  def __init__(self) -> None:
547  """Initialize MQTT options flow."""
548  self.broker_config: dict[str, str | int] = {}
549 
550  async def async_step_init(self, user_input: None = None) -> ConfigFlowResult:
551  """Manage the MQTT options."""
552  return await self.async_step_brokerasync_step_broker()
553 
554  async def async_step_broker(
555  self, user_input: dict[str, Any] | None = None
556  ) -> ConfigFlowResult:
557  """Manage the MQTT broker configuration."""
558  errors: dict[str, str] = {}
559  fields: OrderedDict[Any, Any] = OrderedDict()
560  validated_user_input: dict[str, Any] = {}
561  if await async_get_broker_settings(
562  self,
563  fields,
564  self.config_entryconfig_entryconfig_entry.data,
565  user_input,
566  validated_user_input,
567  errors,
568  ):
569  self.broker_config.update(
571  self.config_entryconfig_entryconfig_entry.data.get(CONF_PASSWORD), validated_user_input
572  ),
573  )
574  can_connect = await self.hass.async_add_executor_job(
575  try_connection,
576  self.broker_config,
577  )
578 
579  if can_connect:
580  return await self.async_step_optionsasync_step_options()
581 
582  errors["base"] = "cannot_connect"
583 
584  return self.async_show_formasync_show_form(
585  step_id="broker",
586  data_schema=vol.Schema(fields),
587  errors=errors,
588  last_step=False,
589  )
590 
592  self, user_input: dict[str, Any] | None = None
593  ) -> ConfigFlowResult:
594  """Manage the MQTT options."""
595  errors = {}
596  current_config = self.config_entryconfig_entryconfig_entry.data
597  options_config: dict[str, Any] = {}
598  bad_input: bool = False
599 
600  def _birth_will(birt_or_will: str) -> dict[str, Any]:
601  """Return the user input for birth or will."""
602  if TYPE_CHECKING:
603  assert user_input
604  return {
605  ATTR_TOPIC: user_input[f"{birt_or_will}_topic"],
606  ATTR_PAYLOAD: user_input.get(f"{birt_or_will}_payload", ""),
607  ATTR_QOS: user_input[f"{birt_or_will}_qos"],
608  ATTR_RETAIN: user_input[f"{birt_or_will}_retain"],
609  }
610 
611  def _validate(
612  field: str,
613  values: dict[str, Any],
614  error_code: str,
615  schema: Callable[[Any], Any],
616  ) -> None:
617  """Validate the user input."""
618  nonlocal bad_input
619  try:
620  option_values = schema(values)
621  options_config[field] = option_values
622  except vol.Invalid:
623  errors["base"] = error_code
624  bad_input = True
625 
626  if user_input is not None:
627  # validate input
628  options_config[CONF_DISCOVERY] = user_input[CONF_DISCOVERY]
629  _validate(
630  CONF_DISCOVERY_PREFIX,
631  user_input[CONF_DISCOVERY_PREFIX],
632  "bad_discovery_prefix",
633  valid_publish_topic,
634  )
635  if "birth_topic" in user_input:
636  _validate(
637  CONF_BIRTH_MESSAGE,
638  _birth_will("birth"),
639  "bad_birth",
640  valid_birth_will,
641  )
642  if not user_input["birth_enable"]:
643  options_config[CONF_BIRTH_MESSAGE] = {}
644 
645  if "will_topic" in user_input:
646  _validate(
647  CONF_WILL_MESSAGE,
648  _birth_will("will"),
649  "bad_will",
650  valid_birth_will,
651  )
652  if not user_input["will_enable"]:
653  options_config[CONF_WILL_MESSAGE] = {}
654 
655  if not bad_input:
656  updated_config = {}
657  updated_config.update(self.broker_config)
658  updated_config.update(options_config)
659  self.hass.config_entries.async_update_entry(
660  self.config_entryconfig_entryconfig_entry,
661  data=updated_config,
662  title=str(self.broker_config[CONF_BROKER]),
663  )
664  return self.async_create_entryasync_create_entry(title="", data={})
665 
666  birth = {
667  **DEFAULT_BIRTH,
668  **current_config.get(CONF_BIRTH_MESSAGE, {}),
669  }
670  will = {
671  **DEFAULT_WILL,
672  **current_config.get(CONF_WILL_MESSAGE, {}),
673  }
674  discovery = current_config.get(CONF_DISCOVERY, DEFAULT_DISCOVERY)
675  discovery_prefix = current_config.get(CONF_DISCOVERY_PREFIX, DEFAULT_PREFIX)
676 
677  # build form
678  fields: OrderedDict[vol.Marker, Any] = OrderedDict()
679  fields[vol.Optional(CONF_DISCOVERY, default=discovery)] = BOOLEAN_SELECTOR
680  fields[vol.Optional(CONF_DISCOVERY_PREFIX, default=discovery_prefix)] = (
681  PUBLISH_TOPIC_SELECTOR
682  )
683 
684  # Birth message is disabled if CONF_BIRTH_MESSAGE = {}
685  fields[
686  vol.Optional(
687  "birth_enable",
688  default=CONF_BIRTH_MESSAGE not in current_config
689  or current_config[CONF_BIRTH_MESSAGE] != {},
690  )
691  ] = BOOLEAN_SELECTOR
692  fields[
693  vol.Optional(
694  "birth_topic", description={"suggested_value": birth[ATTR_TOPIC]}
695  )
696  ] = PUBLISH_TOPIC_SELECTOR
697  fields[
698  vol.Optional(
699  "birth_payload", description={"suggested_value": birth[CONF_PAYLOAD]}
700  )
701  ] = TEXT_SELECTOR
702  fields[vol.Optional("birth_qos", default=birth[ATTR_QOS])] = QOS_SELECTOR
703  fields[vol.Optional("birth_retain", default=birth[ATTR_RETAIN])] = (
704  BOOLEAN_SELECTOR
705  )
706 
707  # Will message is disabled if CONF_WILL_MESSAGE = {}
708  fields[
709  vol.Optional(
710  "will_enable",
711  default=CONF_WILL_MESSAGE not in current_config
712  or current_config[CONF_WILL_MESSAGE] != {},
713  )
714  ] = BOOLEAN_SELECTOR
715  fields[
716  vol.Optional(
717  "will_topic", description={"suggested_value": will[ATTR_TOPIC]}
718  )
719  ] = PUBLISH_TOPIC_SELECTOR
720  fields[
721  vol.Optional(
722  "will_payload", description={"suggested_value": will[CONF_PAYLOAD]}
723  )
724  ] = TEXT_SELECTOR
725  fields[vol.Optional("will_qos", default=will[ATTR_QOS])] = QOS_SELECTOR
726  fields[vol.Optional("will_retain", default=will[ATTR_RETAIN])] = (
727  BOOLEAN_SELECTOR
728  )
729 
730  return self.async_show_formasync_show_form(
731  step_id="options",
732  data_schema=vol.Schema(fields),
733  errors=errors,
734  last_step=True,
735  )
736 
737 
738 async def _get_uploaded_file(hass: HomeAssistant, id: str) -> str:
739  """Get file content from uploaded file."""
740 
741  def _proces_uploaded_file() -> str:
742  with process_uploaded_file(hass, id) as file_path:
743  return file_path.read_text(encoding=DEFAULT_ENCODING)
744 
745  return await hass.async_add_executor_job(_proces_uploaded_file)
746 
747 
749  flow: ConfigFlow | OptionsFlow,
750  fields: OrderedDict[Any, Any],
751  entry_config: MappingProxyType[str, Any] | None,
752  user_input: dict[str, Any] | None,
753  validated_user_input: dict[str, Any],
754  errors: dict[str, str],
755 ) -> bool:
756  """Build the config flow schema to collect the broker settings.
757 
758  Shows advanced options if one or more are configured
759  or when the advanced_broker_options checkbox was selected.
760  Returns True when settings are collected successfully.
761  """
762  hass = flow.hass
763  advanced_broker_options: bool = False
764  user_input_basic: dict[str, Any] = {}
765  current_config: dict[str, Any] = (
766  entry_config.copy() if entry_config is not None else {}
767  )
768 
769  async def _async_validate_broker_settings(
770  config: dict[str, Any],
771  user_input: dict[str, Any],
772  validated_user_input: dict[str, Any],
773  errors: dict[str, str],
774  ) -> bool:
775  """Additional validation on broker settings for better error messages."""
776 
777  # Get current certificate settings from config entry
778  certificate: str | None = (
779  "auto"
780  if user_input.get(SET_CA_CERT, "off") == "auto"
781  else config.get(CONF_CERTIFICATE)
782  if user_input.get(SET_CA_CERT, "off") == "custom"
783  else None
784  )
785  client_certificate: str | None = (
786  config.get(CONF_CLIENT_CERT) if user_input.get(SET_CLIENT_CERT) else None
787  )
788  client_key: str | None = (
789  config.get(CONF_CLIENT_KEY) if user_input.get(SET_CLIENT_CERT) else None
790  )
791 
792  # Prepare entry update with uploaded files
793  validated_user_input.update(user_input)
794  client_certificate_id: str | None = user_input.get(CONF_CLIENT_CERT)
795  client_key_id: str | None = user_input.get(CONF_CLIENT_KEY)
796  if (
797  client_certificate_id
798  and not client_key_id
799  or not client_certificate_id
800  and client_key_id
801  ):
802  errors["base"] = "invalid_inclusion"
803  return False
804  certificate_id: str | None = user_input.get(CONF_CERTIFICATE)
805  if certificate_id:
806  certificate = await _get_uploaded_file(hass, certificate_id)
807 
808  # Return to form for file upload CA cert or client cert and key
809  if (
810  not client_certificate
811  and user_input.get(SET_CLIENT_CERT)
812  and not client_certificate_id
813  or not certificate
814  and user_input.get(SET_CA_CERT, "off") == "custom"
815  and not certificate_id
816  or user_input.get(CONF_TRANSPORT) == TRANSPORT_WEBSOCKETS
817  and CONF_WS_PATH not in user_input
818  ):
819  return False
820 
821  if client_certificate_id:
822  client_certificate = await _get_uploaded_file(hass, client_certificate_id)
823  if client_key_id:
824  client_key = await _get_uploaded_file(hass, client_key_id)
825 
826  certificate_data: dict[str, Any] = {}
827  if certificate:
828  certificate_data[CONF_CERTIFICATE] = certificate
829  if client_certificate:
830  certificate_data[CONF_CLIENT_CERT] = client_certificate
831  certificate_data[CONF_CLIENT_KEY] = client_key
832 
833  validated_user_input.update(certificate_data)
834  await async_create_certificate_temp_files(hass, certificate_data)
835  if error := await hass.async_add_executor_job(
836  check_certicate_chain,
837  ):
838  errors["base"] = error
839  return False
840 
841  if SET_CA_CERT in validated_user_input:
842  del validated_user_input[SET_CA_CERT]
843  if SET_CLIENT_CERT in validated_user_input:
844  del validated_user_input[SET_CLIENT_CERT]
845  if validated_user_input.get(CONF_TRANSPORT, TRANSPORT_TCP) == TRANSPORT_TCP:
846  if CONF_WS_PATH in validated_user_input:
847  del validated_user_input[CONF_WS_PATH]
848  if CONF_WS_HEADERS in validated_user_input:
849  del validated_user_input[CONF_WS_HEADERS]
850  return True
851  try:
852  validated_user_input[CONF_WS_HEADERS] = json_loads(
853  validated_user_input.get(CONF_WS_HEADERS, "{}")
854  )
855  schema = vol.Schema({cv.string: cv.template})
856  schema(validated_user_input[CONF_WS_HEADERS])
857  except (*JSON_DECODE_EXCEPTIONS, vol.MultipleInvalid):
858  errors["base"] = "bad_ws_headers"
859  return False
860  return True
861 
862  if user_input:
863  user_input_basic = user_input.copy()
864  advanced_broker_options = user_input_basic.get(ADVANCED_OPTIONS, False)
865  if ADVANCED_OPTIONS not in user_input or advanced_broker_options is False:
866  if await _async_validate_broker_settings(
867  current_config,
868  user_input_basic,
869  validated_user_input,
870  errors,
871  ):
872  return True
873  # Get defaults settings from previous post
874  current_broker = user_input_basic.get(CONF_BROKER)
875  current_port = user_input_basic.get(CONF_PORT, DEFAULT_PORT)
876  current_user = user_input_basic.get(CONF_USERNAME)
877  current_pass = user_input_basic.get(CONF_PASSWORD)
878  else:
879  # Get default settings from entry (if any)
880  current_broker = current_config.get(CONF_BROKER)
881  current_port = current_config.get(CONF_PORT, DEFAULT_PORT)
882  current_user = current_config.get(CONF_USERNAME)
883  # Return the sentinel password to avoid exposure
884  current_entry_pass = current_config.get(CONF_PASSWORD)
885  current_pass = PWD_NOT_CHANGED if current_entry_pass else None
886 
887  # Treat the previous post as an update of the current settings
888  # (if there was a basic broker setup step)
889  current_config.update(user_input_basic)
890 
891  # Get default settings for advanced broker options
892  current_client_id = current_config.get(CONF_CLIENT_ID)
893  current_keepalive = current_config.get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE)
894  current_ca_certificate = current_config.get(CONF_CERTIFICATE)
895  current_client_certificate = current_config.get(CONF_CLIENT_CERT)
896  current_client_key = current_config.get(CONF_CLIENT_KEY)
897  current_tls_insecure = current_config.get(CONF_TLS_INSECURE, False)
898  current_protocol = current_config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)
899  current_transport = current_config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
900  current_ws_path = current_config.get(CONF_WS_PATH, DEFAULT_WS_PATH)
901  current_ws_headers = (
902  json_dumps(current_config.get(CONF_WS_HEADERS))
903  if CONF_WS_HEADERS in current_config
904  else None
905  )
906  advanced_broker_options |= bool(
907  current_client_id
908  or current_keepalive != DEFAULT_KEEPALIVE
909  or current_ca_certificate
910  or current_client_certificate
911  or current_client_key
912  or current_tls_insecure
913  or current_protocol != DEFAULT_PROTOCOL
914  or current_config.get(SET_CA_CERT, "off") != "off"
915  or current_config.get(SET_CLIENT_CERT)
916  or current_transport == TRANSPORT_WEBSOCKETS
917  )
918 
919  # Build form
920  fields[vol.Required(CONF_BROKER, default=current_broker)] = TEXT_SELECTOR
921  fields[vol.Required(CONF_PORT, default=current_port)] = PORT_SELECTOR
922  fields[
923  vol.Optional(
924  CONF_USERNAME,
925  description={"suggested_value": current_user},
926  )
927  ] = TEXT_SELECTOR
928  fields[
929  vol.Optional(
930  CONF_PASSWORD,
931  description={"suggested_value": current_pass},
932  )
933  ] = PASSWORD_SELECTOR
934  # show advanced options checkbox if requested and
935  # advanced options are enabled
936  # or when the defaults of advanced options are overridden
937  if not advanced_broker_options:
938  if not flow.show_advanced_options:
939  return False
940  fields[
941  vol.Optional(
942  ADVANCED_OPTIONS,
943  )
944  ] = BOOLEAN_SELECTOR
945  return False
946  fields[
947  vol.Optional(
948  CONF_CLIENT_ID,
949  description={"suggested_value": current_client_id},
950  )
951  ] = TEXT_SELECTOR
952  fields[
953  vol.Optional(
954  CONF_KEEPALIVE,
955  description={"suggested_value": current_keepalive},
956  )
957  ] = KEEPALIVE_SELECTOR
958  fields[
959  vol.Optional(
960  SET_CLIENT_CERT,
961  default=current_client_certificate is not None
962  or current_config.get(SET_CLIENT_CERT) is True,
963  )
964  ] = BOOLEAN_SELECTOR
965  if (
966  current_client_certificate is not None
967  or current_config.get(SET_CLIENT_CERT) is True
968  ):
969  fields[
970  vol.Optional(
971  CONF_CLIENT_CERT,
972  description={"suggested_value": user_input_basic.get(CONF_CLIENT_CERT)},
973  )
974  ] = CERT_UPLOAD_SELECTOR
975  fields[
976  vol.Optional(
977  CONF_CLIENT_KEY,
978  description={"suggested_value": user_input_basic.get(CONF_CLIENT_KEY)},
979  )
980  ] = KEY_UPLOAD_SELECTOR
981  verification_mode = current_config.get(SET_CA_CERT) or (
982  "off"
983  if current_ca_certificate is None
984  else "auto"
985  if current_ca_certificate == "auto"
986  else "custom"
987  )
988  fields[
989  vol.Optional(
990  SET_CA_CERT,
991  default=verification_mode,
992  )
993  ] = BROKER_VERIFICATION_SELECTOR
994  if current_ca_certificate is not None or verification_mode == "custom":
995  fields[
996  vol.Optional(
997  CONF_CERTIFICATE,
998  user_input_basic.get(CONF_CERTIFICATE),
999  )
1000  ] = CA_CERT_UPLOAD_SELECTOR
1001  fields[
1002  vol.Optional(
1003  CONF_TLS_INSECURE,
1004  description={"suggested_value": current_tls_insecure},
1005  )
1006  ] = BOOLEAN_SELECTOR
1007  fields[
1008  vol.Optional(
1009  CONF_PROTOCOL,
1010  description={"suggested_value": current_protocol},
1011  )
1012  ] = PROTOCOL_SELECTOR
1013  fields[
1014  vol.Optional(
1015  CONF_TRANSPORT,
1016  description={"suggested_value": current_transport},
1017  )
1018  ] = TRANSPORT_SELECTOR
1019  if current_transport == TRANSPORT_WEBSOCKETS:
1020  fields[
1021  vol.Optional(CONF_WS_PATH, description={"suggested_value": current_ws_path})
1022  ] = TEXT_SELECTOR
1023  fields[
1024  vol.Optional(
1025  CONF_WS_HEADERS, description={"suggested_value": current_ws_headers}
1026  )
1027  ] = WS_HEADERS_SELECTOR
1028 
1029  # Show form
1030  return False
1031 
1032 
1034  user_input: dict[str, Any],
1035 ) -> bool:
1036  """Test if we can connect to an MQTT broker."""
1037  # We don't import on the top because some integrations
1038  # should be able to optionally rely on MQTT.
1039  import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel
1040 
1041  mqtt_client_setup = MqttClientSetup(user_input)
1042  mqtt_client_setup.setup()
1043  client = mqtt_client_setup.client
1044 
1045  result: queue.Queue[bool] = queue.Queue(maxsize=1)
1046 
1047  def on_connect(
1048  client_: mqtt.Client,
1049  userdata: None,
1050  flags: dict[str, Any],
1051  result_code: int,
1052  properties: mqtt.Properties | None = None,
1053  ) -> None:
1054  """Handle connection result."""
1055  result.put(result_code == mqtt.CONNACK_ACCEPTED)
1056 
1057  client.on_connect = on_connect
1058 
1059  client.connect_async(user_input[CONF_BROKER], user_input[CONF_PORT])
1060  client.loop_start()
1061 
1062  try:
1063  return result.get(timeout=MQTT_TIMEOUT)
1064  except queue.Empty:
1065  return False
1066  finally:
1067  client.disconnect()
1068  client.loop_stop()
1069 
1070 
1071 def check_certicate_chain() -> str | None:
1072  """Check the MQTT certificates."""
1073  if client_certificate := get_file_path(CONF_CLIENT_CERT):
1074  try:
1075  with open(client_certificate, "rb") as client_certificate_file:
1076  load_pem_x509_certificate(client_certificate_file.read())
1077  except ValueError:
1078  return "bad_client_cert"
1079  # Check we can serialize the private key file
1080  if private_key := get_file_path(CONF_CLIENT_KEY):
1081  try:
1082  with open(private_key, "rb") as client_key_file:
1083  load_pem_private_key(client_key_file.read(), password=None)
1084  except (TypeError, ValueError):
1085  return "bad_client_key"
1086  # Check the certificate chain
1087  context = SSLContext(PROTOCOL_TLS_CLIENT)
1088  if client_certificate and private_key:
1089  try:
1090  context.load_cert_chain(client_certificate, private_key)
1091  except SSLError:
1092  return "bad_client_cert_key"
1093  # try to load the custom CA file
1094  if (ca_cert := get_file_path(CONF_CERTIFICATE)) is None:
1095  return None
1096 
1097  try:
1098  context.load_verify_locations(ca_cert)
1099  except SSLError:
1100  return "bad_certificate"
1101  return None
ConfigFlowResult async_step_hassio(self, HassioServiceInfo discovery_info)
Definition: config_flow.py:498
ConfigFlowResult async_step_start_addon(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:274
ConfigFlowResult async_step_broker(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:465
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:394
ConfigFlowResult async_step_addon(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:370
MQTTOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:221
ConfigFlowResult async_step_setup_entry_from_discovery(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:355
ConfigFlowResult async_step_start_failed(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:265
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:339
ConfigFlowResult async_step_install_failed(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:232
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:430
ConfigFlowResult async_step_install_addon(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:241
ConfigFlowResult async_step_hassio_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:508
ConfigFlowResult async_step_broker(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:556
ConfigFlowResult async_step_init(self, None user_input=None)
Definition: config_flow.py:550
ConfigFlowResult async_step_options(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:593
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
str
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_progress(self, *str|None step_id=None, str progress_action, Mapping[str, str]|None description_placeholders=None, asyncio.Task[Any]|None progress_task=None)
_FlowResultT async_show_menu(self, *str|None step_id=None, Container[str] menu_options, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_show_progress_done(self, *str next_step_id)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
Iterator[Path] process_uploaded_file(HomeAssistant hass, str file_id)
Definition: __init__.py:36
bool is_hassio(HomeAssistant hass)
Definition: __init__.py:302
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
str _get_uploaded_file(HomeAssistant hass, str id)
Definition: config_flow.py:738
dict[str, Any] update_password_from_user_input(str|None entry_password, dict[str, Any] user_input)
Definition: config_flow.py:184
bool async_get_broker_settings(ConfigFlow|OptionsFlow flow, OrderedDict[Any, Any] fields, MappingProxyType[str, Any]|None entry_config, dict[str, Any]|None user_input, dict[str, Any] validated_user_input, dict[str, str] errors)
Definition: config_flow.py:755
bool try_connection(dict[str, Any] user_input)
None async_create_certificate_temp_files(HomeAssistant hass, ConfigType config)
Definition: util.py:342
str|None get_file_path(str option, str|None default=None)
Definition: util.py:392
None open(self, **Any kwargs)
Definition: lock.py:86
AddonManager get_addon_manager(HomeAssistant hass, str slug)
Definition: config_flow.py:44
str json_dumps(Any data)
Definition: json.py:149