1 """Config flow for Electra Air Conditioner integration."""
3 from __future__
import annotations
8 from electrasmart.api
import STATUS_SUCCESS, Attributes, ElectraAPI, ElectraApiError
9 from electrasmart.api.utils
import generate_imei
10 import voluptuous
as vol
16 from .const
import CONF_IMEI, CONF_OTP, CONF_PHONE_NUMBER, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 """Handle a config flow for Electra Air Conditioner."""
27 """Device settings."""
30 self.
_otp_otp: str |
None =
None
31 self.
_imei_imei: str |
None =
None
32 self.
_token_token: str |
None =
None
33 self.
_api_api: ElectraAPI |
None =
None
36 self, user_input: dict[str, Any] |
None =
None
37 ) -> ConfigFlowResult:
38 """Handle the initial step."""
43 errors: dict[str, Any] = {}
45 if user_input
is None:
52 user_input: dict[str, str] |
None =
None,
53 errors: dict[str, str] |
None =
None,
54 step_id: str =
"user",
55 ) -> ConfigFlowResult:
56 """Show the setup form to the user."""
57 if user_input
is None:
63 CONF_PHONE_NUMBER, default=user_input.get(CONF_PHONE_NUMBER,
"")
67 schema = {vol.Required(CONF_OTP, default=user_input.get(CONF_OTP,
"")): str}
71 data_schema=vol.Schema(schema),
77 self, user_input: dict[str, str]
78 ) -> ConfigFlowResult:
79 """Check if config is valid and create entry if so."""
82 self.
_imei_imei = generate_imei()
89 assert isinstance(self.
_api_api, ElectraAPI)
93 except ElectraApiError
as exp:
94 _LOGGER.error(
"Failed to connect to API: %s", exp)
95 return self.
_show_setup_form_show_setup_form(user_input, {
"base":
"cannot_connect"},
"user")
97 if resp[Attributes.STATUS] == STATUS_SUCCESS:
98 if resp[Attributes.DATA][Attributes.RES] != STATUS_SUCCESS:
100 user_input, {CONF_PHONE_NUMBER:
"invalid_phone_number"},
"user"
106 self, user_input: dict[str, str]
107 ) -> ConfigFlowResult:
108 self.
_otp_otp = user_input[CONF_OTP]
110 assert isinstance(self.
_api_api, ElectraAPI)
111 assert isinstance(self.
_imei_imei, str)
113 assert isinstance(self.
_otp_otp, str)
116 resp = await self.
_api_api.validate_one_time_password(
119 except ElectraApiError
as exp:
120 _LOGGER.error(
"Failed to connect to API: %s", exp)
122 user_input, {
"base":
"cannot_connect"}, CONF_OTP
125 if resp[Attributes.DATA][Attributes.RES] == STATUS_SUCCESS:
126 self.
_token_token = resp[Attributes.DATA][Attributes.TOKEN]
129 CONF_TOKEN: self.
_token_token,
130 CONF_IMEI: self.
_imei_imei,
134 return self.
_show_setup_form_show_setup_form(user_input, {CONF_OTP:
"invalid_auth"}, CONF_OTP)
138 user_input: dict[str, Any] |
None =
None,
139 errors: dict[str, str] |
None =
None,
140 ) -> ConfigFlowResult:
141 """Ask the verification code to the user."""
145 if user_input
is None:
152 errors: dict[str, str] |
None =
None,
153 ) -> ConfigFlowResult:
154 """Show the verification_code form to the user."""
158 data_schema=vol.Schema({vol.Required(CONF_OTP): str}),
ConfigFlowResult _show_otp_form(self, dict[str, str]|None errors=None)
ConfigFlowResult _validate_phone_number(self, dict[str, str] user_input)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _show_setup_form(self, dict[str, str]|None user_input=None, dict[str, str]|None errors=None, str step_id="user")
ConfigFlowResult async_step_one_time_password(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
ConfigFlowResult _validate_one_time_password(self, dict[str, str] user_input)
_description_placeholders
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)