1 """Example auth provider."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
7 from typing
import cast
9 import voluptuous
as vol
14 from ..models
import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
15 from .
import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
17 USER_SCHEMA = vol.Schema(
19 vol.Required(
"username"): str,
20 vol.Required(
"password"): str,
21 vol.Optional(
"name"): str,
26 CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
27 {vol.Required(
"users"): [USER_SCHEMA]}, extra=vol.PREVENT_EXTRA
32 """Raised when submitting invalid authentication."""
35 @AUTH_PROVIDERS.register(
"insecure_example")
37 """Example auth provider based on hardcoded usernames and passwords."""
40 """Return a flow to login."""
44 def async_validate_login(self, username: str, password: str) ->
None:
45 """Validate a username and password."""
49 for usr
in self.config[
"users"]:
50 if hmac.compare_digest(
51 username.encode(
"utf-8"), usr[
"username"].encode(
"utf-8")
57 hmac.compare_digest(password.encode(
"utf-8"), password.encode(
"utf-8"))
58 raise InvalidAuthError
60 if not hmac.compare_digest(
61 user[
"password"].encode(
"utf-8"), password.encode(
"utf-8")
63 raise InvalidAuthError
65 async
def async_get_or_create_credentials(
66 self, flow_result: Mapping[str, str]
68 """Get credentials based on the flow result."""
69 username = flow_result[
"username"]
71 for credential
in await self.async_credentials():
72 if credential.data[
"username"] == username:
76 return self.async_create_credentials({
"username": username})
78 async
def async_user_meta_for_credentials(
79 self, credentials: Credentials
81 """Return extra user metadata for credentials.
83 Will be used to populate info when creating a new user.
85 username = credentials.data[
"username"]
88 for user
in self.config[
"users"]:
89 if user[
"username"] == username:
90 name = user.get(
"name")
93 return UserMeta(name=name, is_active=
True)
97 """Handler for the login flow."""
100 self, user_input: dict[str, str] |
None =
None
102 """Handle the step of the form."""
105 if user_input
is not None:
107 cast(ExampleAuthProvider, self._auth_provider).async_validate_login(
108 user_input[
"username"], user_input[
"password"]
110 except InvalidAuthError:
111 errors = {
"base":
"invalid_auth"}
114 user_input.pop(
"password")
115 return await self.async_finish(user_input)
117 return self.async_show_form(
119 data_schema=vol.Schema(
121 vol.Required(
"username"): str,
122 vol.Required(
"password"): str,
LoginFlow async_login_flow(self, AuthFlowContext|None context)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)