1 """Config flow for Ecovacs mqtt integration."""
3 from __future__
import annotations
5 from functools
import partial
9 from urllib.parse
import urlparse
11 from aiohttp
import ClientError
12 from deebot_client.authentication
import Authenticator, create_rest_config
13 from deebot_client.const
import UNDEFINED, UndefinedType
14 from deebot_client.exceptions
import InvalidAuthenticationError, MqttError
15 from deebot_client.mqtt_client
import MqttClient, create_mqtt_config
16 from deebot_client.util
import md5
17 import voluptuous
as vol
27 CONF_OVERRIDE_MQTT_URL,
28 CONF_OVERRIDE_REST_URL,
29 CONF_VERIFY_MQTT_CERTIFICATE,
33 from .util
import get_client_device_id
35 _LOGGER = logging.getLogger(__name__)
41 schema_list: set[str],
43 """Validate an URL and return error dictionary."""
44 if urlparse(value).scheme
not in schema_list:
45 return {field_name: f
"invalid_url_schema_{field_name}"}
47 vol.Schema(vol.Url())(value)
49 return {field_name:
"invalid_url"}
54 hass: HomeAssistant, user_input: dict[str, Any]
56 """Validate user input."""
57 errors: dict[str, str] = {}
59 if rest_url := user_input.get(CONF_OVERRIDE_REST_URL):
61 _validate_url(rest_url, CONF_OVERRIDE_REST_URL, {
"http",
"https"})
63 if mqtt_url := user_input.get(CONF_OVERRIDE_MQTT_URL):
65 _validate_url(mqtt_url, CONF_OVERRIDE_MQTT_URL, {
"mqtt",
"mqtts"})
72 country = user_input[CONF_COUNTRY]
73 rest_config = create_rest_config(
74 aiohttp_client.async_get_clientsession(hass),
76 alpha_2_country=country,
77 override_rest_url=rest_url,
80 authenticator = Authenticator(
82 user_input[CONF_USERNAME],
83 md5(user_input[CONF_PASSWORD]),
87 await authenticator.authenticate()
89 _LOGGER.debug(
"Cannot connect", exc_info=
True)
90 errors[
"base"] =
"cannot_connect"
91 except InvalidAuthenticationError:
92 errors[
"base"] =
"invalid_auth"
94 _LOGGER.exception(
"Unexpected exception during login")
95 errors[
"base"] =
"unknown"
100 ssl_context: UndefinedType | ssl.SSLContext = UNDEFINED
101 if not user_input.get(CONF_VERIFY_MQTT_CERTIFICATE,
True)
and mqtt_url:
104 mqtt_config = await hass.async_add_executor_job(
109 override_mqtt_url=mqtt_url,
110 ssl_context=ssl_context,
114 client = MqttClient(mqtt_config, authenticator)
115 cannot_connect_field = CONF_OVERRIDE_MQTT_URL
if mqtt_url
else "base"
118 await client.verify_config()
120 _LOGGER.debug(
"Cannot connect", exc_info=
True)
121 errors[cannot_connect_field] =
"cannot_connect"
122 except InvalidAuthenticationError:
123 errors[
"base"] =
"invalid_auth"
125 _LOGGER.exception(
"Unexpected exception during mqtt connection verification")
126 errors[
"base"] =
"unknown"
132 """Handle a config flow for Ecovacs."""
136 _mode: InstanceMode = InstanceMode.CLOUD
139 self, user_input: dict[str, Any] |
None =
None
140 ) -> ConfigFlowResult:
141 """Handle the initial step."""
144 return await self.async_step_auth()
147 self.
_mode_mode = user_input[CONF_MODE]
148 return await self.async_step_auth()
152 data_schema=vol.Schema(
155 CONF_MODE, default=InstanceMode.CLOUD
156 ): selector.SelectSelector(
157 selector.SelectSelectorConfig(
158 options=
list(InstanceMode),
159 translation_key=
"installation_mode",
160 mode=selector.SelectSelectorMode.DROPDOWN,
168 async
def async_step_auth(
169 self, user_input: dict[str, Any] |
None =
None
170 ) -> ConfigFlowResult:
171 """Handle the auth step."""
181 title=user_input[CONF_USERNAME], data=user_input
184 schema: VolDictType = {
185 vol.Required(CONF_USERNAME): selector.TextSelector(
186 selector.TextSelectorConfig(type=selector.TextSelectorType.TEXT)
188 vol.Required(CONF_PASSWORD): selector.TextSelector(
189 selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD)
191 vol.Required(CONF_COUNTRY): selector.CountrySelector(),
193 if self.
_mode_mode == InstanceMode.SELF_HOSTED:
196 vol.Required(CONF_OVERRIDE_REST_URL): selector.TextSelector(
197 selector.TextSelectorConfig(type=selector.TextSelectorType.URL)
199 vol.Required(CONF_OVERRIDE_MQTT_URL): selector.TextSelector(
200 selector.TextSelectorConfig(type=selector.TextSelectorType.URL)
205 schema[vol.Optional(CONF_VERIFY_MQTT_CERTIFICATE, default=
True)] = bool
209 CONF_COUNTRY: self.hass.config.country,
215 data_schema=vol.Schema(schema), suggested_values=user_input
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
bool show_advanced_options(self)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, str] _validate_url(str value, str field_name, set[str] schema_list)
dict[str, str] _validate_input(HomeAssistant hass, dict[str, Any] user_input)
str get_client_device_id(HomeAssistant hass, bool self_hosted)
ssl.SSLContext get_default_no_verify_context()