1 """Home Assistant auth provider."""
3 from __future__
import annotations
7 from collections.abc
import Mapping
9 from typing
import Any, cast
12 import voluptuous
as vol
20 from ..models
import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
21 from .
import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
24 STORAGE_KEY =
"auth_provider.homeassistant"
28 """Disallow ID in config."""
30 raise vol.Invalid(
"ID is not allowed for the homeassistant auth provider.")
35 CONFIG_SCHEMA = vol.All(AUTH_PROVIDER_SCHEMA, _disallow_id)
40 """Get the provider."""
41 for prv
in hass.auth.auth_providers:
42 if prv.type ==
"homeassistant":
43 return cast(HassAuthProvider, prv)
45 raise RuntimeError(
"Provider not found")
49 """Raised when we encounter invalid authentication."""
53 """Raised when invalid user is specified.
55 Will not be raised when validating authentication.
61 translation_key: str |
None =
None,
62 translation_placeholders: dict[str, str] |
None =
None,
64 """Initialize exception."""
67 translation_domain=
"auth",
68 translation_key=translation_key,
69 translation_placeholders=translation_placeholders,
74 """Raised when invalid username is specified.
76 Will not be raised when validating authentication.
81 """Hold the user data."""
83 def __init__(self, hass: HomeAssistant) ->
None:
84 """Initialize the user data store."""
86 self.
_store_store = Store[dict[str, list[dict[str, str]]]](
87 hass, STORAGE_VERSION, STORAGE_KEY, private=
True, atomic_writes=
True
89 self.
_data_data: dict[str, list[dict[str, str]]] |
None =
None
97 self, username: str, *, force_normalize: bool =
False
99 """Normalize a username based on the mode."""
100 if self.
is_legacyis_legacy
and not force_normalize:
103 return username.strip().casefold()
106 """Load stored data."""
108 data = cast(dict[str, list[dict[str, str]]], {
"users": []})
115 self, data: dict[str, list[dict[str, str]]]
117 not_normalized_usernames: set[str] = set()
119 for user
in data[
"users"]:
120 username = user[
"username"]
122 if self.
normalize_usernamenormalize_username(username, force_normalize=
True) != username:
123 logging.getLogger(__name__).warning(
125 "Home Assistant auth provider is running in legacy mode "
126 "because we detected usernames that are normalized (lowercase and without spaces)."
127 " Please change the username: '%s'."
131 not_normalized_usernames.add(username)
133 if not_normalized_usernames:
135 ir.async_create_issue(
138 "homeassistant_provider_not_normalized_usernames",
139 breaks_in_ha_version=
"2026.7.0",
141 severity=ir.IssueSeverity.WARNING,
142 translation_key=
"homeassistant_provider_not_normalized_usernames",
143 translation_placeholders={
144 "usernames": f
'- "{'"\n- "'.join(sorted(not_normalized_usernames))}"'
146 learn_more_url=
"homeassistant://config/users",
150 ir.async_delete_issue(
151 self.
hasshass,
"auth",
"homeassistant_provider_not_normalized_usernames"
155 def users(self) -> list[dict[str, str]]:
157 assert self.
_data_data
is not None
158 return self.
_data_data[
"users"]
161 """Validate a username and password.
163 Raises InvalidAuth if auth invalid.
166 dummy = b
"$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO"
170 for user
in self.
usersusers:
176 bcrypt.checkpw(b
"foo", dummy)
179 user_hash = base64.b64decode(found[
"password"])
182 if not bcrypt.checkpw(password.encode(), user_hash):
186 """Encode a password."""
187 hashed: bytes = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12))
190 hashed = base64.b64encode(hashed)
193 def add_auth(self, username: str, password: str) ->
None:
194 """Add a new authenticated user/pass.
196 Raises InvalidUsername if the new username is invalid.
200 self.
usersusers.append(
202 "username": username,
203 "password": self.
hash_passwordhash_password(password,
True).decode(),
209 """Remove authentication."""
213 for i, user
in enumerate(self.
usersusers):
219 raise InvalidUser(translation_key=
"user_not_found")
221 self.
usersusers.pop(index)
224 """Update the password.
226 Raises InvalidUser if user cannot be found.
230 for user
in self.
usersusers:
232 user[
"password"] = self.
hash_passwordhash_password(new_password,
True).decode()
235 raise InvalidUser(translation_key=
"user_not_found")
239 """Validate that username is normalized and unique.
241 Raises InvalidUsername if the new username is invalid.
244 new_username, force_normalize=
True
246 if normalized_username != new_username:
248 translation_key=
"username_not_normalized",
249 translation_placeholders={
"new_username": new_username},
254 for user
in self.
usersusers
257 translation_key=
"username_already_exists",
258 translation_placeholders={
"username": new_username},
263 """Update the username.
265 Raises InvalidUser if user cannot be found.
266 Raises InvalidUsername if the new username is invalid.
271 for user
in self.
usersusers:
273 user[
"username"] = new_username
274 assert self.
_data_data
is not None
278 raise InvalidUser(translation_key=
"user_not_found")
282 if self.
_data_data
is not None:
286 @AUTH_PROVIDERS.register("homeassistant")
288 """Auth provider based on a local storage of users in Home Assistant config dir."""
290 DEFAULT_TITLE =
"Home Assistant Local"
292 def __init__(self, *args: Any, **kwargs: Any) ->
None:
293 """Initialize an Home Assistant auth provider."""
295 self.
datadata: Data |
None =
None
299 """Initialize the auth provider."""
301 if self.
datadata
is not None:
304 data =
Data(self.hass)
305 await data.async_load()
309 """Return a flow to login."""
313 """Validate a username and password."""
314 if self.
datadata
is None:
316 assert self.
datadata
is not None
318 await self.hass.async_add_executor_job(
319 self.
datadata.validate_login, username, password
323 """Call add_auth on data."""
324 if self.
datadata
is None:
326 assert self.
datadata
is not None
328 await self.hass.async_add_executor_job(self.
datadata.add_auth, username, password)
332 """Call remove_auth on data."""
333 if self.
datadata
is None:
335 assert self.
datadata
is not None
341 """Call change_password on data."""
342 if self.
datadata
is None:
344 assert self.
datadata
is not None
346 await self.hass.async_add_executor_job(
347 self.
datadata.change_password, username, new_password
352 self, credential: Credentials, new_username: str
354 """Validate new username and change it including updating credentials object."""
355 if self.
datadata
is None:
357 assert self.
datadata
is not None
359 self.
datadata.change_username(credential.data[
"username"], new_username)
360 self.hass.auth.async_update_user_credentials_data(
361 credential, {**credential.data,
"username": new_username}
366 self, flow_result: Mapping[str, str]
368 """Get credentials based on the flow result."""
369 if self.
datadata
is None:
371 assert self.
datadata
is not None
373 norm_username = self.
datadata.normalize_username
374 username = norm_username(flow_result[
"username"])
376 for credential
in await self.async_credentials():
377 if norm_username(credential.data[
"username"]) == username:
381 return self.async_create_credentials({
"username": username})
384 self, credentials: Credentials
386 """Get extra info for this credential."""
387 return UserMeta(name=credentials.data[
"username"], is_active=
True)
390 """When credentials get removed, also remove the auth."""
391 if self.
datadata
is None:
393 assert self.
datadata
is not None
404 """Handler for the login flow."""
407 self, user_input: dict[str, str] |
None =
None
409 """Handle the step of the form."""
412 if user_input
is not None:
414 await cast(HassAuthProvider, self._auth_provider).async_validate_login(
415 user_input[
"username"], user_input[
"password"]
418 errors[
"base"] =
"invalid_auth"
421 user_input.pop(
"password")
422 return await self.async_finish(user_input)
424 return self.async_show_form(
426 data_schema=vol.Schema(
428 vol.Required(
"username"): str,
429 vol.Required(
"password"): str,
str normalize_username(self, str username, *bool force_normalize=False)
None validate_login(self, str username, str password)
None __init__(self, HomeAssistant hass)
None async_remove_auth(self, str username)
None _async_check_for_not_normalized_usernames(self, dict[str, list[dict[str, str]]] data)
list[dict[str, str]] users(self)
None change_username(self, str username, str new_username)
None add_auth(self, str username, str password)
None _validate_new_username(self, str new_username)
bytes hash_password(self, str password, bool for_storage=False)
None change_password(self, str username, str new_password)
None __init__(self, *Any args, **Any kwargs)
None async_add_auth(self, str username, str password)
None async_initialize(self)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
None async_will_remove_credentials(self, Credentials credentials)
None async_change_password(self, str username, str new_password)
None async_validate_login(self, str username, str password)
None async_remove_auth(self, str username)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
None async_change_username(self, Credentials credential, str new_username)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
None __init__(self, *object args, str|None translation_key=None, dict[str, str]|None translation_placeholders=None)
HassAuthProvider async_get_provider(HomeAssistant hass)
dict[str, Any] _disallow_id(dict[str, Any] conf)
None async_save(self, _T data)