1 """Config flow for Tautulli."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 from pytautulli
import PyTautulli, PyTautulliException, exceptions
9 import voluptuous
as vol
15 from .const
import DEFAULT_NAME, DOMAIN
19 """Handle a config flow for Tautulli."""
24 self, user_input: dict[str, Any] |
None =
None
25 ) -> ConfigFlowResult:
26 """Handle a flow initiated by the user."""
28 if user_input
is not None:
30 if (error := await self.
validate_inputvalidate_input(user_input))
is None:
35 errors[
"base"] = error
37 user_input = user_input
or {}
39 vol.Required(CONF_API_KEY, default=user_input.get(CONF_API_KEY,
"")): str,
40 vol.Required(CONF_URL, default=user_input.get(CONF_URL,
"")): str,
42 CONF_VERIFY_SSL, default=user_input.get(CONF_VERIFY_SSL,
True)
48 data_schema=vol.Schema(data_schema),
53 self, entry_data: Mapping[str, Any]
54 ) -> ConfigFlowResult:
55 """Handle a reauthorization flow request."""
59 self, user_input: dict[str, str] |
None =
None
60 ) -> ConfigFlowResult:
61 """Confirm reauth dialog."""
63 if user_input
is not None:
65 _input = {**reauth_entry.data, CONF_API_KEY: user_input[CONF_API_KEY]}
66 if (error := await self.
validate_inputvalidate_input(_input))
is None:
68 errors[
"base"] = error
70 step_id=
"reauth_confirm",
71 data_schema=vol.Schema({vol.Required(CONF_API_KEY): str}),
76 """Try connecting to Tautulli."""
78 api_client = PyTautulli(
79 api_token=user_input[CONF_API_KEY],
80 url=user_input[CONF_URL],
82 self.hass, user_input.get(CONF_VERIFY_SSL,
True)
84 verify_ssl=user_input.get(CONF_VERIFY_SSL,
True),
86 await api_client.async_get_server_info()
87 except exceptions.PyTautulliConnectionException:
88 return "cannot_connect"
89 except exceptions.PyTautulliAuthenticationException:
91 except PyTautulliException:
ConfigFlowResult async_step_reauth_confirm(self, dict[str, str]|None user_input=None)
str|None validate_input(self, dict[str, Any] user_input)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigEntry _get_reauth_entry(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)