1 """Config flow for Sensoterra integration."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
8 from jwt
import DecodeError, decode
9 from sensoterra.customerapi
import (
11 InvalidAuth
as StInvalidAuth,
14 import voluptuous
as vol
24 from .const
import DOMAIN, LOGGER, TOKEN_EXPIRATION_DAYS
26 STEP_USER_DATA_SCHEMA = vol.Schema(
39 """Handle a config flow for Sensoterra."""
42 self, user_input: dict[str, Any] |
None =
None
43 ) -> ConfigFlowResult:
44 """Create hub entry based on config flow."""
45 errors: dict[str, str] = {}
47 if user_input
is not None:
48 api = CustomerApi(user_input[CONF_EMAIL], user_input[CONF_PASSWORD])
50 uuid = self.hass.data[
"core.uuid"]
51 expiration = datetime.now() +
timedelta(TOKEN_EXPIRATION_DAYS)
54 token: str = await api.get_token(
55 f
"Home Assistant {uuid}",
"READONLY", expiration
57 decoded_token = decode(
58 token, algorithms=[
"HS256"], options={
"verify_signature":
False}
61 except StInvalidAuth
as exp:
63 "Login attempt with %s: %s", user_input[CONF_EMAIL], exp.message
65 errors[
"base"] =
"invalid_auth"
67 LOGGER.error(
"Login attempt with %s: time out", user_input[CONF_EMAIL])
68 errors[
"base"] =
"cannot_connect"
70 LOGGER.error(
"Login attempt with %s: bad token", user_input[CONF_EMAIL])
71 errors[
"base"] =
"invalid_access_token"
73 device_unique_id = decoded_token[
"sub"]
77 title=user_input[CONF_EMAIL],
80 CONF_EMAIL: user_input[CONF_EMAIL],
87 STEP_USER_DATA_SCHEMA, user_input
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)