1 """Config flow for Transmission Bittorrent Client."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 import voluptuous
as vol
38 SUPPORTED_ORDER_MODES,
40 from .errors
import AuthenticationError, CannotConnect, UnknownError
42 DATA_SCHEMA = vol.Schema(
44 vol.Optional(CONF_SSL, default=DEFAULT_SSL): bool,
45 vol.Required(CONF_HOST): str,
46 vol.Required(CONF_PATH, default=DEFAULT_PATH): str,
47 vol.Optional(CONF_USERNAME): str,
48 vol.Optional(CONF_PASSWORD): str,
49 vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
55 """Handle Transmission config flow."""
63 config_entry: ConfigEntry,
64 ) -> TransmissionOptionsFlowHandler:
65 """Get the options flow for this handler."""
69 self, user_input: dict[str, Any] |
None =
None
70 ) -> ConfigFlowResult:
71 """Handle a flow initialized by the user."""
74 if user_input
is not None:
76 {CONF_HOST: user_input[CONF_HOST], CONF_PORT: user_input[CONF_PORT]}
79 await
get_api(self.hass, user_input)
81 except AuthenticationError:
82 errors[CONF_USERNAME] =
"invalid_auth"
83 errors[CONF_PASSWORD] =
"invalid_auth"
84 except (CannotConnect, UnknownError):
85 errors[
"base"] =
"cannot_connect"
95 data_schema=DATA_SCHEMA,
100 self, entry_data: Mapping[str, Any]
101 ) -> ConfigFlowResult:
102 """Perform reauth upon an API authentication error."""
106 self, user_input: dict[str, str] |
None =
None
107 ) -> ConfigFlowResult:
108 """Confirm reauth dialog."""
111 if user_input
is not None:
112 user_input = {**reauth_entry.data, **user_input}
114 await
get_api(self.hass, user_input)
116 except AuthenticationError:
117 errors[CONF_PASSWORD] =
"invalid_auth"
118 except (CannotConnect, UnknownError):
119 errors[
"base"] =
"cannot_connect"
124 description_placeholders={
125 CONF_USERNAME: reauth_entry.data[CONF_USERNAME],
126 CONF_NAME: reauth_entry.title,
128 step_id=
"reauth_confirm",
129 data_schema=vol.Schema(
131 vol.Required(CONF_PASSWORD): str,
139 """Handle Transmission client options."""
142 self, user_input: dict[str, Any] |
None =
None
143 ) -> ConfigFlowResult:
144 """Manage the Transmission options."""
145 if user_input
is not None:
152 ): vol.All(vol.Coerce(int), vol.Range(min=1, max=500)),
156 ): vol.All(vol.Coerce(str), vol.In(SUPPORTED_ORDER_MODES.keys())),
159 return self.
async_show_formasync_show_form(step_id=
"init", data_schema=vol.Schema(options))
ConfigFlowResult async_step_reauth_confirm(self, dict[str, str]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
TransmissionOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigEntry _get_reauth_entry(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
transmission_rpc.Client get_api(HomeAssistant hass, dict[str, Any] entry)