1 """Storage for auth models."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from logging
import getLogger
18 ACCESS_TOKEN_EXPIRATION,
22 REFRESH_TOKEN_EXPIRATION,
24 from .permissions
import system_policies
25 from .permissions.models
import PermissionLookup
26 from .permissions.types
import PolicyType
30 GROUP_NAME_ADMIN =
"Administrators"
31 GROUP_NAME_USER =
"Users"
32 GROUP_NAME_READ_ONLY =
"Read Only"
41 INITIAL_LOAD_SAVE_DELAY = 300
43 DEFAULT_SAVE_DELAY = 1
47 """Stores authentication info.
49 Any mutation to an object should happen inside the auth store.
51 The auth store is lazy. It won't load the data from disk until a method is
55 def __init__(self, hass: HomeAssistant) ->
None:
56 """Initialize the auth store."""
62 self.
_store_store = Store[dict[str, list[dict[str, Any]]]](
63 hass, STORAGE_VERSION, STORAGE_KEY, private=
True, atomic_writes=
True
68 """Retrieve all users."""
72 """Retrieve all users."""
76 """Retrieve all users."""
80 """Retrieve a user by id."""
86 is_owner: bool |
None =
None,
87 is_active: bool |
None =
None,
88 system_generated: bool |
None =
None,
90 group_ids: list[str] |
None =
None,
91 local_only: bool |
None =
None,
93 """Create a new user."""
95 for group_id
in group_ids
or []:
96 if (group := self.
_groups_groups.
get(group_id))
is None:
97 raise ValueError(f
"Invalid group specified {group_id}")
100 kwargs: dict[str, Any] = {
111 for attr_name, value
in (
112 (
"is_owner", is_owner),
113 (
"is_active", is_active),
114 (
"local_only", local_only),
115 (
"system_generated", system_generated),
123 self.
_users_users[new_user.id] = new_user
125 if credentials
is None:
136 """Add credentials to an existing user."""
137 user.credentials.append(credentials)
139 credentials.is_new =
False
143 user = self.
_users_users.pop(user.id)
144 for refresh_token_id
in user.refresh_tokens:
146 user.refresh_tokens.clear()
152 name: str |
None =
None,
153 is_active: bool |
None =
None,
154 group_ids: list[str] |
None =
None,
155 local_only: bool |
None =
None,
158 if group_ids
is not None:
160 for grid
in group_ids:
161 if (group := self.
_groups_groups.
get(grid))
is None:
162 raise ValueError(
"Invalid group specified.")
167 for attr_name, value
in (
169 (
"is_active", is_active),
170 (
"local_only", local_only),
172 if value
is not None:
173 setattr(user, attr_name, value)
178 """Activate a user."""
179 user.is_active =
True
183 """Deactivate a user."""
184 user.is_active =
False
188 """Remove credentials."""
189 for user
in self.
_users_users.values():
192 for index, cred
in enumerate(user.credentials):
193 if cred
is credentials:
197 if found
is not None:
198 user.credentials.pop(found)
206 client_id: str |
None =
None,
207 client_name: str |
None =
None,
208 client_icon: str |
None =
None,
209 token_type: str = models.TOKEN_TYPE_NORMAL,
210 access_token_expiration: timedelta = ACCESS_TOKEN_EXPIRATION,
211 expire_at: float |
None =
None,
214 """Create a new token for a user."""
215 kwargs: dict[str, Any] = {
217 "client_id": client_id,
218 "token_type": token_type,
219 "access_token_expiration": access_token_expiration,
220 "expire_at": expire_at,
221 "credential": credential,
224 kwargs[
"client_name"] = client_name
226 kwargs[
"client_icon"] = client_icon
229 token_id = refresh_token.id
230 user.refresh_tokens[token_id] = refresh_token
238 """Remove a refresh token."""
239 refresh_token_id = refresh_token.id
241 del self.
_users_users[user_id].refresh_tokens[refresh_token_id]
247 """Get refresh token by id."""
249 return self.
_users_users[user_id].refresh_tokens.get(token_id)
256 """Get refresh token by token."""
259 for user
in self.
_users_users.values():
260 for refresh_token
in user.refresh_tokens.values():
261 if hmac.compare_digest(refresh_token.token, token):
262 found = refresh_token
268 """Get all refresh tokens."""
270 itertools.chain.from_iterable(
271 user.refresh_tokens.values()
for user
in self.
_users_users.values()
277 self, refresh_token: models.RefreshToken, remote_ip: str |
None =
None
279 """Update refresh token last used information."""
280 refresh_token.last_used_at = dt_util.utcnow()
281 refresh_token.last_used_ip = remote_ip
282 if refresh_token.expire_at:
283 refresh_token.expire_at = (
284 refresh_token.last_used_at.timestamp() + REFRESH_TOKEN_EXPIRATION
290 self, refresh_token: models.RefreshToken, *, enable_expiry: bool
292 """Enable or disable expiry of a refresh token."""
294 if refresh_token.expire_at
is None:
295 refresh_token.expire_at = (
296 refresh_token.last_used_at
or dt_util.utcnow()
297 ).timestamp() + REFRESH_TOKEN_EXPIRATION
300 refresh_token.expire_at =
None
305 self, credentials: models.Credentials, data: dict[str, Any]
307 """Update credentials data."""
308 credentials.data = data
312 """Load the users."""
314 raise RuntimeError(
"Auth storage is already loaded")
317 dev_reg = dr.async_get(self.
hasshass)
318 ent_reg = er.async_get(self.
hasshass)
324 if data
is None or not isinstance(data, dict):
337 has_admin_group =
False
338 has_user_group =
False
339 has_read_only_group =
False
340 group_without_policy =
None
346 for group_dict
in data.get(
"groups", []):
347 policy: PolicyType |
None =
None
349 if group_dict[
"id"] == GROUP_ID_ADMIN:
350 has_admin_group =
True
352 name = GROUP_NAME_ADMIN
353 policy = system_policies.ADMIN_POLICY
354 system_generated =
True
356 elif group_dict[
"id"] == GROUP_ID_USER:
357 has_user_group =
True
359 name = GROUP_NAME_USER
360 policy = system_policies.USER_POLICY
361 system_generated =
True
363 elif group_dict[
"id"] == GROUP_ID_READ_ONLY:
364 has_read_only_group =
True
366 name = GROUP_NAME_READ_ONLY
367 policy = system_policies.READ_ONLY_POLICY
368 system_generated =
True
371 name = group_dict[
"name"]
372 policy = group_dict.get(
"policy")
373 system_generated =
False
378 group_without_policy = group_dict[
"id"]
385 system_generated=system_generated,
390 migrate_users_to_admin_group =
not groups
and group_without_policy
is None
396 if groups
and group_without_policy
is not None:
397 group_without_policy =
None
400 if not has_admin_group:
402 groups[admin_group.id] = admin_group
405 if not has_read_only_group:
407 groups[read_only_group.id] = read_only_group
409 if not has_user_group:
411 groups[user_group.id] = user_group
413 for user_dict
in data[
"users"]:
416 for group_id
in user_dict.get(
"group_ids", []):
418 if group_id == group_without_policy:
419 group_id = GROUP_ID_ADMIN
420 user_groups.append(groups[group_id])
423 if not user_dict[
"system_generated"]
and migrate_users_to_admin_group:
424 user_groups.append(groups[GROUP_ID_ADMIN])
427 name=user_dict[
"name"],
430 is_owner=user_dict[
"is_owner"],
431 is_active=user_dict[
"is_active"],
432 system_generated=user_dict[
"system_generated"],
433 perm_lookup=perm_lookup,
435 local_only=user_dict.get(
"local_only",
False),
438 for cred_dict
in data[
"credentials"]:
442 auth_provider_type=cred_dict[
"auth_provider_type"],
443 auth_provider_id=cred_dict[
"auth_provider_id"],
444 data=cred_dict[
"data"],
446 credentials[cred_dict[
"id"]] = credential
447 users[cred_dict[
"user_id"]].credentials.append(credential)
449 for rt_dict
in data[
"refresh_tokens"]:
451 if "jwt_key" not in rt_dict:
454 created_at = dt_util.parse_datetime(rt_dict[
"created_at"])
455 if created_at
is None:
456 getLogger(__name__).error(
458 "Ignoring refresh token %(id)s with invalid created_at "
459 "%(created_at)s for user_id %(user_id)s"
465 if (token_type := rt_dict.get(
"token_type"))
is None:
466 if rt_dict[
"client_id"]
is None:
467 token_type = models.TOKEN_TYPE_SYSTEM
469 token_type = models.TOKEN_TYPE_NORMAL
472 if last_used_at_str := rt_dict.get(
"last_used_at"):
473 last_used_at = dt_util.parse_datetime(last_used_at_str)
479 user=users[rt_dict[
"user_id"]],
480 client_id=rt_dict[
"client_id"],
482 client_name=rt_dict.get(
"client_name"),
483 client_icon=rt_dict.get(
"client_icon"),
484 token_type=token_type,
485 created_at=created_at,
487 seconds=rt_dict[
"access_token_expiration"]
489 token=rt_dict[
"token"],
490 jwt_key=rt_dict[
"jwt_key"],
491 last_used_at=last_used_at,
492 last_used_ip=rt_dict.get(
"last_used_ip"),
493 expire_at=rt_dict.get(
"expire_at"),
494 version=rt_dict.get(
"version"),
496 if "credential_id" in rt_dict:
497 token.credential = credentials.get(rt_dict[
"credential_id"])
498 users[rt_dict[
"user_id"]].refresh_tokens[token.id] = token
507 """Build a map of token id to user id."""
510 for user_id, user
in self.
_users_users.items()
511 for token_id
in user.refresh_tokens
521 """Return the data to store."""
525 "group_ids": [group.id
for group
in user.groups],
526 "is_owner": user.is_owner,
527 "is_active": user.is_active,
529 "system_generated": user.system_generated,
530 "local_only": user.local_only,
532 for user
in self.
_users_users.values()
536 for group
in self.
_groups_groups.values():
537 g_dict: dict[str, Any] = {
543 if not group.system_generated:
544 g_dict[
"policy"] = group.policy
546 groups.append(g_dict)
552 "auth_provider_type": credential.auth_provider_type,
553 "auth_provider_id": credential.auth_provider_id,
554 "data": credential.data,
556 for user
in self.
_users_users.values()
557 for credential
in user.credentials
562 "id": refresh_token.id,
564 "client_id": refresh_token.client_id,
565 "client_name": refresh_token.client_name,
566 "client_icon": refresh_token.client_icon,
567 "token_type": refresh_token.token_type,
568 "created_at": refresh_token.created_at.isoformat(),
569 "access_token_expiration": (
570 refresh_token.access_token_expiration.total_seconds()
572 "token": refresh_token.token,
573 "jwt_key": refresh_token.jwt_key,
574 "last_used_at": refresh_token.last_used_at.isoformat()
575 if refresh_token.last_used_at
577 "last_used_ip": refresh_token.last_used_ip,
578 "expire_at": refresh_token.expire_at,
579 "credential_id": refresh_token.credential.id
580 if refresh_token.credential
582 "version": refresh_token.version,
584 for user
in self.
_users_users.values()
585 for refresh_token
in user.refresh_tokens.values()
591 "credentials": credentials,
592 "refresh_tokens": refresh_tokens,
596 """Set default values for auth store."""
601 groups[admin_group.id] = admin_group
603 groups[user_group.id] = user_group
605 groups[read_only_group.id] = read_only_group
611 """Create system admin group."""
613 name=GROUP_NAME_ADMIN,
615 policy=system_policies.ADMIN_POLICY,
616 system_generated=
True,
621 """Create system user group."""
623 name=GROUP_NAME_USER,
625 policy=system_policies.USER_POLICY,
626 system_generated=
True,
631 """Create read only group."""
633 name=GROUP_NAME_READ_ONLY,
634 id=GROUP_ID_READ_ONLY,
635 policy=system_policies.READ_ONLY_POLICY,
636 system_generated=
True,
models.Group|None async_get_group(self, str group_id)
models.RefreshToken|None async_get_refresh_token(self, str token_id)
None async_remove_credentials(self, models.Credentials credentials)
models.User async_create_user(self, str|None name, bool|None is_owner=None, bool|None is_active=None, bool|None system_generated=None, models.Credentials|None credentials=None, list[str]|None group_ids=None, bool|None local_only=None)
list[models.User] async_get_users(self)
None async_remove_user(self, models.User user)
None async_update_user_credentials_data(self, models.Credentials credentials, dict[str, Any] data)
models.User|None async_get_user(self, str user_id)
None async_activate_user(self, models.User user)
None async_log_refresh_token_usage(self, models.RefreshToken refresh_token, str|None remote_ip=None)
list[models.Group] async_get_groups(self)
models.RefreshToken|None async_get_refresh_token_by_token(self, str token)
None _async_schedule_save(self, float delay=DEFAULT_SAVE_DELAY)
None _build_token_id_to_user_id(self)
list[models.RefreshToken] async_get_refresh_tokens(self)
None async_set_expiry(self, models.RefreshToken refresh_token, *bool enable_expiry)
None async_remove_refresh_token(self, models.RefreshToken refresh_token)
None async_deactivate_user(self, models.User user)
None async_update_user(self, models.User user, str|None name=None, bool|None is_active=None, list[str]|None group_ids=None, bool|None local_only=None)
None async_link_user(self, models.User user, models.Credentials credentials)
dict[str, list[dict[str, Any]]] _data_to_save(self)
models.RefreshToken async_create_refresh_token(self, models.User user, str|None client_id=None, str|None client_name=None, str|None client_icon=None, str token_type=models.TOKEN_TYPE_NORMAL, timedelta access_token_expiration=ACCESS_TOKEN_EXPIRATION, float|None expire_at=None, models.Credentials|None credential=None)
None __init__(self, HomeAssistant hass)
models.Group _system_admin_group()
models.Group _system_read_only_group()
models.Group _system_user_group()
web.Response get(self, web.Request request, str config_key)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)