Home Assistant Unofficial Reference 2024.12.1
auth.py
Go to the documentation of this file.
1 """Support for Alexa skill auth."""
2 
3 import asyncio
4 from asyncio import timeout
5 from datetime import datetime, timedelta
6 from http import HTTPStatus
7 import json
8 import logging
9 from typing import Any
10 
11 import aiohttp
12 
13 from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET
14 from homeassistant.core import HomeAssistant, callback
15 from homeassistant.helpers import aiohttp_client
16 from homeassistant.helpers.storage import Store
17 from homeassistant.util import dt as dt_util
18 
19 from .const import STORAGE_ACCESS_TOKEN, STORAGE_REFRESH_TOKEN
20 from .diagnostics import async_redact_lwa_params
21 
22 _LOGGER = logging.getLogger(__name__)
23 
24 LWA_TOKEN_URI = "https://api.amazon.com/auth/o2/token"
25 LWA_HEADERS = {"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"}
26 
27 PREEMPTIVE_REFRESH_TTL_IN_SECONDS = 300
28 STORAGE_KEY = "alexa_auth"
29 STORAGE_VERSION = 1
30 STORAGE_EXPIRE_TIME = "expire_time"
31 
32 
33 class Auth:
34  """Handle authentication to send events to Alexa."""
35 
36  def __init__(self, hass: HomeAssistant, client_id: str, client_secret: str) -> None:
37  """Initialize the Auth class."""
38  self.hasshass = hass
39 
40  self.client_idclient_id = client_id
41  self.client_secretclient_secret = client_secret
42 
43  self._prefs_prefs: dict[str, Any] | None = None
44  self._store: Store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
45 
46  self._get_token_lock_get_token_lock = asyncio.Lock()
47 
48  async def async_do_auth(self, accept_grant_code: str) -> str | None:
49  """Do authentication with an AcceptGrant code."""
50  # access token not retrieved yet for the first time, so this should
51  # be an access token request
52 
53  lwa_params: dict[str, str] = {
54  "grant_type": "authorization_code",
55  "code": accept_grant_code,
56  CONF_CLIENT_ID: self.client_idclient_id,
57  CONF_CLIENT_SECRET: self.client_secretclient_secret,
58  }
59  _LOGGER.debug(
60  "Calling LWA to get the access token (first time), with: %s",
61  json.dumps(async_redact_lwa_params(lwa_params)),
62  )
63 
64  return await self._async_request_new_token_async_request_new_token(lwa_params)
65 
66  @callback
67  def async_invalidate_access_token(self) -> None:
68  """Invalidate access token."""
69  assert self._prefs_prefs is not None
70  self._prefs_prefs[STORAGE_ACCESS_TOKEN] = None
71 
72  async def async_get_access_token(self) -> str | None:
73  """Perform access token or token refresh request."""
74  async with self._get_token_lock_get_token_lock:
75  if self._prefs_prefs is None:
76  await self.async_load_preferencesasync_load_preferences()
77 
78  assert self._prefs_prefs is not None
79  if self.is_token_validis_token_valid():
80  _LOGGER.debug("Token still valid, using it")
81  token: str = self._prefs_prefs[STORAGE_ACCESS_TOKEN]
82  return token
83 
84  if self._prefs_prefs[STORAGE_REFRESH_TOKEN] is None:
85  _LOGGER.debug("Token invalid and no refresh token available")
86  return None
87 
88  lwa_params: dict[str, str] = {
89  "grant_type": "refresh_token",
90  "refresh_token": self._prefs_prefs[STORAGE_REFRESH_TOKEN],
91  CONF_CLIENT_ID: self.client_idclient_id,
92  CONF_CLIENT_SECRET: self.client_secretclient_secret,
93  }
94 
95  _LOGGER.debug("Calling LWA to refresh the access token")
96  return await self._async_request_new_token_async_request_new_token(lwa_params)
97 
98  @callback
99  def is_token_valid(self) -> bool:
100  """Check if a token is already loaded and if it is still valid."""
101  assert self._prefs_prefs is not None
102  if not self._prefs_prefs[STORAGE_ACCESS_TOKEN]:
103  return False
104 
105  expire_time: datetime | None = dt_util.parse_datetime(
106  self._prefs_prefs[STORAGE_EXPIRE_TIME]
107  )
108  assert expire_time is not None
109  preemptive_expire_time = expire_time - timedelta(
110  seconds=PREEMPTIVE_REFRESH_TTL_IN_SECONDS
111  )
112 
113  return dt_util.utcnow() < preemptive_expire_time
114 
115  async def _async_request_new_token(self, lwa_params: dict[str, str]) -> str | None:
116  try:
117  session = aiohttp_client.async_get_clientsession(self.hasshass)
118  async with timeout(10):
119  response = await session.post(
120  LWA_TOKEN_URI,
121  headers=LWA_HEADERS,
122  data=lwa_params,
123  allow_redirects=True,
124  )
125 
126  except (TimeoutError, aiohttp.ClientError):
127  _LOGGER.error("Timeout calling LWA to get auth token")
128  return None
129 
130  _LOGGER.debug("LWA response header: %s", response.headers)
131  _LOGGER.debug("LWA response status: %s", response.status)
132 
133  if response.status != HTTPStatus.OK:
134  _LOGGER.error("Error calling LWA to get auth token")
135  return None
136 
137  response_json = await response.json()
138  _LOGGER.debug("LWA response body : %s", async_redact_lwa_params(response_json))
139 
140  access_token: str = response_json["access_token"]
141  refresh_token: str = response_json["refresh_token"]
142  expires_in: int = response_json["expires_in"]
143  expire_time = dt_util.utcnow() + timedelta(seconds=expires_in)
144 
145  await self._async_update_preferences_async_update_preferences(
146  access_token, refresh_token, expire_time.isoformat()
147  )
148 
149  return access_token
150 
151  async def async_load_preferences(self) -> None:
152  """Load preferences with stored tokens."""
153  self._prefs_prefs = await self._store.async_load()
154 
155  if self._prefs_prefs is None:
156  self._prefs_prefs = {
157  STORAGE_ACCESS_TOKEN: None,
158  STORAGE_REFRESH_TOKEN: None,
159  STORAGE_EXPIRE_TIME: None,
160  }
161 
163  self, access_token: str, refresh_token: str, expire_time: str
164  ) -> None:
165  """Update user preferences."""
166  if self._prefs_prefs is None:
167  await self.async_load_preferencesasync_load_preferences()
168  assert self._prefs_prefs is not None
169 
170  if access_token is not None:
171  self._prefs_prefs[STORAGE_ACCESS_TOKEN] = access_token
172  if refresh_token is not None:
173  self._prefs_prefs[STORAGE_REFRESH_TOKEN] = refresh_token
174  if expire_time is not None:
175  self._prefs_prefs[STORAGE_EXPIRE_TIME] = expire_time
176  await self._store.async_save(self._prefs_prefs)
None _async_update_preferences(self, str access_token, str refresh_token, str expire_time)
Definition: auth.py:164
str|None _async_request_new_token(self, dict[str, str] lwa_params)
Definition: auth.py:115
str|None async_do_auth(self, str accept_grant_code)
Definition: auth.py:48
str|None async_get_access_token(self)
Definition: auth.py:72
None __init__(self, HomeAssistant hass, str client_id, str client_secret)
Definition: auth.py:36
dict[str, str] async_redact_lwa_params(dict[str, str] lwa_params)
Definition: diagnostics.py:26
None async_load(HomeAssistant hass)
None async_save(self, _T data)
Definition: storage.py:424