Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Ecovacs mqtt integration."""
2 
3 from __future__ import annotations
4 
5 from functools import partial
6 import logging
7 import ssl
8 from typing import Any
9 from urllib.parse import urlparse
10 
11 from aiohttp import ClientError
12 from deebot_client.authentication import Authenticator, create_rest_config
13 from deebot_client.const import UNDEFINED, UndefinedType
14 from deebot_client.exceptions import InvalidAuthenticationError, MqttError
15 from deebot_client.mqtt_client import MqttClient, create_mqtt_config
16 from deebot_client.util import md5
17 import voluptuous as vol
18 
19 from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
20 from homeassistant.const import CONF_COUNTRY, CONF_MODE, CONF_PASSWORD, CONF_USERNAME
21 from homeassistant.core import HomeAssistant
22 from homeassistant.helpers import aiohttp_client, selector
23 from homeassistant.helpers.typing import VolDictType
24 from homeassistant.util.ssl import get_default_no_verify_context
25 
26 from .const import (
27  CONF_OVERRIDE_MQTT_URL,
28  CONF_OVERRIDE_REST_URL,
29  CONF_VERIFY_MQTT_CERTIFICATE,
30  DOMAIN,
31  InstanceMode,
32 )
33 from .util import get_client_device_id
34 
35 _LOGGER = logging.getLogger(__name__)
36 
37 
39  value: str,
40  field_name: str,
41  schema_list: set[str],
42 ) -> dict[str, str]:
43  """Validate an URL and return error dictionary."""
44  if urlparse(value).scheme not in schema_list:
45  return {field_name: f"invalid_url_schema_{field_name}"}
46  try:
47  vol.Schema(vol.Url())(value)
48  except vol.Invalid:
49  return {field_name: "invalid_url"}
50  return {}
51 
52 
async def _validate_input(
    hass: HomeAssistant, user_input: dict[str, Any]
) -> dict[str, str]:
    """Check the submitted form data and report any problems.

    The checks run in order and stop at the first stage that fails:

    1. The optional REST / MQTT override URLs are validated.
    2. The credentials are used to authenticate.
    3. The MQTT configuration is verified.

    Returns:
        A mapping of form field (or ``"base"``) to error key; empty when
        every stage succeeded.
    """
    errors: dict[str, str] = {}

    # Stage 1: override URLs (only validated when a non-empty value is given).
    rest_url = user_input.get(CONF_OVERRIDE_REST_URL)
    if rest_url:
        errors.update(
            _validate_url(rest_url, CONF_OVERRIDE_REST_URL, {"http", "https"})
        )
    mqtt_url = user_input.get(CONF_OVERRIDE_MQTT_URL)
    if mqtt_url:
        errors.update(
            _validate_url(mqtt_url, CONF_OVERRIDE_MQTT_URL, {"mqtt", "mqtts"})
        )
    if errors:
        return errors

    # Stage 2: authentication.
    # The second argument is the helper's ``self_hosted`` flag.
    device_id = get_client_device_id(hass, rest_url is not None)
    country = user_input[CONF_COUNTRY]
    session = aiohttp_client.async_get_clientsession(hass)
    rest_config = create_rest_config(
        session,
        device_id=device_id,
        alpha_2_country=country,
        override_rest_url=rest_url,
    )
    # The password is passed to the authenticator as its md5 digest.
    authenticator = Authenticator(
        rest_config,
        user_input[CONF_USERNAME],
        md5(user_input[CONF_PASSWORD]),
    )

    # ``errors`` is still empty here, so each handler can return its own dict.
    try:
        await authenticator.authenticate()
    except ClientError:
        _LOGGER.debug("Cannot connect", exc_info=True)
        return {"base": "cannot_connect"}
    except InvalidAuthenticationError:
        return {"base": "invalid_auth"}
    except Exception:
        _LOGGER.exception("Unexpected exception during login")
        return {"base": "unknown"}

    # Stage 3: MQTT verification.
    # Certificate verification is only switched off when an MQTT override URL
    # is in use and the user explicitly disabled the check.
    verify_certificate = user_input.get(CONF_VERIFY_MQTT_CERTIFICATE, True)
    ssl_context: UndefinedType | ssl.SSLContext = (
        get_default_no_verify_context()
        if mqtt_url and not verify_certificate
        else UNDEFINED
    )

    # create_mqtt_config is run in the executor rather than on the event loop.
    build_mqtt_config = partial(
        create_mqtt_config,
        device_id=device_id,
        country=country,
        override_mqtt_url=mqtt_url,
        ssl_context=ssl_context,
    )
    mqtt_config = await hass.async_add_executor_job(build_mqtt_config)
    client = MqttClient(mqtt_config, authenticator)

    try:
        await client.verify_config()
    except MqttError:
        _LOGGER.debug("Cannot connect", exc_info=True)
        # Attach the error to the URL field when the user supplied one.
        target = CONF_OVERRIDE_MQTT_URL if mqtt_url else "base"
        errors[target] = "cannot_connect"
    except InvalidAuthenticationError:
        errors["base"] = "invalid_auth"
    except Exception:
        _LOGGER.exception("Unexpected exception during mqtt connection verification")
        errors["base"] = "unknown"

    return errors
129 
130 
class EcovacsConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for Ecovacs.

    The flow has two steps: ``user`` (choose the installation mode, shown
    only with advanced options enabled) and ``auth`` (credentials and, for
    self-hosted mode, the override URLs).
    """

    VERSION = 1

    # Installation mode chosen in the "user" step; defaults to cloud.
    _mode: InstanceMode = InstanceMode.CLOUD

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle the initial step.

        Without advanced options the mode selection is skipped and the flow
        proceeds straight to the auth step with the default mode.
        """
        if not self.show_advanced_options:
            return await self.async_step_auth()

        if user_input:
            self._mode = user_input[CONF_MODE]
            return await self.async_step_auth()

        return self.async_show_form(
            step_id="user",
            data_schema=vol.Schema(
                {
                    vol.Required(
                        CONF_MODE, default=InstanceMode.CLOUD
                    ): selector.SelectSelector(
                        selector.SelectSelectorConfig(
                            options=list(InstanceMode),
                            translation_key="installation_mode",
                            mode=selector.SelectSelectorMode.DROPDOWN,
                        )
                    )
                }
            ),
            last_step=False,
        )

    async def async_step_auth(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle the auth step.

        On submission the input is validated; a config entry is created when
        validation reports no errors, otherwise the form is shown again with
        the errors and the previously entered values as suggestions.
        """
        errors: dict[str, str] = {}

        if user_input:
            self._async_abort_entries_match({CONF_USERNAME: user_input[CONF_USERNAME]})

            errors = await _validate_input(self.hass, user_input)

            if not errors:
                return self.async_create_entry(
                    title=user_input[CONF_USERNAME], data=user_input
                )

        schema: VolDictType = {
            vol.Required(CONF_USERNAME): selector.TextSelector(
                selector.TextSelectorConfig(type=selector.TextSelectorType.TEXT)
            ),
            vol.Required(CONF_PASSWORD): selector.TextSelector(
                selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD)
            ),
            vol.Required(CONF_COUNTRY): selector.CountrySelector(),
        }
        if self._mode == InstanceMode.SELF_HOSTED:
            schema.update(
                {
                    vol.Required(CONF_OVERRIDE_REST_URL): selector.TextSelector(
                        selector.TextSelectorConfig(type=selector.TextSelectorType.URL)
                    ),
                    vol.Required(CONF_OVERRIDE_MQTT_URL): selector.TextSelector(
                        selector.TextSelectorConfig(type=selector.TextSelectorType.URL)
                    ),
                }
            )
            if errors:
                # After a failed attempt, offer the option to skip MQTT
                # certificate verification. _validate_input only honours this
                # flag when an MQTT override URL is set, so it is offered in
                # self-hosted mode only.
                schema[vol.Optional(CONF_VERIFY_MQTT_CERTIFICATE, default=True)] = bool

        if not user_input:
            # First display: pre-select the country configured in Home Assistant.
            user_input = {
                CONF_COUNTRY: self.hass.config.country,
            }

        return self.async_show_form(
            step_id="auth",
            data_schema=self.add_suggested_values_to_schema(
                data_schema=vol.Schema(schema), suggested_values=user_input
            ),
            errors=errors,
            last_step=True,
        )
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:140
ConfigFlowResult async_create_entry(self, *, str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *, str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
bool show_advanced_options(self)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *, str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *, str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, str] _validate_url(str value, str field_name, set[str] schema_list)
Definition: config_flow.py:42
dict[str, str] _validate_input(HomeAssistant hass, dict[str, Any] user_input)
Definition: config_flow.py:55
str get_client_device_id(HomeAssistant hass, bool self_hosted)
Definition: util.py:23
ssl.SSLContext get_default_no_verify_context()
Definition: ssl.py:123