1 """Config Flow using OAuth2.
3 This module consists of the following parts:
4 - OAuth2 config flow which supports multiple OAuth2 implementations
5 - OAuth2 implementation that works with a locally provided client ID/secret
9 from __future__
import annotations
11 from abc
import ABC, ABCMeta, abstractmethod
13 from asyncio
import Lock
14 from collections.abc
import Awaitable, Callable
15 from http
import HTTPStatus
16 from json
import JSONDecodeError
20 from typing
import Any, cast
22 from aiohttp
import ClientError, ClientResponseError, client, web
24 import voluptuous
as vol
27 from homeassistant
import config_entries
33 from .aiohttp_client
import async_get_clientsession
34 from .network
import NoURLAvailableError
36 _LOGGER = logging.getLogger(__name__)
38 DATA_JWT_SECRET =
"oauth2_jwt_secret"
39 DATA_IMPLEMENTATIONS: HassKey[dict[str, dict[str, AbstractOAuth2Implementation]]] = (
42 DATA_PROVIDERS: HassKey[
45 Callable[[HomeAssistant, str], Awaitable[list[AbstractOAuth2Implementation]]],
48 AUTH_CALLBACK_PATH =
"/auth/external/callback"
49 HEADER_FRONTEND_BASE =
"HA-Frontend-Base"
50 MY_AUTH_CALLBACK_PATH =
"https://my.home-assistant.io/redirect/oauth"
52 CLOCK_OUT_OF_SYNC_MAX_SEC = 20
54 OAUTH_AUTHORIZE_URL_TIMEOUT_SEC = 30
55 OAUTH_TOKEN_TIMEOUT_SEC = 30
59 """Base class to abstract OAuth2 authentication."""
64 """Name of the implementation."""
69 """Domain that is providing the implementation."""
73 """Generate a url for the user to authorize.
75 This step is called when a config flow is initialized. It should redirect the
76 user to the vendor website where they can authorize Home Assistant.
78 The implementation is responsible for getting notified when the user is authorized
79 and for passing this on to the specified config flow. Do as little work as possible once
80 notified. You can do the work inside async_resolve_external_data. This will
83 Pass external data in with:
85 await hass.config_entries.flow.async_configure(
86 flow_id=flow_id, user_input={'code': 'abcd', 'state': … }
94 """Resolve external data to tokens.
96 Turn the data that the implementation passed to the config flow as external
97 step data into tokens. These tokens will be stored as 'token' in the
102 """Refresh a token and update expires info."""
105 new_token[
"expires_in"] =
int(new_token[
"expires_in"])
106 new_token[
"expires_at"] = time.time() + new_token[
"expires_in"]
111 """Refresh a token."""
115 """Local OAuth2 implementation."""
126 """Initialize local auth implementation."""
136 """Name of the implementation."""
137 return "Configuration.yaml"
141 """Domain providing the implementation."""
146 """Return the redirect uri."""
147 if "my" in self.
hasshass.config.components:
148 return MY_AUTH_CALLBACK_PATH
150 if (req := http.current_request.get())
is None:
151 raise RuntimeError(
"No current request in context")
153 if (ha_host := req.headers.get(HEADER_FRONTEND_BASE))
is None:
154 raise RuntimeError(
"No header in request")
156 return f
"{ha_host}{AUTH_CALLBACK_PATH}"
160 """Extra data that needs to be appended to the authorize url."""
164 """Generate a url for the user to authorize."""
170 "response_type":
"code",
172 "redirect_uri": redirect_uri,
174 self.
hasshass, {
"flow_id": flow_id,
"redirect_uri": redirect_uri}
182 """Resolve the authorization code to tokens."""
185 "grant_type":
"authorization_code",
186 "code": external_data[
"code"],
187 "redirect_uri": external_data[
"state"][
"redirect_uri"],
192 """Refresh tokens."""
195 "grant_type":
"refresh_token",
197 "refresh_token": token[
"refresh_token"],
200 return {**token, **new_token}
203 """Make a token request."""
206 data[
"client_id"] = self.
client_idclient_id
211 _LOGGER.debug(
"Sending token request to %s", self.
token_urltoken_url)
212 resp = await session.post(self.
token_urltoken_url, data=data)
213 if resp.status >= 400:
215 error_response = await resp.json()
216 except (ClientError, JSONDecodeError):
218 error_code = error_response.get(
"error",
"unknown")
219 error_description = error_response.get(
"error_description",
"unknown error")
221 "Token request for %s failed (%s): %s",
226 resp.raise_for_status()
227 return cast(dict, await resp.json())
231 """Handle a config flow."""
238 """Instantiate config flow."""
241 f
"Can't instantiate class {self.__class__.__name__} without DOMAIN"
246 self.
flow_implflow_impl: AbstractOAuth2Implementation =
None
255 """Extra data that needs to be appended to the authorize url."""
259 """Generate a url for the user to authorize."""
264 self, user_input: dict |
None =
None
266 """Handle a flow start."""
269 if user_input
is not None:
270 self.
flow_implflow_impl = implementations[user_input[
"implementation"]]
273 if not implementations:
278 req = http.current_request.get()
279 if len(implementations) == 1
and req
is not None:
286 step_id=
"pick_implementation",
287 data_schema=vol.Schema(
290 "implementation", default=
list(implementations)[0]
291 ): vol.In({key: impl.name
for key, impl
in implementations.items()})
297 self, user_input: dict[str, Any] |
None =
None
299 """Create an entry for auth."""
301 if user_input
is not None:
303 next_step =
"authorize_rejected" if "error" in user_input
else "creation"
307 async
with asyncio.timeout(OAUTH_AUTHORIZE_URL_TIMEOUT_SEC):
309 except TimeoutError
as err:
310 _LOGGER.error(
"Timeout generating authorize url: %s", err)
312 except NoURLAvailableError:
314 reason=
"no_url_available",
315 description_placeholders={
317 "https://www.home-assistant.io/more-info/no-url-available"
325 self, user_input: dict[str, Any] |
None =
None
327 """Create config entry from external data."""
328 _LOGGER.debug(
"Creating config entry from external data")
331 async
with asyncio.timeout(OAUTH_TOKEN_TIMEOUT_SEC):
332 token = await self.
flow_implflow_impl.async_resolve_external_data(
335 except TimeoutError
as err:
336 _LOGGER.error(
"Timeout resolving OAuth token: %s", err)
338 except (ClientResponseError, ClientError)
as err:
339 _LOGGER.error(
"Error resolving OAuth token: %s", err)
341 isinstance(err, ClientResponseError)
342 and err.status == HTTPStatus.UNAUTHORIZED
347 if "expires_in" not in token:
348 _LOGGER.warning(
"Invalid token: %s", token)
353 token[
"expires_in"] =
int(token[
"expires_in"])
354 except ValueError
as err:
355 _LOGGER.warning(
"Error converting expires_in to int: %s", err)
357 token[
"expires_at"] = time.time() + token[
"expires_in"]
359 self.
loggerlogger.info(
"Successfully authenticated")
362 {
"auth_implementation": self.
flow_implflow_impl.domain,
"token": token}
366 self, data: None =
None
368 """Step to handle flow rejection."""
370 reason=
"user_rejected_authorize",
371 description_placeholders={
"error": self.
external_dataexternal_data[
"error"]},
377 """Create an entry for the flow.
379 Ok to override if you want to fetch extra info or even add another step.
384 self, user_input: dict[str, Any] |
None =
None
386 """Handle a flow start."""
391 cls, hass: HomeAssistant, local_impl: LocalOAuth2Implementation
393 """Register a local implementation."""
399 hass: HomeAssistant, domain: str, implementation: AbstractOAuth2Implementation
401 """Register an OAuth2 flow implementation for an integration."""
402 implementations = hass.data.setdefault(DATA_IMPLEMENTATIONS, {})
403 implementations.setdefault(domain, {})[implementation.domain] = implementation
407 hass: HomeAssistant, domain: str
408 ) -> dict[str, AbstractOAuth2Implementation]:
409 """Return OAuth2 implementations for specified domain."""
410 registered = hass.data.setdefault(DATA_IMPLEMENTATIONS, {}).
get(domain, {})
412 if DATA_PROVIDERS
not in hass.data:
415 registered =
dict(registered)
416 for get_impl
in list(hass.data[DATA_PROVIDERS].values()):
417 for impl
in await get_impl(hass, domain):
418 registered[impl.domain] = impl
425 ) -> AbstractOAuth2Implementation:
426 """Return the implementation for this config entry."""
428 implementation = implementations.get(config_entry.data[
"auth_implementation"])
430 if implementation
is None:
431 raise ValueError(
"Implementation not available")
433 return implementation
439 provider_domain: str,
440 async_provide_implementation: Callable[
441 [HomeAssistant, str], Awaitable[list[AbstractOAuth2Implementation]]
444 """Add an implementation provider.
446 If no implementation is found, the provider should return an empty list.
448 hass.data.setdefault(DATA_PROVIDERS, {})[provider_domain] = (
449 async_provide_implementation
454 """OAuth2 Authorization Callback View."""
456 requires_auth =
False
457 url = AUTH_CALLBACK_PATH
458 name =
"auth:external:callback"
460 async
def get(self, request: web.Request) -> web.Response:
461 """Receive authorization code."""
462 if "state" not in request.query:
463 return web.Response(text=
"Missing state parameter")
465 hass = request.app[http.KEY_HASS]
472 "Invalid state. Is My Home Assistant configured "
473 "to go to the right instance?"
478 user_input: dict[str, Any] = {
"state": state}
480 if "code" in request.query:
481 user_input[
"code"] = request.query[
"code"]
482 elif "error" in request.query:
483 user_input[
"error"] = request.query[
"error"]
485 return web.Response(text=
"Missing code or error parameter")
487 await hass.config_entries.flow.async_configure(
488 flow_id=state[
"flow_id"], user_input=user_input
490 _LOGGER.debug(
"Resumed OAuth configuration flow")
492 headers={
"content-type":
"text/html"},
493 text=
"<script>window.close()</script>",
498 """Session to make requests authenticated with OAuth2."""
504 implementation: AbstractOAuth2Implementation,
506 """Initialize an OAuth2 session."""
514 """Return the token."""
515 return cast(dict, self.
config_entryconfig_entry.data[
"token"])
519 """Return if token is still valid."""
521 cast(float, self.
tokentoken[
"expires_at"])
522 > time.time() + CLOCK_OUT_OF_SYNC_MAX_SEC
526 """Ensure that the current token is valid."""
533 self.
hasshass.config_entries.async_update_entry(
538 self, method: str, url: str, **kwargs: Any
539 ) -> client.ClientResponse:
540 """Make a request."""
543 self.
hasshass, self.
config_entryconfig_entry.data[
"token"], method, url, **kwargs
548 hass: HomeAssistant, token: dict, method: str, url: str, **kwargs: Any
549 ) -> client.ClientResponse:
550 """Make an OAuth2 authenticated request.
552 This method will not refresh tokens. Use OAuth2 session for that.
555 headers = kwargs.pop(
"headers", {})
556 return await session.request(
562 "authorization": f
"Bearer {token['access_token']}",
569 """JWT encode data."""
570 if (secret := hass.data.get(DATA_JWT_SECRET))
is None:
571 secret = hass.data[DATA_JWT_SECRET] = secrets.token_hex()
573 return jwt.encode(data, secret, algorithm=
"HS256")
577 def _decode_jwt(hass: HomeAssistant, encoded: str) -> dict[str, Any] |
None:
578 """JWT decode data."""
579 secret: str |
None = hass.data.get(DATA_JWT_SECRET)
585 return jwt.decode(encoded, secret, algorithms=[
"HS256"])
586 except jwt.InvalidTokenError:
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step(self, *str|None step_id=None, str url, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step_done(self, *str next_step_id)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
dict extra_authorize_data(self)
config_entries.ConfigFlowResult async_step_pick_implementation(self, dict|None user_input=None)
config_entries.ConfigFlowResult async_step_auth(self, dict[str, Any]|None user_input=None)
config_entries.ConfigFlowResult async_step_authorize_rejected(self, None data=None)
config_entries.ConfigFlowResult async_oauth_create_entry(self, dict data)
str async_generate_authorize_url(self)
config_entries.ConfigFlowResult async_step_creation(self, dict[str, Any]|None user_input=None)
logging.Logger logger(self)
config_entries.ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None async_register_implementation(cls, HomeAssistant hass, LocalOAuth2Implementation local_impl)
dict async_refresh_token(self, dict token)
dict _async_refresh_token(self, dict token)
str async_generate_authorize_url(self, str flow_id)
dict async_resolve_external_data(self, Any external_data)
None __init__(self, HomeAssistant hass, str domain, str client_id, str client_secret, str authorize_url, str token_url)
str async_generate_authorize_url(self, str flow_id)
dict extra_authorize_data(self)
dict _token_request(self, dict data)
dict async_resolve_external_data(self, Any external_data)
dict _async_refresh_token(self, dict token)
web.Response get(self, web.Request request)
None __init__(self, HomeAssistant hass, config_entries.ConfigEntry config_entry, AbstractOAuth2Implementation implementation)
None async_ensure_token_valid(self)
client.ClientResponse async_request(self, str method, str url, **Any kwargs)
web.Response get(self, web.Request request, str config_key)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
None async_register_implementation(HomeAssistant hass, str domain, AbstractOAuth2Implementation implementation)
str _encode_jwt(HomeAssistant hass, dict data)
dict[str, Any]|None _decode_jwt(HomeAssistant hass, str encoded)
AbstractOAuth2Implementation async_get_config_entry_implementation(HomeAssistant hass, config_entries.ConfigEntry config_entry)
dict[str, AbstractOAuth2Implementation] async_get_implementations(HomeAssistant hass, str domain)
client.ClientResponse async_oauth2_request(HomeAssistant hass, dict token, str method, str url, **Any kwargs)
None async_add_implementation_provider(HomeAssistant hass, str provider_domain, Callable[[HomeAssistant, str], Awaitable[list[AbstractOAuth2Implementation]]] async_provide_implementation)
list[str] async_get_application_credentials(HomeAssistant hass)