Home Assistant Unofficial Reference 2024.12.1
homeassistant.py
Go to the documentation of this file.
1 """Home Assistant auth provider."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import base64
7 from collections.abc import Mapping
8 import logging
9 from typing import Any, cast
10 
11 import bcrypt
12 import voluptuous as vol
13 
14 from homeassistant.const import CONF_ID
15 from homeassistant.core import HomeAssistant, callback
16 from homeassistant.exceptions import HomeAssistantError
17 from homeassistant.helpers import issue_registry as ir
18 from homeassistant.helpers.storage import Store
19 
20 from ..models import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
21 from . import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
22 
23 STORAGE_VERSION = 1
24 STORAGE_KEY = "auth_provider.homeassistant"
25 
26 
27 def _disallow_id(conf: dict[str, Any]) -> dict[str, Any]:
28  """Disallow ID in config."""
29  if CONF_ID in conf:
30  raise vol.Invalid("ID is not allowed for the homeassistant auth provider.")
31 
32  return conf
33 
34 
35 CONFIG_SCHEMA = vol.All(AUTH_PROVIDER_SCHEMA, _disallow_id)
36 
37 
38 @callback
39 def async_get_provider(hass: HomeAssistant) -> HassAuthProvider:
40  """Get the provider."""
41  for prv in hass.auth.auth_providers:
42  if prv.type == "homeassistant":
43  return cast(HassAuthProvider, prv)
44 
45  raise RuntimeError("Provider not found")
46 
47 
49  """Raised when we encounter invalid authentication."""
50 
51 
53  """Raised when invalid user is specified.
54 
55  Will not be raised when validating authentication.
56  """
57 
58  def __init__(
59  self,
60  *args: object,
61  translation_key: str | None = None,
62  translation_placeholders: dict[str, str] | None = None,
63  ) -> None:
64  """Initialize exception."""
65  super().__init__(
66  *args,
67  translation_domain="auth",
68  translation_key=translation_key,
69  translation_placeholders=translation_placeholders,
70  )
71 
72 
74  """Raised when invalid username is specified.
75 
76  Will not be raised when validating authentication.
77  """
78 
79 
80 class Data:
81  """Hold the user data."""
82 
83  def __init__(self, hass: HomeAssistant) -> None:
84  """Initialize the user data store."""
85  self.hasshass = hass
86  self._store_store = Store[dict[str, list[dict[str, str]]]](
87  hass, STORAGE_VERSION, STORAGE_KEY, private=True, atomic_writes=True
88  )
89  self._data_data: dict[str, list[dict[str, str]]] | None = None
90  # Legacy mode will allow usernames to start/end with whitespace
91  # and will compare usernames case-insensitive.
92  # Deprecated in June 2019 and will be removed in 2026.7
93  self.is_legacyis_legacy = False
94 
95  @callback
97  self, username: str, *, force_normalize: bool = False
98  ) -> str:
99  """Normalize a username based on the mode."""
100  if self.is_legacyis_legacy and not force_normalize:
101  return username
102 
103  return username.strip().casefold()
104 
105  async def async_load(self) -> None:
106  """Load stored data."""
107  if (data := await self._store_store.async_load()) is None:
108  data = cast(dict[str, list[dict[str, str]]], {"users": []})
109 
110  self._async_check_for_not_normalized_usernames_async_check_for_not_normalized_usernames(data)
111  self._data_data = data
112 
113  @callback
115  self, data: dict[str, list[dict[str, str]]]
116  ) -> None:
117  not_normalized_usernames: set[str] = set()
118 
119  for user in data["users"]:
120  username = user["username"]
121 
122  if self.normalize_usernamenormalize_username(username, force_normalize=True) != username:
123  logging.getLogger(__name__).warning(
124  (
125  "Home Assistant auth provider is running in legacy mode "
126  "because we detected usernames that are normalized (lowercase and without spaces)."
127  " Please change the username: '%s'."
128  ),
129  username,
130  )
131  not_normalized_usernames.add(username)
132 
133  if not_normalized_usernames:
134  self.is_legacyis_legacy = True
135  ir.async_create_issue(
136  self.hasshass,
137  "auth",
138  "homeassistant_provider_not_normalized_usernames",
139  breaks_in_ha_version="2026.7.0",
140  is_fixable=False,
141  severity=ir.IssueSeverity.WARNING,
142  translation_key="homeassistant_provider_not_normalized_usernames",
143  translation_placeholders={
144  "usernames": f'- "{'"\n- "'.join(sorted(not_normalized_usernames))}"'
145  },
146  learn_more_url="homeassistant://config/users",
147  )
148  else:
149  self.is_legacyis_legacy = False
150  ir.async_delete_issue(
151  self.hasshass, "auth", "homeassistant_provider_not_normalized_usernames"
152  )
153 
154  @property
155  def users(self) -> list[dict[str, str]]:
156  """Return users."""
157  assert self._data_data is not None
158  return self._data_data["users"]
159 
160  def validate_login(self, username: str, password: str) -> None:
161  """Validate a username and password.
162 
163  Raises InvalidAuth if auth invalid.
164  """
165  username = self.normalize_usernamenormalize_username(username)
166  dummy = b"$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO"
167  found = None
168 
169  # Compare all users to avoid timing attacks.
170  for user in self.usersusers:
171  if self.normalize_usernamenormalize_username(user["username"]) == username:
172  found = user
173 
174  if found is None:
175  # check a hash to make timing the same as if user was found
176  bcrypt.checkpw(b"foo", dummy)
177  raise InvalidAuth
178 
179  user_hash = base64.b64decode(found["password"])
180 
181  # bcrypt.checkpw is timing-safe
182  if not bcrypt.checkpw(password.encode(), user_hash):
183  raise InvalidAuth
184 
185  def hash_password(self, password: str, for_storage: bool = False) -> bytes:
186  """Encode a password."""
187  hashed: bytes = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12))
188 
189  if for_storage:
190  hashed = base64.b64encode(hashed)
191  return hashed
192 
193  def add_auth(self, username: str, password: str) -> None:
194  """Add a new authenticated user/pass.
195 
196  Raises InvalidUsername if the new username is invalid.
197  """
198  self._validate_new_username_validate_new_username(username)
199 
200  self.usersusers.append(
201  {
202  "username": username,
203  "password": self.hash_passwordhash_password(password, True).decode(),
204  }
205  )
206 
207  @callback
208  def async_remove_auth(self, username: str) -> None:
209  """Remove authentication."""
210  username = self.normalize_usernamenormalize_username(username)
211 
212  index = None
213  for i, user in enumerate(self.usersusers):
214  if self.normalize_usernamenormalize_username(user["username"]) == username:
215  index = i
216  break
217 
218  if index is None:
219  raise InvalidUser(translation_key="user_not_found")
220 
221  self.usersusers.pop(index)
222 
223  def change_password(self, username: str, new_password: str) -> None:
224  """Update the password.
225 
226  Raises InvalidUser if user cannot be found.
227  """
228  username = self.normalize_usernamenormalize_username(username)
229 
230  for user in self.usersusers:
231  if self.normalize_usernamenormalize_username(user["username"]) == username:
232  user["password"] = self.hash_passwordhash_password(new_password, True).decode()
233  break
234  else:
235  raise InvalidUser(translation_key="user_not_found")
236 
237  @callback
238  def _validate_new_username(self, new_username: str) -> None:
239  """Validate that username is normalized and unique.
240 
241  Raises InvalidUsername if the new username is invalid.
242  """
243  normalized_username = self.normalize_usernamenormalize_username(
244  new_username, force_normalize=True
245  )
246  if normalized_username != new_username:
247  raise InvalidUsername(
248  translation_key="username_not_normalized",
249  translation_placeholders={"new_username": new_username},
250  )
251 
252  if any(
253  self.normalize_usernamenormalize_username(user["username"]) == normalized_username
254  for user in self.usersusers
255  ):
256  raise InvalidUsername(
257  translation_key="username_already_exists",
258  translation_placeholders={"username": new_username},
259  )
260 
261  @callback
262  def change_username(self, username: str, new_username: str) -> None:
263  """Update the username.
264 
265  Raises InvalidUser if user cannot be found.
266  Raises InvalidUsername if the new username is invalid.
267  """
268  username = self.normalize_usernamenormalize_username(username)
269  self._validate_new_username_validate_new_username(new_username)
270 
271  for user in self.usersusers:
272  if self.normalize_usernamenormalize_username(user["username"]) == username:
273  user["username"] = new_username
274  assert self._data_data is not None
275  self._async_check_for_not_normalized_usernames_async_check_for_not_normalized_usernames(self._data_data)
276  break
277  else:
278  raise InvalidUser(translation_key="user_not_found")
279 
280  async def async_save(self) -> None:
281  """Save data."""
282  if self._data_data is not None:
283  await self._store_store.async_save(self._data_data)
284 
285 
286 @AUTH_PROVIDERS.register("homeassistant")
287 class HassAuthProvider(AuthProvider):
288  """Auth provider based on a local storage of users in Home Assistant config dir."""
289 
290  DEFAULT_TITLE = "Home Assistant Local"
291 
292  def __init__(self, *args: Any, **kwargs: Any) -> None:
293  """Initialize an Home Assistant auth provider."""
294  super().__init__(*args, **kwargs)
295  self.datadata: Data | None = None
296  self._init_lock_init_lock = asyncio.Lock()
297 
298  async def async_initialize(self) -> None:
299  """Initialize the auth provider."""
300  async with self._init_lock_init_lock:
301  if self.datadata is not None:
302  return
303 
304  data = Data(self.hass)
305  await data.async_load()
306  self.datadata = data
307 
308  async def async_login_flow(self, context: AuthFlowContext | None) -> LoginFlow:
309  """Return a flow to login."""
310  return HassLoginFlow(self)
311 
312  async def async_validate_login(self, username: str, password: str) -> None:
313  """Validate a username and password."""
314  if self.datadata is None:
315  await self.async_initializeasync_initialize()
316  assert self.datadata is not None
317 
318  await self.hass.async_add_executor_job(
319  self.datadata.validate_login, username, password
320  )
321 
322  async def async_add_auth(self, username: str, password: str) -> None:
323  """Call add_auth on data."""
324  if self.datadata is None:
325  await self.async_initializeasync_initialize()
326  assert self.datadata is not None
327 
328  await self.hass.async_add_executor_job(self.datadata.add_auth, username, password)
329  await self.datadata.async_save()
330 
331  async def async_remove_auth(self, username: str) -> None:
332  """Call remove_auth on data."""
333  if self.datadata is None:
334  await self.async_initializeasync_initialize()
335  assert self.datadata is not None
336 
337  self.datadata.async_remove_auth(username)
338  await self.datadata.async_save()
339 
340  async def async_change_password(self, username: str, new_password: str) -> None:
341  """Call change_password on data."""
342  if self.datadata is None:
343  await self.async_initializeasync_initialize()
344  assert self.datadata is not None
345 
346  await self.hass.async_add_executor_job(
347  self.datadata.change_password, username, new_password
348  )
349  await self.datadata.async_save()
350 
352  self, credential: Credentials, new_username: str
353  ) -> None:
354  """Validate new username and change it including updating credentials object."""
355  if self.datadata is None:
356  await self.async_initializeasync_initialize()
357  assert self.datadata is not None
358 
359  self.datadata.change_username(credential.data["username"], new_username)
360  self.hass.auth.async_update_user_credentials_data(
361  credential, {**credential.data, "username": new_username}
362  )
363  await self.datadata.async_save()
364 
366  self, flow_result: Mapping[str, str]
367  ) -> Credentials:
368  """Get credentials based on the flow result."""
369  if self.datadata is None:
370  await self.async_initializeasync_initialize()
371  assert self.datadata is not None
372 
373  norm_username = self.datadata.normalize_username
374  username = norm_username(flow_result["username"])
375 
376  for credential in await self.async_credentials():
377  if norm_username(credential.data["username"]) == username:
378  return credential
379 
380  # Create new credentials.
381  return self.async_create_credentials({"username": username})
382 
384  self, credentials: Credentials
385  ) -> UserMeta:
386  """Get extra info for this credential."""
387  return UserMeta(name=credentials.data["username"], is_active=True)
388 
389  async def async_will_remove_credentials(self, credentials: Credentials) -> None:
390  """When credentials get removed, also remove the auth."""
391  if self.datadata is None:
392  await self.async_initializeasync_initialize()
393  assert self.datadata is not None
394 
395  try:
396  self.datadata.async_remove_auth(credentials.data["username"])
397  await self.datadata.async_save()
398  except InvalidUser:
399  # Can happen if somehow we didn't clean up a credential
400  pass
401 
402 
403 class HassLoginFlow(LoginFlow):
404  """Handler for the login flow."""
405 
406  async def async_step_init(
407  self, user_input: dict[str, str] | None = None
408  ) -> AuthFlowResult:
409  """Handle the step of the form."""
410  errors = {}
411 
412  if user_input is not None:
413  try:
414  await cast(HassAuthProvider, self._auth_provider).async_validate_login(
415  user_input["username"], user_input["password"]
416  )
417  except InvalidAuth:
418  errors["base"] = "invalid_auth"
419 
420  if not errors:
421  user_input.pop("password")
422  return await self.async_finish(user_input)
423 
424  return self.async_show_form(
425  step_id="init",
426  data_schema=vol.Schema(
427  {
428  vol.Required("username"): str,
429  vol.Required("password"): str,
430  }
431  ),
432  errors=errors,
433  )
str normalize_username(self, str username, *bool force_normalize=False)
None validate_login(self, str username, str password)
None __init__(self, HomeAssistant hass)
None _async_check_for_not_normalized_usernames(self, dict[str, list[dict[str, str]]] data)
None change_username(self, str username, str new_username)
None add_auth(self, str username, str password)
None _validate_new_username(self, str new_username)
bytes hash_password(self, str password, bool for_storage=False)
None change_password(self, str username, str new_password)
None async_add_auth(self, str username, str password)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
None async_will_remove_credentials(self, Credentials credentials)
None async_change_password(self, str username, str new_password)
None async_validate_login(self, str username, str password)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
None async_change_username(self, Credentials credential, str new_username)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
None __init__(self, *object args, str|None translation_key=None, dict[str, str]|None translation_placeholders=None)
HassAuthProvider async_get_provider(HomeAssistant hass)
dict[str, Any] _disallow_id(dict[str, Any] conf)
None async_save(self, _T data)
Definition: storage.py:424