Home Assistant Unofficial Reference 2024.12.1
trusted_networks.py
Go to the documentation of this file.
1 """Trusted Networks auth provider.
2 
3 It shows list of users if access from trusted network.
4 Abort login flow if not access from trusted network.
5 """
6 
7 from __future__ import annotations
8 
9 from collections.abc import Mapping
10 from ipaddress import (
11  IPv4Address,
12  IPv4Network,
13  IPv6Address,
14  IPv6Network,
15  ip_address,
16  ip_network,
17 )
18 from typing import Any, cast
19 
20 import voluptuous as vol
21 
22 from homeassistant.core import callback
23 from homeassistant.exceptions import HomeAssistantError
25 from homeassistant.helpers.network import is_cloud_connection
26 
27 from .. import InvalidAuthError
28 from ..models import (
29  AuthFlowContext,
30  AuthFlowResult,
31  Credentials,
32  RefreshToken,
33  UserMeta,
34 )
35 from . import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
36 
37 type IPAddress = IPv4Address | IPv6Address
38 type IPNetwork = IPv4Network | IPv6Network
39 
40 CONF_TRUSTED_NETWORKS = "trusted_networks"
41 CONF_TRUSTED_USERS = "trusted_users"
42 CONF_GROUP = "group"
43 CONF_ALLOW_BYPASS_LOGIN = "allow_bypass_login"
44 
45 CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
46  {
47  vol.Required(CONF_TRUSTED_NETWORKS): vol.All(cv.ensure_list, [ip_network]),
48  vol.Optional(CONF_TRUSTED_USERS, default={}): vol.Schema(
49  # we only validate the format of user_id or group_id
50  {
51  ip_network: vol.All(
52  cv.ensure_list,
53  [
54  vol.Or(
55  cv.uuid4_hex,
56  vol.Schema({vol.Required(CONF_GROUP): str}),
57  )
58  ],
59  )
60  }
61  ),
62  vol.Optional(CONF_ALLOW_BYPASS_LOGIN, default=False): cv.boolean,
63  },
64  extra=vol.PREVENT_EXTRA,
65 )
66 
67 
69  """Raised when try to login as invalid user."""
70 
71 
72 @AUTH_PROVIDERS.register("trusted_networks")
73 class TrustedNetworksAuthProvider(AuthProvider):
74  """Trusted Networks auth provider.
75 
76  Allow passwordless access from trusted network.
77  """
78 
79  DEFAULT_TITLE = "Trusted Networks"
80 
81  @property
82  def trusted_networks(self) -> list[IPNetwork]:
83  """Return trusted networks."""
84  return cast(list[IPNetwork], self.config[CONF_TRUSTED_NETWORKS])
85 
86  @property
87  def trusted_users(self) -> dict[IPNetwork, Any]:
88  """Return trusted users per network."""
89  return cast(dict[IPNetwork, Any], self.config[CONF_TRUSTED_USERS])
90 
91  @property
92  def trusted_proxies(self) -> list[IPNetwork]:
93  """Return trusted proxies in the system."""
94  if not self.hass.http:
95  return []
96 
97  return [
98  ip_network(trusted_proxy)
99  for trusted_proxy in self.hass.http.trusted_proxies
100  ]
101 
102  @property
103  def support_mfa(self) -> bool:
104  """Trusted Networks auth provider does not support MFA."""
105  return False
106 
107  async def async_login_flow(self, context: AuthFlowContext | None) -> LoginFlow:
108  """Return a flow to login."""
109  assert context is not None
110  ip_addr = cast(IPAddress, context.get("ip_address"))
111  users = await self.store.async_get_users()
112  available_users = [
113  user for user in users if not user.system_generated and user.is_active
114  ]
115  for ip_net, user_or_group_list in self.trusted_userstrusted_users.items():
116  if ip_addr not in ip_net:
117  continue
118 
119  user_list = [
120  user_id for user_id in user_or_group_list if isinstance(user_id, str)
121  ]
122  group_list = [
123  group[CONF_GROUP]
124  for group in user_or_group_list
125  if isinstance(group, dict)
126  ]
127  flattened_group_list = [
128  group for sublist in group_list for group in sublist
129  ]
130  available_users = [
131  user
132  for user in available_users
133  if (
134  user.id in user_list
135  or any(group.id in flattened_group_list for group in user.groups)
136  )
137  ]
138  break
139 
141  self,
142  ip_addr,
143  {user.id: user.name for user in available_users},
144  self.config[CONF_ALLOW_BYPASS_LOGIN],
145  )
146 
148  self, flow_result: Mapping[str, str]
149  ) -> Credentials:
150  """Get credentials based on the flow result."""
151  user_id = flow_result["user"]
152 
153  users = await self.store.async_get_users()
154  for user in users:
155  if user.id != user_id:
156  continue
157 
158  if user.system_generated:
159  continue
160 
161  if not user.is_active:
162  continue
163 
164  for credential in await self.async_credentials():
165  if credential.data["user_id"] == user_id:
166  return credential
167 
168  cred = self.async_create_credentials({"user_id": user_id})
169  await self.store.async_link_user(user, cred)
170  return cred
171 
172  # We only allow login as exist user
173  raise InvalidUserError
174 
176  self, credentials: Credentials
177  ) -> UserMeta:
178  """Return extra user metadata for credentials.
179 
180  Trusted network auth provider should never create new user.
181  """
182  raise NotImplementedError
183 
184  @callback
185  def async_validate_access(self, ip_addr: IPAddress) -> None:
186  """Make sure the access from trusted networks.
187 
188  Raise InvalidAuthError if not.
189  Raise InvalidAuthError if trusted_networks is not configured.
190  """
191  if not self.trusted_networkstrusted_networks:
192  raise InvalidAuthError("trusted_networks is not configured")
193 
194  if not any(
195  ip_addr in trusted_network for trusted_network in self.trusted_networkstrusted_networks
196  ):
197  raise InvalidAuthError("Not in trusted_networks")
198 
199  if any(ip_addr in trusted_proxy for trusted_proxy in self.trusted_proxiestrusted_proxies):
200  raise InvalidAuthError("Can't allow access from a proxy server")
201 
202  if is_cloud_connection(self.hass):
203  raise InvalidAuthError("Can't allow access from Home Assistant Cloud")
204 
205  @callback
207  self, refresh_token: RefreshToken, remote_ip: str | None = None
208  ) -> None:
209  """Verify a refresh token is still valid."""
210  if remote_ip is None:
211  raise InvalidAuthError(
212  "Unknown remote ip can't be used for trusted network provider."
213  )
214  self.async_validate_accessasync_validate_access(ip_address(remote_ip))
215 
216 
217 class TrustedNetworksLoginFlow(LoginFlow):
218  """Handler for the login flow."""
219 
220  def __init__(
221  self,
222  auth_provider: TrustedNetworksAuthProvider,
223  ip_addr: IPAddress,
224  available_users: dict[str, str | None],
225  allow_bypass_login: bool,
226  ) -> None:
227  """Initialize the login flow."""
228  super().__init__(auth_provider)
229  self._available_users_available_users = available_users
230  self._ip_address_ip_address = ip_addr
231  self._allow_bypass_login_allow_bypass_login = allow_bypass_login
232 
233  async def async_step_init(
234  self, user_input: dict[str, str] | None = None
235  ) -> AuthFlowResult:
236  """Handle the step of the form."""
237  try:
238  cast(
239  TrustedNetworksAuthProvider, self._auth_provider
240  ).async_validate_access(self._ip_address_ip_address)
241 
242  except InvalidAuthError:
243  return self.async_abort(reason="not_allowed")
244 
245  if user_input is not None:
246  return await self.async_finish(user_input)
247 
248  if self._allow_bypass_login_allow_bypass_login and len(self._available_users_available_users) == 1:
249  return await self.async_finish(
250  {"user": next(iter(self._available_users_available_users.keys()))}
251  )
252 
253  return self.async_show_form(
254  step_id="init",
255  data_schema=vol.Schema(
256  {vol.Required("user"): vol.In(self._available_users_available_users)}
257  ),
258  )
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
None async_validate_refresh_token(self, RefreshToken refresh_token, str|None remote_ip=None)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
None __init__(self, TrustedNetworksAuthProvider auth_provider, IPAddress ip_addr, dict[str, str|None] available_users, bool allow_bypass_login)
list[str] async_get_users(HomeAssistant hass)
Definition: http.py:394
bool is_cloud_connection(HomeAssistant hass)
Definition: network.py:338