1 """Trusted Networks auth provider.
3 It shows list of users if access from trusted network.
4 Abort login flow if not access from trusted network.
7 from __future__
import annotations
9 from collections.abc
import Mapping
10 from ipaddress
import (
18 from typing
import Any, cast
20 import voluptuous
as vol
27 from ..
import InvalidAuthError
28 from ..models
import (
35 from .
import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
37 type IPAddress = IPv4Address | IPv6Address
38 type IPNetwork = IPv4Network | IPv6Network
40 CONF_TRUSTED_NETWORKS =
"trusted_networks"
41 CONF_TRUSTED_USERS =
"trusted_users"
43 CONF_ALLOW_BYPASS_LOGIN =
"allow_bypass_login"
45 CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
47 vol.Required(CONF_TRUSTED_NETWORKS): vol.All(cv.ensure_list, [ip_network]),
48 vol.Optional(CONF_TRUSTED_USERS, default={}): vol.Schema(
56 vol.Schema({vol.Required(CONF_GROUP): str}),
62 vol.Optional(CONF_ALLOW_BYPASS_LOGIN, default=
False): cv.boolean,
64 extra=vol.PREVENT_EXTRA,
69 """Raised when try to login as invalid user."""
72 @AUTH_PROVIDERS.register(
"trusted_networks")
74 """Trusted Networks auth provider.
76 Allow passwordless access from trusted network.
79 DEFAULT_TITLE =
"Trusted Networks"
83 """Return trusted networks."""
84 return cast(list[IPNetwork], self.config[CONF_TRUSTED_NETWORKS])
88 """Return trusted users per network."""
89 return cast(dict[IPNetwork, Any], self.config[CONF_TRUSTED_USERS])
93 """Return trusted proxies in the system."""
94 if not self.hass.http:
98 ip_network(trusted_proxy)
99 for trusted_proxy
in self.hass.http.trusted_proxies
104 """Trusted Networks auth provider does not support MFA."""
108 """Return a flow to login."""
109 assert context
is not None
110 ip_addr = cast(IPAddress, context.get(
"ip_address"))
113 user
for user
in users
if not user.system_generated
and user.is_active
115 for ip_net, user_or_group_list
in self.
trusted_userstrusted_users.items():
116 if ip_addr
not in ip_net:
120 user_id
for user_id
in user_or_group_list
if isinstance(user_id, str)
124 for group
in user_or_group_list
125 if isinstance(group, dict)
127 flattened_group_list = [
128 group
for sublist
in group_list
for group
in sublist
132 for user
in available_users
135 or any(group.id
in flattened_group_list
for group
in user.groups)
143 {user.id: user.name
for user
in available_users},
144 self.config[CONF_ALLOW_BYPASS_LOGIN],
148 self, flow_result: Mapping[str, str]
150 """Get credentials based on the flow result."""
151 user_id = flow_result[
"user"]
155 if user.id != user_id:
158 if user.system_generated:
161 if not user.is_active:
164 for credential
in await self.async_credentials():
165 if credential.data[
"user_id"] == user_id:
168 cred = self.async_create_credentials({
"user_id": user_id})
169 await self.store.async_link_user(user, cred)
173 raise InvalidUserError
176 self, credentials: Credentials
178 """Return extra user metadata for credentials.
180 Trusted network auth provider should never create new user.
182 raise NotImplementedError
186 """Make sure the access from trusted networks.
188 Raise InvalidAuthError if not.
189 Raise InvalidAuthError if trusted_networks is not configured.
192 raise InvalidAuthError(
"trusted_networks is not configured")
195 ip_addr
in trusted_network
for trusted_network
in self.
trusted_networkstrusted_networks
197 raise InvalidAuthError(
"Not in trusted_networks")
199 if any(ip_addr
in trusted_proxy
for trusted_proxy
in self.
trusted_proxiestrusted_proxies):
200 raise InvalidAuthError(
"Can't allow access from a proxy server")
203 raise InvalidAuthError(
"Can't allow access from Home Assistant Cloud")
207 self, refresh_token: RefreshToken, remote_ip: str |
None =
None
209 """Verify a refresh token is still valid."""
210 if remote_ip
is None:
211 raise InvalidAuthError(
212 "Unknown remote ip can't be used for trusted network provider."
218 """Handler for the login flow."""
222 auth_provider: TrustedNetworksAuthProvider,
224 available_users: dict[str, str |
None],
225 allow_bypass_login: bool,
227 """Initialize the login flow."""
234 self, user_input: dict[str, str] |
None =
None
236 """Handle the step of the form."""
239 TrustedNetworksAuthProvider, self._auth_provider
240 ).async_validate_access(self.
_ip_address_ip_address)
242 except InvalidAuthError:
243 return self.async_abort(reason=
"not_allowed")
245 if user_input
is not None:
246 return await self.async_finish(user_input)
249 return await self.async_finish(
253 return self.async_show_form(
255 data_schema=vol.Schema(
dict[IPNetwork, Any] trusted_users(self)
list[IPNetwork] trusted_proxies(self)
None async_validate_access(self, IPAddress ip_addr)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
None async_validate_refresh_token(self, RefreshToken refresh_token, str|None remote_ip=None)
list[IPNetwork] trusted_networks(self)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
None __init__(self, TrustedNetworksAuthProvider auth_provider, IPAddress ip_addr, dict[str, str|None] available_users, bool allow_bypass_login)
list[str] async_get_users(HomeAssistant hass)
bool is_cloud_connection(HomeAssistant hass)