1 """Config flow for the Abode Security System component."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
6 from http
import HTTPStatus
7 from typing
import Any, cast
9 from jaraco.abode.client
import Client
as Abode
10 from jaraco.abode.exceptions
import (
11 AuthenticationException
as AbodeAuthenticationException,
12 Exception
as AbodeException,
14 from jaraco.abode.helpers.errors
import MFA_CODE_REQUIRED
15 from requests.exceptions
import ConnectTimeout, HTTPError
16 import voluptuous
as vol
21 from .const
import CONF_POLLING, DOMAIN, LOGGER
27 """Config flow for Abode."""
34 vol.Required(CONF_USERNAME): str,
35 vol.Required(CONF_PASSWORD): str,
38 vol.Required(CONF_MFA): str,
41 self.
_mfa_code_mfa_code: str |
None =
None
42 self.
_password_password: str |
None =
None
43 self._polling: bool =
False
44 self.
_username_username: str |
None =
None
47 """Handle login with Abode."""
51 await self.hass.async_add_executor_job(
55 except AbodeException
as ex:
56 if ex.errcode == MFA_CODE_REQUIRED[0]:
59 LOGGER.error(
"Unable to connect to Abode: %s", ex)
61 if ex.errcode == HTTPStatus.BAD_REQUEST:
62 errors = {
"base":
"invalid_auth"}
65 errors = {
"base":
"cannot_connect"}
67 except (ConnectTimeout, HTTPError):
68 errors = {
"base":
"cannot_connect"}
72 step_id=step_id, data_schema=vol.Schema(self.
data_schemadata_schema), errors=errors
78 """Handle multi-factor authentication (MFA) login with Abode."""
81 abode = Abode(auto_login=
False, get_devices=
False, get_automations=
False)
82 await self.hass.async_add_executor_job(
86 except AbodeAuthenticationException:
90 errors={
"base":
"invalid_mfa_code"},
96 """Create the config entry."""
100 CONF_POLLING: self._polling,
108 title=cast(str, self.
_username_username), data=config_data
112 self, user_input: dict[str, Any] |
None =
None
113 ) -> ConfigFlowResult:
114 """Handle a flow initialized by the user."""
115 if user_input
is None:
117 step_id=
"user", data_schema=vol.Schema(self.
data_schemadata_schema)
126 self, user_input: dict[str, Any] |
None =
None
127 ) -> ConfigFlowResult:
128 """Handle a multi-factor authentication (MFA) flow."""
129 if user_input
is None:
131 step_id=
"mfa", data_schema=vol.Schema(self.
mfa_data_schemamfa_data_schema)
139 self, entry_data: Mapping[str, Any]
140 ) -> ConfigFlowResult:
141 """Handle reauthorization request from Abode."""
142 self.
_username_username = entry_data[CONF_USERNAME]
147 self, user_input: dict[str, Any] |
None =
None
148 ) -> ConfigFlowResult:
149 """Handle reauthorization flow."""
150 if user_input
is None:
152 step_id=
"reauth_confirm",
153 data_schema=vol.Schema(
155 vol.Required(CONF_USERNAME, default=self.
_username_username): str,
156 vol.Required(CONF_PASSWORD): str,
161 self.
_username_username = user_input[CONF_USERNAME]
162 self.
_password_password = user_input[CONF_PASSWORD]
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _async_abode_login(self, str step_id)
ConfigFlowResult _async_abode_mfa_login(self)
ConfigFlowResult async_step_mfa(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _async_create_entry(self)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)