1 """Auth providers for Home Assistant."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
10 import voluptuous
as vol
11 from voluptuous.humanize
import humanize_error
13 from homeassistant
import requirements
23 from ..auth_store
import AuthStore
24 from ..const
import MFA_SESSION_EXPIRATION
25 from ..models
import (
34 _LOGGER = logging.getLogger(__name__)
35 DATA_REQS: HassKey[set[str]] =
HassKey(
"auth_prov_reqs_processed")
37 AUTH_PROVIDERS: Registry[str, type[AuthProvider]] =
Registry()
39 AUTH_PROVIDER_SCHEMA = vol.Schema(
41 vol.Required(CONF_TYPE): str,
42 vol.Optional(CONF_NAME): str,
44 vol.Optional(CONF_ID): str,
46 extra=vol.ALLOW_EXTRA,
51 """Provider of user authentication."""
53 DEFAULT_TITLE =
"Unnamed auth provider"
56 self, hass: HomeAssistant, store: AuthStore, config: dict[str, Any]
58 """Initialize an auth provider."""
64 def id(self) -> str | None:
65 """Return id of the auth provider.
67 Optional, can be None.
73 """Return type of the provider."""
74 return self.
configconfig[CONF_TYPE]
78 """Return the name of the auth provider."""
83 """Return whether multi-factor auth supported by the auth provider."""
87 """Return all credentials of this provider."""
92 for credentials
in user.credentials
94 credentials.auth_provider_type == self.
typetype
95 and credentials.auth_provider_id == self.
idid
101 """Create credentials."""
103 auth_provider_type=self.
typetype, auth_provider_id=self.
idid, data=data
109 """Return the data flow for logging in with auth provider.
111 Auth provider should extend LoginFlow and return an instance.
113 raise NotImplementedError
116 self, flow_result: Mapping[str, str]
118 """Get credentials based on the flow result."""
119 raise NotImplementedError
122 self, credentials: Credentials
124 """Return extra user metadata for credentials.
126 Will be used to populate info when creating a new user.
128 raise NotImplementedError
131 """Initialize the auth provider."""
135 self, refresh_token: RefreshToken, remote_ip: str |
None =
None
137 """Verify a refresh token is still valid.
139 Optional hook for an auth provider to verify validity of a refresh token.
140 Should raise InvalidAuthError on errors.
145 hass: HomeAssistant, store: AuthStore, config: dict[str, Any]
147 """Initialize an auth provider from a config."""
148 provider_name: str = config[CONF_TYPE]
152 config = module.CONFIG_SCHEMA(config)
153 except vol.Invalid
as err:
155 "Invalid configuration for auth provider %s: %s",
161 return AUTH_PROVIDERS[provider_name](hass, store, config)
165 hass: HomeAssistant, provider: str
166 ) -> types.ModuleType:
167 """Load an auth provider."""
170 hass, f
"homeassistant.auth.providers.{provider}"
172 except ImportError
as err:
173 _LOGGER.error(
"Unable to load auth provider %s: %s", provider, err)
175 f
"Unable to load auth provider {provider}: {err}"
178 if hass.config.skip_pip
or not hasattr(module,
"REQUIREMENTS"):
181 if (processed := hass.data.get(DATA_REQS))
is None:
182 processed = hass.data[DATA_REQS] = set()
183 elif provider
in processed:
186 reqs = module.REQUIREMENTS
187 await requirements.async_process_requirements(
188 hass, f
"auth provider {provider}", reqs
191 processed.add(provider)
196 """Handler for the login flow."""
198 _flow_result = AuthFlowResult
200 def __init__(self, auth_provider: AuthProvider) ->
None:
201 """Initialize the login flow."""
205 self.available_mfa_modules: dict[str, str] = {}
208 self.user: User |
None =
None
209 self.credential: Credentials |
None =
None
212 self, user_input: dict[str, str] |
None =
None
214 """Handle the first step of login flow.
216 Return self.async_show_form(step_id='init') if user_input is None.
217 Return await self.async_finish(flow_result) if login init step pass.
219 raise NotImplementedError
222 self, user_input: dict[str, str] |
None =
None
224 """Handle the step of select mfa module."""
227 if user_input
is not None:
228 auth_module = user_input.get(
"multi_factor_auth_module")
229 if auth_module
in self.available_mfa_modules:
232 errors[
"base"] =
"invalid_auth_module"
234 if len(self.available_mfa_modules) == 1:
239 step_id=
"select_mfa_module",
240 data_schema=vol.Schema(
241 {
"multi_factor_auth_module": vol.In(self.available_mfa_modules)}
247 self, user_input: dict[str, str] |
None =
None
249 """Handle the step of mfa validation."""
250 assert self.credential
257 if auth_module
is None:
262 if user_input
is None and hasattr(
263 auth_module,
"async_initialize_login_mfa_step"
266 await auth_module.async_initialize_login_mfa_step(self.user.id)
267 except HomeAssistantError:
268 _LOGGER.exception(
"Error initializing MFA step")
269 return self.
async_abortasync_abort(reason=
"unknown_error")
271 if user_input
is not None:
272 expires = self.
created_atcreated_at + MFA_SESSION_EXPIRATION
273 if dt_util.utcnow() > expires:
274 return self.
async_abortasync_abort(reason=
"login_expired")
276 result = await auth_module.async_validate(self.user.id, user_input)
278 errors[
"base"] =
"invalid_code"
281 return self.
async_abortasync_abort(reason=
"too_many_retry")
284 return await self.
async_finishasync_finish(self.credential)
286 description_placeholders: dict[str, str] = {
287 "mfa_module_name": auth_module.name,
288 "mfa_module_id": auth_module.id,
293 data_schema=auth_module.input_schema,
294 description_placeholders=description_placeholders,
299 """Handle the pass of login flow."""
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
None __init__(self, HomeAssistant hass, AuthStore store, dict[str, Any] config)
None async_validate_refresh_token(self, RefreshToken refresh_token, str|None remote_ip=None)
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
list[Credentials] async_credentials(self)
Credentials async_create_credentials(self, dict[str, str] data)
None async_initialize(self)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
AuthFlowResult async_step_select_mfa_module(self, dict[str, str]|None user_input=None)
None __init__(self, AuthProvider auth_provider)
AuthFlowResult async_finish(self, Any flow_result)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
AuthFlowResult async_step_mfa(self, dict[str, str]|None user_input=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
AuthProvider auth_provider_from_config(HomeAssistant hass, AuthStore store, dict[str, Any] config)
types.ModuleType load_auth_provider_module(HomeAssistant hass, str provider)
web.Response get(self, web.Request request, str config_key)
list[str] async_get_users(HomeAssistant hass)
str humanize_error(HomeAssistant hass, vol.Invalid validation_error, str domain, dict config, str|None link, int max_sub_error_length=MAX_VALIDATION_ERROR_ITEM_LENGTH)
ModuleType async_import_module(HomeAssistant hass, str name)