Home Assistant Unofficial Reference 2024.12.1
login_flow.py
Go to the documentation of this file.
1 """HTTP views handle login flow.
2 
3 # GET /auth/providers
4 
5 Return a list of auth providers. Example:
6 
7 [
8  {
9  "name": "Local",
10  "id": null,
11  "type": "local_provider",
12  }
13 ]
14 
15 
16 # POST /auth/login_flow
17 
18 Create a login flow. Will return the first step of the flow.
19 
20 Pass in the parameters 'client_id' and 'redirect_uri'; both are validated by indieauth.
21 
22 Pass in parameter 'handler' to specify the auth provider to use. Auth providers
23 are identified by type and id.
24 
25 The optional parameter 'type' must be set to 'link_user' if the login flow is used to
26 link a credential to an existing user. The default 'type' is 'authorize'.
27 
28 {
29  "client_id": "https://hassbian.local:8123/",
30  "handler": ["local_provider", null],
31  "redirect_uri": "https://hassbian.local:8123/",
32  "type": "authorize"
33 }
34 
35 Return value will be a step in a data entry flow. See the docs for data entry
36 flow for details.
37 
38 {
39  "data_schema": [
40  {"name": "username", "type": "string"},
41  {"name": "password", "type": "string"}
42  ],
43  "errors": {},
44  "flow_id": "8f7e42faab604bcab7ac43c44ca34d58",
45  "handler": ["insecure_example", null],
46  "step_id": "init",
47  "type": "form"
48 }
49 
50 
51 # POST /auth/login_flow/{flow_id}
52 
53 Progress the flow. Most flows will be 1 page, but could optionally add extra
54 login challenges, like TFA. Once the flow has finished, the returned step will
55 have type FlowResultType.CREATE_ENTRY and "result" key will contain an authorization code.
56 The authorization code is associated with an authorized user by default; it will
57 be associated with a credential if "type" was set to "link_user" in
58 "/auth/login_flow".
59 
60 {
61  "flow_id": "8f7e42faab604bcab7ac43c44ca34d58",
62  "handler": ["insecure_example", null],
63  "result": "411ee2f916e648d691e937ae9344681e",
64  "title": "Example",
65  "type": "create_entry",
66  "version": 1
67 }
68 """
69 
70 from __future__ import annotations
71 
72 from collections.abc import Callable
73 from http import HTTPStatus
74 from ipaddress import ip_address
75 from typing import TYPE_CHECKING, Any, cast
76 
77 from aiohttp import web
78 import voluptuous as vol
79 import voluptuous_serialize
80 
81 from homeassistant import data_entry_flow
82 from homeassistant.auth import AuthManagerFlowManager, InvalidAuthError
83 from homeassistant.auth.models import AuthFlowContext, AuthFlowResult, Credentials
84 from homeassistant.components import onboarding
85 from homeassistant.components.http import KEY_HASS
86 from homeassistant.components.http.auth import async_user_not_allowed_do_auth
88  log_invalid_auth,
89  process_success_login,
90  process_wrong_login,
91 )
92 from homeassistant.components.http.data_validator import RequestDataValidator
93 from homeassistant.components.http.view import HomeAssistantView
94 from homeassistant.core import HomeAssistant, callback
95 from homeassistant.helpers.network import is_cloud_connection
96 from homeassistant.util.network import is_local
97 
98 from . import indieauth
99 
100 if TYPE_CHECKING:
102  TrustedNetworksAuthProvider,
103  )
104 
105  from . import StoreResultType
106 
107 
@callback
def async_setup(
    hass: HomeAssistant, store_result: Callable[[str, Credentials], str]
) -> None:
    """Component to allow users to login.

    Registers the HTTP views defined in this module on ``hass.http``.

    Args:
        hass: The running Home Assistant instance.
        store_result: Callable taking ``(client_id, credentials)`` and
            returning a string; it is handed to both login flow views, which
            use its return value as the flow's ``"result"``.
    """
    # These two views take no constructor arguments, so the classes themselves
    # are registered.
    hass.http.register_view(WellKnownOAuthInfoView)
    hass.http.register_view(AuthProvidersView)
    # The login flow views need the flow manager and the result store, so they
    # are registered as instances.
    hass.http.register_view(LoginFlowIndexView(hass.auth.login_flow, store_result))
    hass.http.register_view(LoginFlowResourceView(hass.auth.login_flow, store_result))
117 
118 
class WellKnownOAuthInfoView(HomeAssistantView):
    """Serve the OAuth2 authorization server metadata document."""

    requires_auth = False
    url = "/.well-known/oauth-authorization-server"
    name = "well-known/oauth-authorization-server"

    async def get(self, request: web.Request) -> web.Response:
        """Respond with the OAuth2 endpoint locations as JSON."""
        metadata = {
            "authorization_endpoint": "/auth/authorize",
            "token_endpoint": "/auth/token",
            "revocation_endpoint": "/auth/revoke",
            "response_types_supported": ["code"],
            "service_documentation": (
                "https://developers.home-assistant.io/docs/auth_api"
            ),
        }
        return self.json(metadata)
139 
140 
class AuthProvidersView(HomeAssistantView):
    """Expose the auth providers a client may offer on the login screen."""

    url = "/auth/providers"
    name = "api:auth:providers"
    requires_auth = False

    async def get(self, request: web.Request) -> web.Response:
        """Return the usable auth providers and the remember-me default.

        Responds with HTTP 400 when onboarding has not finished or when the
        remote address of the request cannot be parsed as an IP address.
        """
        hass = request.app[KEY_HASS]

        if not onboarding.async_is_user_onboarded(hass):
            return self.json_message(
                message="Onboarding not finished",
                status_code=HTTPStatus.BAD_REQUEST,
                message_code="onboarding_required",
            )

        try:
            client_ip = ip_address(request.remote)  # type: ignore[arg-type]
        except ValueError:
            return self.json_message(
                message="Invalid remote IP",
                status_code=HTTPStatus.BAD_REQUEST,
                message_code="invalid_remote_ip",
            )

        via_cloud = is_cloud_connection(hass)

        def _is_listed(candidate: Any) -> bool:
            """Tell whether a provider may be shown to this client."""
            if candidate.type != "trusted_networks":
                return True
            if via_cloud:
                # Trusted networks are not available on cloud connections,
                # so skip the access validation entirely.
                return False
            try:
                cast("TrustedNetworksAuthProvider", candidate).async_validate_access(
                    client_ip
                )
            except InvalidAuthError:
                # Not a trusted network: do not reveal that a trusted_networks
                # provider is configured.
                return False
            return True

        listed = [
            {
                "name": candidate.name,
                "id": candidate.id,
                "type": candidate.type,
            }
            for candidate in hass.auth.auth_providers
            if _is_listed(candidate)
        ]

        # Only local, non-cloud clients get "remember me" preselected.
        remember_me_default = False if via_cloud else is_local(client_ip)

        return self.json(
            {
                "providers": listed,
                "preselect_remember_me": remember_me_default,
            }
        )
200 
201 
def _prepare_result_json(
    result: AuthFlowResult,
) -> AuthFlowResult:
    """Convert result to JSON.

    Works on a shallow copy whenever it changes anything, so the caller's
    ``result`` is never mutated.

    Args:
        result: A flow result as returned by the login flow manager.

    Returns:
        * CREATE_ENTRY results: a copy without the ``"result"`` and ``"data"``
          keys.
        * FORM results: a copy whose ``"data_schema"`` is serialized with
          ``voluptuous_serialize`` (or an empty list when there is no schema).
        * Any other result type: ``result`` itself, unchanged.
    """
    if result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY:
        data = result.copy()
        data.pop("result")
        data.pop("data")
        return data

    if result["type"] != data_entry_flow.FlowResultType.FORM:
        return result

    data = result.copy()

    if (schema := data["data_schema"]) is None:
        data["data_schema"] = []  # type: ignore[typeddict-item]  # json result type
    else:
        data["data_schema"] = voluptuous_serialize.convert(schema)

    return data
223 
224 
class LoginFlowBaseView(HomeAssistantView):
    """Base class for the login views."""

    requires_auth = False

    def __init__(
        self,
        flow_mgr: AuthManagerFlowManager,
        store_result: StoreResultType,
    ) -> None:
        """Initialize the flow manager index view.

        Args:
            flow_mgr: Login flow manager used to create, progress and abort
                flows.
            store_result: Callable invoked as ``store_result(client_id,
                credentials)``; its return value replaces the flow's
                ``"result"`` in the response.
        """
        self._flow_mgr = flow_mgr
        self._store_result = store_result

    async def _async_flow_result_to_response(
        self,
        request: web.Request,
        client_id: str,
        result: AuthFlowResult,
    ) -> web.Response:
        """Convert the flow result to a response.

        Results that are not CREATE_ENTRY are serialized and returned as is.
        A CREATE_ENTRY result is only turned into a success response after the
        redirect URI stored in the flow context has been verified for
        ``client_id`` and the linked user (if any) is allowed to authenticate.
        Both failures yield HTTP 403.
        """
        if result["type"] != data_entry_flow.FlowResultType.CREATE_ENTRY:
            # @log_invalid_auth does not work here since it returns HTTP 200.
            # We need to manually log failed login attempts.
            if (
                result["type"] == data_entry_flow.FlowResultType.FORM
                and (errors := result.get("errors"))
                and errors.get("base")
                in (
                    "invalid_auth",
                    "invalid_code",
                )
            ):
                await process_wrong_login(request)
            return self.json(_prepare_result_json(result))

        hass = request.app[KEY_HASS]

        if not await indieauth.verify_redirect_uri(
            hass, client_id, result["context"]["redirect_uri"]
        ):
            return self.json_message("Invalid redirect URI", HTTPStatus.FORBIDDEN)

        # Strip the internal keys before the result is sent to the client.
        result.pop("data")
        result.pop("context")

        result_obj: Credentials = result.pop("result")

        # Result can be None if credential was never linked to a user before.
        user = await hass.auth.async_get_user_by_credentials(result_obj)

        if user is not None and (
            user_access_error := async_user_not_allowed_do_auth(hass, user)
        ):
            return self.json_message(
                f"Login blocked: {user_access_error}", HTTPStatus.FORBIDDEN
            )

        process_success_login(request)
        # Replace the credentials object with whatever the store hands back.
        result["result"] = self._store_result(client_id, result_obj)

        return self.json(result)


class LoginFlowIndexView(LoginFlowBaseView):
    """View to create a login flow."""

    url = "/auth/login_flow"
    name = "api:auth:login_flow"

    async def get(self, request: web.Request) -> web.Response:
        """Do not allow index of flows in progress."""
        return web.Response(status=HTTPStatus.METHOD_NOT_ALLOWED)

    @RequestDataValidator(
        vol.Schema(
            {
                vol.Required("client_id"): str,
                vol.Required("handler"): vol.All(
                    [vol.Any(str, None)], vol.Length(2, 2), vol.Coerce(tuple)
                ),
                vol.Required("redirect_uri"): str,
                vol.Optional("type", default="authorize"): str,
            }
        )
    )
    @log_invalid_auth
    async def post(self, request: web.Request, data: dict[str, Any]) -> web.Response:
        """Create a new login flow.

        ``data`` is the request body after schema validation. Responds with
        HTTP 400 for an invalid client id, otherwise starts a flow for the
        requested handler and returns its first step.
        """
        client_id: str = data["client_id"]
        redirect_uri: str = data["redirect_uri"]

        if not indieauth.verify_client_id(client_id):
            return self.json_message("Invalid client id", HTTPStatus.BAD_REQUEST)

        handler: tuple[str, str] = tuple(data["handler"])

        try:
            result = await self._flow_mgr.async_init(
                handler,
                context=AuthFlowContext(
                    ip_address=ip_address(request.remote),  # type: ignore[arg-type]
                    credential_only=data.get("type") == "link_user",
                    redirect_uri=redirect_uri,
                ),
            )
        # NOTE(review): the two `except` lines below are missing from the
        # extracted listing; the exception types are reconstructed from the
        # error messages and must be confirmed against the upstream file.
        except data_entry_flow.UnknownHandler:
            return self.json_message("Invalid handler specified", HTTPStatus.NOT_FOUND)
        except data_entry_flow.UnknownStep:
            return self.json_message(
                "Handler does not support init", HTTPStatus.BAD_REQUEST
            )

        return await self._async_flow_result_to_response(request, client_id, result)


class LoginFlowResourceView(LoginFlowBaseView):
    """View to interact with the flow manager."""

    url = "/auth/login_flow/{flow_id}"
    name = "api:auth:login_flow:resource"

    async def get(self, request: web.Request) -> web.Response:
        """Do not allow getting status of a flow in progress."""
        return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND)

    @RequestDataValidator(
        vol.Schema(
            {vol.Required("client_id"): str},
            extra=vol.ALLOW_EXTRA,
        )
    )
    @log_invalid_auth
    async def post(
        self, request: web.Request, data: dict[str, Any], flow_id: str
    ) -> web.Response:
        """Handle progressing a login flow request.

        ``client_id`` is removed from ``data`` before the remaining keys are
        passed to the flow as user input.
        """
        client_id: str = data.pop("client_id")

        if not indieauth.verify_client_id(client_id):
            return self.json_message("Invalid client id", HTTPStatus.BAD_REQUEST)

        try:
            # do not allow change ip during login flow
            flow = self._flow_mgr.async_get(flow_id)
            if flow["context"]["ip_address"] != ip_address(request.remote):  # type: ignore[arg-type]
                return self.json_message("IP address changed", HTTPStatus.BAD_REQUEST)
            result = await self._flow_mgr.async_configure(flow_id, data)
        # NOTE(review): this `except` line is missing from the extracted
        # listing; the exception type is reconstructed from the error message
        # and must be confirmed against the upstream file.
        except data_entry_flow.UnknownFlow:
            return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND)
        except vol.Invalid:
            return self.json_message("User input malformed", HTTPStatus.BAD_REQUEST)

        return await self._async_flow_result_to_response(request, client_id, result)

    async def delete(self, request: web.Request, flow_id: str) -> web.Response:
        """Cancel a flow in progress."""
        try:
            self._flow_mgr.async_abort(flow_id)
        # NOTE(review): `except` line reconstructed, see the note in `post`.
        except data_entry_flow.UnknownFlow:
            return self.json_message("Invalid flow specified", HTTPStatus.NOT_FOUND)

        return self.json_message("Flow aborted")
383 
web.Response get(self, web.Request request)
Definition: login_flow.py:148
web.Response _async_flow_result_to_response(self, web.Request request, str client_id, AuthFlowResult result)
Definition: login_flow.py:244
None __init__(self, AuthManagerFlowManager flow_mgr, StoreResultType store_result)
Definition: login_flow.py:234
web.Response post(self, web.Request request, dict[str, Any] data)
Definition: login_flow.py:309
web.Response get(self, web.Request request)
Definition: login_flow.py:295
web.Response post(self, web.Request request, dict[str, Any] data, str flow_id)
Definition: login_flow.py:358
web.Response delete(self, web.Request request, str flow_id)
Definition: login_flow.py:378
None async_setup(HomeAssistant hass, Callable[[str, Credentials], str] store_result)
Definition: login_flow.py:111
AuthFlowResult _prepare_result_json(AuthFlowResult result)
Definition: login_flow.py:204
str|None async_user_not_allowed_do_auth(HomeAssistant hass, User user, Request|None request=None)
Definition: auth.py:91
None process_wrong_login(Request request)
Definition: ban.py:109
None process_success_login(Request request)
Definition: ban.py:173
bool is_local(ConfigEntry entry)
Definition: __init__.py:58
AreaRegistry async_get(HomeAssistant hass)
bool is_cloud_connection(HomeAssistant hass)
Definition: network.py:338