1 """Config flow for edge integration."""
3 from __future__
import annotations
9 from Tami4EdgeAPI
import Tami4EdgeAPI, exceptions
10 import voluptuous
as vol
16 from .const
import CONF_PHONE, CONF_REFRESH_TOKEN, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
20 _STEP_PHONE_NUMBER_SCHEMA = vol.Schema({vol.Required(CONF_PHONE): cv.string})
22 _STEP_OTP_CODE_SCHEMA = vol.Schema({vol.Required(
"otp"): cv.string})
23 _PHONE_MATCHER = re.compile(
r"^(\+?972)?0?(?P<number>\d{8,9})$")
27 """Handle a config flow for Tami4Edge."""
34 self, user_input: dict[str, Any] |
None =
None
35 ) -> ConfigFlowResult:
36 """Handle the otp request step."""
38 if user_input
is not None:
39 phone = user_input[CONF_PHONE].strip()
42 if m := _PHONE_MATCHER.match(phone):
43 self.
phonephone = f
"+972{m.group('number')}"
45 raise InvalidPhoneNumber
46 await self.hass.async_add_executor_job(
47 Tami4EdgeAPI.request_otp, self.
phonephone
49 except InvalidPhoneNumber:
50 errors[
"base"] =
"invalid_phone"
51 except exceptions.Tami4EdgeAPIException:
52 errors[
"base"] =
"cannot_connect"
54 _LOGGER.exception(
"Unexpected exception")
55 errors[
"base"] =
"unknown"
60 step_id=
"user", data_schema=_STEP_PHONE_NUMBER_SCHEMA, errors=errors
64 self, user_input: dict[str, Any] |
None =
None
65 ) -> ConfigFlowResult:
66 """Handle the otp submission step."""
68 if user_input
is not None:
69 otp = user_input[
"otp"]
71 refresh_token = await self.hass.async_add_executor_job(
72 Tami4EdgeAPI.submit_otp, self.
phonephone, otp
74 api = await self.hass.async_add_executor_job(
75 Tami4EdgeAPI, refresh_token
77 except exceptions.OTPFailedException:
78 errors[
"base"] =
"invalid_auth"
79 except exceptions.Tami4EdgeAPIException:
80 errors[
"base"] =
"cannot_connect"
82 _LOGGER.exception(
"Unexpected exception")
83 errors[
"base"] =
"unknown"
85 device_name = api.device_metadata.name
86 if device_name
is None:
90 data={CONF_REFRESH_TOKEN: refresh_token},
94 step_id=
"otp", data_schema=_STEP_OTP_CODE_SCHEMA, errors=errors
99 """Error to indicate that the phone number is invalid."""
ConfigFlowResult async_step_otp(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)