3 from __future__
import annotations
5 from datetime
import datetime, timedelta
6 from ipaddress
import IPv4Address, IPv6Address
8 from typing
import Any, NamedTuple
12 from attr
import Attribute
13 from attr.setters
import validate
14 from propcache
import cached_property
20 from .
import permissions
as perm_mdl
21 from .const
import GROUP_ID_ADMIN
23 TOKEN_TYPE_NORMAL =
"normal"
24 TOKEN_TYPE_SYSTEM =
"system"
25 TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN =
"long_lived_access_token"
29 """Typed context dict for auth flow."""
32 ip_address: IPv4Address | IPv6Address
36 AuthFlowResult = FlowResult[AuthFlowContext, tuple[str, str]]
43 name: str |
None = attr.ib()
44 policy: perm_mdl.PolicyType = attr.ib()
45 id: str = attr.ib(factory=
lambda: uuid.uuid4().hex)
46 system_generated: bool = attr.ib(default=
False)
50 """Handle a change to a permissions."""
51 self.invalidate_cache()
52 return validate(self, user_attr, new)
59 name: str |
None = attr.ib()
60 perm_lookup: perm_mdl.PermissionLookup = attr.ib(eq=
False, order=
False)
61 id: str = attr.ib(factory=
lambda: uuid.uuid4().hex)
62 is_owner: bool = attr.ib(default=
False, on_setattr=_handle_permissions_change)
63 is_active: bool = attr.ib(default=
False, on_setattr=_handle_permissions_change)
64 system_generated: bool = attr.ib(default=
False)
65 local_only: bool = attr.ib(default=
False)
67 groups: list[Group] = attr.ib(
68 factory=list, eq=
False, order=
False, on_setattr=_handle_permissions_change
72 credentials: list[Credentials] = attr.ib(factory=list, eq=
False, order=
False)
75 refresh_tokens: dict[str, RefreshToken] = attr.ib(
76 factory=dict, eq=
False, order=
False
81 """Return permissions object for user."""
83 return perm_mdl.OwnerPermissions
84 return perm_mdl.PolicyPermissions(
85 perm_mdl.merge_policies([group.policy
for group
in self.groups]),
91 """Return if user is part of the admin group."""
92 return self.is_owner
or (
93 self.is_active
and any(gr.id == GROUP_ID_ADMIN
for gr
in self.groups)
97 """Invalidate permission and is_admin cache."""
98 for attr_to_invalidate
in (
"permissions",
"is_admin"):
99 self.__dict__.pop(attr_to_invalidate,
None)
104 """RefreshToken for a user to grant new access tokens."""
106 user: User = attr.ib()
107 client_id: str |
None = attr.ib()
108 access_token_expiration: timedelta = attr.ib()
109 client_name: str |
None = attr.ib(default=
None)
110 client_icon: str |
None = attr.ib(default=
None)
111 token_type: str = attr.ib(
112 default=TOKEN_TYPE_NORMAL,
113 validator=attr.validators.in_(
114 (TOKEN_TYPE_NORMAL, TOKEN_TYPE_SYSTEM, TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN)
117 id: str = attr.ib(factory=
lambda: uuid.uuid4().hex)
118 created_at: datetime = attr.ib(factory=dt_util.utcnow)
119 token: str = attr.ib(factory=
lambda: secrets.token_hex(64))
120 jwt_key: str = attr.ib(factory=
lambda: secrets.token_hex(64))
122 last_used_at: datetime |
None = attr.ib(default=
None)
123 last_used_ip: str |
None = attr.ib(default=
None)
125 expire_at: float |
None = attr.ib(default=
None)
127 credential: Credentials |
None = attr.ib(default=
None)
129 version: str |
None = attr.ib(default=__version__)
134 """Credentials for a user on an auth provider."""
136 auth_provider_type: str = attr.ib()
137 auth_provider_id: str |
None = attr.ib()
140 data: dict = attr.ib()
142 id: str = attr.ib(factory=
lambda: uuid.uuid4().hex)
143 is_new: bool = attr.ib(default=
True)
151 group: str |
None =
None
152 local_only: bool |
None =
None
None invalidate_cache(self)
perm_mdl.AbstractPermissions permissions(self)
Any _handle_permissions_change(User self, Attribute user_attr, Any new)
dict[str, Any] validate(SchemaCommonFlowHandler handler, dict[str, Any] user_input)