1 """Config flow for Canary."""
3 from __future__ import annotations
6 from typing import Any, Final
8 from canary.api import Api
9 from requests.exceptions import ConnectTimeout, HTTPError
10 import voluptuous as vol
22 CONF_FFMPEG_ARGUMENTS,
23 DEFAULT_FFMPEG_ARGUMENTS,
28 _LOGGER: Final = logging.getLogger(__name__)
32 """Validate the user input allows us to connect.
34 Data has the keys from DATA_SCHEMA with values provided by the user.
40 data.get(CONF_TIMEOUT, DEFAULT_TIMEOUT),
47 """Handle a config flow for Canary."""
54 """Get the options flow for this handler."""
58 """Handle a flow initiated by configuration file."""
62 self, user_input: dict[str, Any] | None = None
63 ) -> ConfigFlowResult:
64 """Handle a flow initiated by the user."""
68 if user_input is not None:
69 if CONF_TIMEOUT not in user_input:
70 user_input[CONF_TIMEOUT] = DEFAULT_TIMEOUT
72 default_username = user_input[CONF_USERNAME]
75 await self.hass.async_add_executor_job(
76 validate_input, self.hass, user_input
78 except (ConnectTimeout, HTTPError):
79 errors["base"] = "cannot_connect"
81 _LOGGER.exception("Unexpected exception")
85 title=user_input[CONF_USERNAME],
90 vol.Required(CONF_USERNAME, default=default_username): str,
91 vol.Required(CONF_PASSWORD): str,
96 data_schema=vol.Schema(data_schema),
102 """Handle Canary client options."""
105 self, user_input: dict[str, Any] | None = None
106 ) -> ConfigFlowResult:
107 """Manage Canary options."""
108 if user_input is not None:
113 CONF_FFMPEG_ARGUMENTS,
115 CONF_FFMPEG_ARGUMENTS, DEFAULT_FFMPEG_ARGUMENTS
124 return self.async_show_form(step_id="init", data_schema=vol.Schema(options))
OptionsFlow async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
bool validate_input(HomeAssistant hass, dict[str, Any] data)