1 """Config flow for FireServiceRota."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 from pyfireservicerota
import FireServiceRota, InvalidAuthError
9 import voluptuous
as vol
14 from .const
import DOMAIN, URL_LIST
16 DATA_SCHEMA = vol.Schema(
18 vol.Required(CONF_URL, default=
"www.brandweerrooster.nl"): vol.In(URL_LIST),
19 vol.Required(CONF_USERNAME): str,
20 vol.Required(CONF_PASSWORD): str,
26 """Handle a FireServiceRota config flow."""
31 """Initialize config flow."""
40 self, user_input: dict[str, Any] |
None =
None
41 ) -> ConfigFlowResult:
42 """Handle a flow initiated by the user."""
43 errors: dict[str, str] = {}
45 if user_input
is None:
51 """Check if config is valid and create entry if so."""
52 self.
_password_password = user_input[CONF_PASSWORD]
54 extra_inputs = user_input
59 self.
_username_username = extra_inputs[CONF_USERNAME]
60 self.
_base_url_base_url = extra_inputs[CONF_URL]
66 self.
apiapi = FireServiceRota(
73 token_info = await self.hass.async_add_executor_job(self.
apiapi.request_tokens)
74 except InvalidAuthError:
78 data_schema=DATA_SCHEMA,
79 errors={
"base":
"invalid_auth"},
83 "auth_implementation": DOMAIN,
86 CONF_TOKEN: token_info,
93 self.hass.config_entries.async_update_entry(entry, data=data)
94 await self.hass.config_entries.async_reload(entry.entry_id)
98 """Show the setup form to the user."""
100 if user_input
is None:
103 if step_id ==
"user":
105 vol.Required(CONF_URL, default=
"www.brandweerrooster.nl"): vol.In(
108 vol.Required(CONF_USERNAME): str,
109 vol.Required(CONF_PASSWORD): str,
112 schema = {vol.Required(CONF_PASSWORD): str}
116 data_schema=vol.Schema(schema),
122 self, entry_data: Mapping[str, Any]
123 ) -> ConfigFlowResult:
124 """Initialise re-authentication."""
131 self, user_input: dict[str, str] |
None =
None
132 ) -> ConfigFlowResult:
133 """Get new tokens for a config entry that can't authenticate."""
134 if user_input
is None:
_description_placeholders
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
def _show_setup_form(self, user_input=None, errors=None, step_id="user")
def _validate_and_create_entry(self, user_input, step_id)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, str]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)