Home Assistant Unofficial Reference 2024.12.1
insecure_example.py
Go to the documentation of this file.
1 """Example auth provider."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import hmac
7 from typing import cast
8 
9 import voluptuous as vol
10 
11 from homeassistant.core import callback
12 from homeassistant.exceptions import HomeAssistantError
13 
14 from ..models import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
15 from . import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
16 
# Shape of one entry in the provider's configured ``users`` list:
# ``username`` and ``password`` are required strings, ``name`` is an
# optional string.
USER_SCHEMA = vol.Schema(
    schema={
        vol.Required("username"): str,
        vol.Required("password"): str,
        vol.Optional("name"): str,
    },
)
24 
25 
# Provider configuration: the shared auth-provider schema extended with a
# required ``users`` list whose items must each satisfy USER_SCHEMA.
# ``extra=vol.PREVENT_EXTRA`` is passed so undeclared keys are not accepted.
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    schema={vol.Required("users"): [USER_SCHEMA]},
    extra=vol.PREVENT_EXTRA,
)
29 
30 
class InvalidAuthError(HomeAssistantError):
    """Raised when submitting invalid authentication."""
33 
34 
@AUTH_PROVIDERS.register("insecure_example")
class ExampleAuthProvider(AuthProvider):
    """Example auth provider based on hardcoded usernames and passwords.

    The accepted users are read from ``self.config["users"]``; each entry
    carries a ``username``, a ``password`` and optionally a ``name``.
    """

    async def async_login_flow(self, context: AuthFlowContext | None) -> LoginFlow:
        """Return a login flow bound to this provider.

        ``context`` is accepted but not used by this provider.
        """
        return ExampleLoginFlow(self)

    @callback
    def async_validate_login(self, username: str, password: str) -> None:
        """Validate a username and password against the configured users.

        Returns ``None`` on success.

        Raises:
            InvalidAuthError: the username is not configured, or the password
                does not match the configured one.
        """
        matched = None

        # No early exit on purpose: every configured user is compared so the
        # loop does not stop at the first match (timing-attack mitigation).
        # When several entries share a username, the last one wins.
        for candidate in self.config["users"]:
            same_name = hmac.compare_digest(
                username.encode("utf-8"), candidate["username"].encode("utf-8")
            )
            if same_name:
                matched = candidate

        if matched is None:
            # Unknown user: run one throwaway comparison anyway so this path
            # performs a digest compare just like the known-user path does.
            hmac.compare_digest(password.encode("utf-8"), password.encode("utf-8"))
            raise InvalidAuthError

        password_ok = hmac.compare_digest(
            matched["password"].encode("utf-8"), password.encode("utf-8")
        )
        if not password_ok:
            raise InvalidAuthError

    async def async_get_or_create_credentials(
        self, flow_result: Mapping[str, str]
    ) -> Credentials:
        """Return the credentials for the username in ``flow_result``.

        An existing credential with the same username is reused; otherwise a
        new one holding only the username is created.
        """
        username = flow_result["username"]
        known_credentials = await self.async_credentials()

        existing = next(
            (
                entry
                for entry in known_credentials
                if entry.data["username"] == username
            ),
            None,
        )
        if existing is not None:
            return existing

        # Nothing stored for this username yet.
        return self.async_create_credentials({"username": username})

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Will be used to populate info when creating a new user. The name is
        taken from the first configured user with a matching username and is
        ``None`` when no user matches or the entry has no ``name``.
        """
        username = credentials.data["username"]

        display_name = next(
            (
                entry.get("name")
                for entry in self.config["users"]
                if entry["username"] == username
            ),
            None,
        )

        return UserMeta(name=display_name, is_active=True)
94 
95 
class ExampleLoginFlow(LoginFlow):
    """Login flow that asks for a username and password."""

    async def async_step_init(
        self, user_input: dict[str, str] | None = None
    ) -> AuthFlowResult:
        """Handle the single form step of the flow.

        Without ``user_input`` the username/password form is shown. With
        input, the credentials are validated by the auth provider: on success
        the flow finishes with the input minus its password, on failure the
        form is shown again with an ``invalid_auth`` error.
        """
        form_errors = None

        if user_input is not None:
            provider = cast(ExampleAuthProvider, self._auth_provider)
            try:
                provider.async_validate_login(
                    user_input["username"], user_input["password"]
                )
            except InvalidAuthError:
                form_errors = {"base": "invalid_auth"}
            else:
                # Drop the password in place so it is not passed on with the
                # flow result.
                user_input.pop("password")
                return await self.async_finish(user_input)

        form_schema = vol.Schema(
            {
                vol.Required("username"): str,
                vol.Required("password"): str,
            }
        )
        return self.async_show_form(
            step_id="init",
            data_schema=form_schema,
            errors=form_errors,
        )
LoginFlow async_login_flow(self, AuthFlowContext|None context)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)