1 """HTTP views to handle the login flow.
5 Return a list of auth providers. Example:
11 "type": "local_provider",
16 # POST /auth/login_flow
18 Create a login flow. Will return the first step of the flow.
20 Pass in parameters 'client_id' and 'redirect_uri', which are validated by indieauth.
22 Pass in parameter 'handler' to specify the auth provider to use. Auth providers
23 are identified by type and id.
25 The optional parameter 'type' has to be set to 'link_user' if the login flow is
26 used to link a credential to an existing user. The default 'type' is 'authorize'.
29 "client_id": "https://hassbian.local:8123/",
30 "handler": ["local_provider", null],
31 "redirect_uri": "https://hassbian.local:8123/",
35 Return value will be a step in a data entry flow. See the docs for data entry
40 {"name": "username", "type": "string"},
41 {"name": "password", "type": "string"}
44 "flow_id": "8f7e42faab604bcab7ac43c44ca34d58",
45 "handler": ["insecure_example", null],
51 # POST /auth/login_flow/{flow_id}
53 Progress the flow. Most flows will be 1 page, but could optionally add extra
54 login challenges, like TFA. Once the flow has finished, the returned step will
55 have type FlowResultType.CREATE_ENTRY and "result" key will contain an authorization code.
56 The authorization code is associated with an authorized user by default; it will
57 be associated with a credential if "type" is set to "link_user" in
61 "flow_id": "8f7e42faab604bcab7ac43c44ca34d58",
62 "handler": ["insecure_example", null],
63 "result": "411ee2f916e648d691e937ae9344681e",
65 "type": "create_entry",
70 from __future__
import annotations
72 from collections.abc
import Callable
73 from http
import HTTPStatus
74 from ipaddress
import ip_address
75 from typing
import TYPE_CHECKING, Any, cast
77 from aiohttp
import web
78 import voluptuous
as vol
79 import voluptuous_serialize
81 from homeassistant
import data_entry_flow
89 process_success_login,
98 from .
import indieauth
102 TrustedNetworksAuthProvider,
105 from .
import StoreResultType
110 hass: HomeAssistant, store_result: Callable[[str, Credentials], str]
112 """Component to allow users to login."""
113 hass.http.register_view(WellKnownOAuthInfoView)
114 hass.http.register_view(AuthProvidersView)
120 """View to host the OAuth2 information."""
122 requires_auth =
False
123 url =
"/.well-known/oauth-authorization-server"
124 name =
"well-known/oauth-authorization-server"
126 async
def get(self, request: web.Request) -> web.Response:
127 """Return the well known OAuth2 authorization info."""
130 "authorization_endpoint":
"/auth/authorize",
131 "token_endpoint":
"/auth/token",
132 "revocation_endpoint":
"/auth/revoke",
133 "response_types_supported": [
"code"],
134 "service_documentation": (
135 "https://developers.home-assistant.io/docs/auth_api"
142 """View to get available auth providers."""
144 url =
"/auth/providers"
145 name =
"api:auth:providers"
146 requires_auth =
False
148 async
def get(self, request: web.Request) -> web.Response:
149 """Get available auth providers."""
150 hass = request.app[KEY_HASS]
151 if not onboarding.async_is_user_onboarded(hass):
152 return self.json_message(
153 message=
"Onboarding not finished",
154 status_code=HTTPStatus.BAD_REQUEST,
155 message_code=
"onboarding_required",
159 remote_address = ip_address(request.remote)
161 return self.json_message(
162 message=
"Invalid remote IP",
163 status_code=HTTPStatus.BAD_REQUEST,
164 message_code=
"invalid_remote_ip",
170 for provider
in hass.auth.auth_providers:
171 if provider.type ==
"trusted_networks":
177 cast(
"TrustedNetworksAuthProvider", provider).async_validate_access(
180 except InvalidAuthError:
186 "name": provider.name,
188 "type": provider.type,
192 preselect_remember_me =
not cloud_connection
and is_local(remote_address)
196 "providers": providers,
197 "preselect_remember_me": preselect_remember_me,
203 result: AuthFlowResult,
205 """Convert result to JSON."""
206 if result[
"type"] == data_entry_flow.FlowResultType.CREATE_ENTRY:
212 if result[
"type"] != data_entry_flow.FlowResultType.FORM:
217 if (schema := data[
"data_schema"])
is None:
218 data[
"data_schema"] = []
220 data[
"data_schema"] = voluptuous_serialize.convert(schema)
226 """Base class for the login views."""
228 requires_auth =
False
232 flow_mgr: AuthManagerFlowManager,
233 store_result: StoreResultType,
235 """Initialize the flow manager index view."""
241 request: web.Request,
243 result: AuthFlowResult,
245 """Convert the flow result to a response."""
246 if result[
"type"] != data_entry_flow.FlowResultType.CREATE_ENTRY:
250 result[
"type"] == data_entry_flow.FlowResultType.FORM
251 and (errors := result.get(
"errors"))
252 and errors.get(
"base")
261 hass = request.app[KEY_HASS]
263 if not await indieauth.verify_redirect_uri(
264 hass, client_id, result[
"context"][
"redirect_uri"]
266 return self.json_message(
"Invalid redirect URI", HTTPStatus.FORBIDDEN)
269 result.pop(
"context")
271 result_obj: Credentials = result.pop(
"result")
274 user = await hass.auth.async_get_user_by_credentials(result_obj)
276 if user
is not None and (
279 return self.json_message(
280 f
"Login blocked: {user_access_error}", HTTPStatus.FORBIDDEN
284 result[
"result"] = self.
_store_result_store_result(client_id, result_obj)
286 return self.json(result)
290 """View to create a login flow."""
292 url =
"/auth/login_flow"
293 name =
"api:auth:login_flow"
async def get(self, request: web.Request) -> web.Response:
    """Refuse to list flows that are in progress.

    Always answers with an empty 405 Method Not Allowed response; the
    request itself is not inspected.
    """
    not_allowed = HTTPStatus.METHOD_NOT_ALLOWED
    return web.Response(status=not_allowed)
299 @RequestDataValidator(
vol.Schema(
{
vol.Required("client_id"): str,
300 vol.Required(
"handler"): vol.All(
301 [vol.Any(str,
None)], vol.Length(2, 2), vol.Coerce(tuple)
303 vol.Required(
"redirect_uri"): str,
304 vol.Optional(
"type", default=
"authorize"): str,
309 async
def post(self, request: web.Request, data: dict[str, Any]) -> web.Response:
310 """Create a new login flow."""
311 client_id: str = data[
"client_id"]
312 redirect_uri: str = data[
"redirect_uri"]
314 if not indieauth.verify_client_id(client_id):
315 return self.json_message(
"Invalid client id", HTTPStatus.BAD_REQUEST)
317 handler: tuple[str, str] =
tuple(data[
"handler"])
320 result = await self.
_flow_mgr_flow_mgr.async_init(
323 ip_address=ip_address(request.remote),
324 credential_only=data.get(
"type") ==
"link_user",
325 redirect_uri=redirect_uri,
329 return self.json_message(
"Invalid handler specified", HTTPStatus.NOT_FOUND)
331 return self.json_message(
332 "Handler does not support init", HTTPStatus.BAD_REQUEST
339 """View to interact with the flow manager."""
341 url =
"/auth/login_flow/{flow_id}"
342 name =
"api:auth:login_flow:resource"
async def get(self, request: web.Request) -> web.Response:
    """Refuse status queries for a flow that is in progress.

    Always answers with a 404 JSON message; the request itself is not
    inspected.
    """
    message = "Invalid flow specified"
    return self.json_message(message, HTTPStatus.NOT_FOUND)
348 @RequestDataValidator(
vol.Schema(
{vol.Required("client_id"): str},
349 extra=vol.ALLOW_EXTRA,
354 self, request: web.Request, data: dict[str, Any], flow_id: str
356 """Handle progressing a login flow request."""
357 client_id: str = data.pop(
"client_id")
359 if not indieauth.verify_client_id(client_id):
360 return self.json_message(
"Invalid client id", HTTPStatus.BAD_REQUEST)
365 if flow[
"context"][
"ip_address"] != ip_address(request.remote):
366 return self.json_message(
"IP address changed", HTTPStatus.BAD_REQUEST)
367 result = await self.
_flow_mgr_flow_mgr.async_configure(flow_id, data)
369 return self.json_message(
"Invalid flow specified", HTTPStatus.NOT_FOUND)
371 return self.json_message(
"User input malformed", HTTPStatus.BAD_REQUEST)
375 async
def delete(self, request: web.Request, flow_id: str) -> web.Response:
376 """Cancel a flow in progress."""
380 return self.json_message(
"Invalid flow specified", HTTPStatus.NOT_FOUND)
382 return self.json_message(
"Flow aborted")
383
web.Response get(self, web.Request request)
web.Response _async_flow_result_to_response(self, web.Request request, str client_id, AuthFlowResult result)
None __init__(self, AuthManagerFlowManager flow_mgr, StoreResultType store_result)
web.Response post(self, web.Request request, dict[str, Any] data)
web.Response get(self, web.Request request)
web.Response post(self, web.Request request, dict[str, Any] data, str flow_id)
web.Response delete(self, web.Request request, str flow_id)
web.Response get(self, web.Request request)
web.Response get(self, web.Request request)
None async_setup(HomeAssistant hass, Callable[[str, Credentials], str] store_result)
AuthFlowResult _prepare_result_json(AuthFlowResult result)
str|None async_user_not_allowed_do_auth(HomeAssistant hass, User user, Request|None request=None)
None process_wrong_login(Request request)
None process_success_login(Request request)
bool is_local(ConfigEntry entry)
AreaRegistry async_get(HomeAssistant hass)
bool is_cloud_connection(HomeAssistant hass)