1 """Config flow for Tellduslive."""
8 from tellduslive
import Session, supports_local_api
9 import voluptuous
as vol
28 KEY_TOKEN_SECRET =
"token_secret"
30 _LOGGER = logging.getLogger(__name__)
34 """Handle a config flow."""
41 """Init config flow."""
48 public_key=PUBLIC_KEY,
49 private_key=NOT_SO_PRIVATE_KEY,
51 application=APPLICATION_NAME,
53 return self.
_session_session.authorize_url
56 self, user_input: dict[str, Any] |
None =
None
57 ) -> ConfigFlowResult:
58 """Let user select host or cloud."""
62 if user_input
is not None or len(self.
_hosts_hosts) == 1:
63 if user_input
is not None and user_input[CONF_HOST] != CLOUD_NAME:
64 self.
_host_host = user_input[CONF_HOST]
69 data_schema=vol.Schema(
70 {vol.Required(CONF_HOST): vol.In(
list(self.
_hosts_hosts))}
75 self, user_input: dict[str, Any] |
None =
None
76 ) -> ConfigFlowResult:
77 """Handle the submitted configuration."""
79 if user_input
is not None:
80 if await self.hass.async_add_executor_job(self.
_session_session.authorize):
81 host = self.
_host_host
or CLOUD_NAME
83 session = {CONF_HOST: host, KEY_TOKEN: self.
_session_session.access_token}
86 KEY_TOKEN: self.
_session_session.access_token,
87 KEY_TOKEN_SECRET: self.
_session_session.access_token_secret,
93 KEY_SCAN_INTERVAL: self.
_scan_interval_scan_interval.total_seconds(),
97 errors[
"base"] =
"invalid_auth"
100 async
with asyncio.timeout(10):
101 auth_url = await self.hass.async_add_executor_job(self.
_get_auth_url_get_auth_url)
107 _LOGGER.exception(
"Unexpected error generating auth url")
110 _LOGGER.debug(
"Got authorization URL %s", auth_url)
114 description_placeholders={
115 "app_name": APPLICATION_NAME,
116 "auth_url": auth_url,
122 discovery_info: list[str],
123 ) -> ConfigFlowResult:
124 """Run when a Tellstick is discovered."""
127 _LOGGER.debug(
"Discovered tellstick device: %s", discovery_info)
128 if supports_local_api(discovery_info[1]):
129 _LOGGER.debug(
"%s support local API", discovery_info[1])
130 self.
_hosts_hosts.append(discovery_info[0])
135 """Import a config entry."""
139 self.
_scan_interval_scan_interval = import_data[KEY_SCAN_INTERVAL]
140 if import_data[CONF_HOST] != DOMAIN:
141 self.
_hosts_hosts.append(import_data[CONF_HOST])
143 if not await self.hass.async_add_executor_job(
144 os.path.isfile, self.hass.config.path(TELLDUS_CONFIG_FILE)
148 conf = await self.hass.async_add_executor_job(
149 load_json_object, self.hass.config.path(TELLDUS_CONFIG_FILE)
151 host = next(iter(conf))
153 if import_data[CONF_HOST] != host:
156 host = CLOUD_NAME
if host ==
"tellduslive" else host
161 KEY_SCAN_INTERVAL: self.
_scan_interval_scan_interval.total_seconds(),
162 KEY_SESSION: next(iter(conf.values())),
ConfigFlowResult async_step_auth(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_discovery(self, list[str] discovery_info)
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
str|None _get_auth_url(self)
None _async_handle_discovery_without_unique_id(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)