Home Assistant Unofficial Reference 2024.12.1
auth.py
Go to the documentation of this file.
1 """Authentication for HTTP component."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Awaitable, Callable
6 from datetime import timedelta
7 from ipaddress import ip_address
8 import logging
9 import secrets
10 import time
11 from typing import Any, Final
12 
13 from aiohttp import hdrs
14 from aiohttp.web import Application, Request, StreamResponse, middleware
15 import jwt
16 from jwt import api_jws
17 from yarl import URL
18 
19 from homeassistant.auth import jwt_wrapper
20 from homeassistant.auth.const import GROUP_ID_READ_ONLY
21 from homeassistant.auth.models import User
22 from homeassistant.components import websocket_api
23 from homeassistant.core import HomeAssistant, callback
24 from homeassistant.helpers.http import current_request
25 from homeassistant.helpers.json import json_bytes
26 from homeassistant.helpers.network import is_cloud_connection
27 from homeassistant.helpers.storage import Store
28 from homeassistant.util.network import is_local
29 
30 from .const import KEY_AUTHENTICATED, KEY_HASS_REFRESH_TOKEN_ID, KEY_HASS_USER
31 
32 _LOGGER = logging.getLogger(__name__)
33 
34 DATA_API_PASSWORD: Final = "api_password"
35 DATA_SIGN_SECRET: Final = "http.auth.sign_secret"
36 SIGN_QUERY_PARAM: Final = "authSig"
37 SAFE_QUERY_PARAMS: Final = frozenset(("height", "width"))
38 
39 STORAGE_VERSION = 1
40 STORAGE_KEY = "http.auth"
41 CONTENT_USER_NAME = "Home Assistant Content"
42 
43 
44 @callback
46  hass: HomeAssistant,
47  path: str,
48  expiration: timedelta,
49  *,
50  refresh_token_id: str | None = None,
51  use_content_user: bool = False,
52 ) -> str:
53  """Sign a path for temporary access without auth header."""
54  if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
55  secret = hass.data[DATA_SIGN_SECRET] = secrets.token_hex()
56 
57  if refresh_token_id is None:
58  if use_content_user:
59  refresh_token_id = hass.data[STORAGE_KEY]
60  elif connection := websocket_api.current_connection.get():
61  refresh_token_id = connection.refresh_token_id
62  elif (
63  request := current_request.get()
64  ) and KEY_HASS_REFRESH_TOKEN_ID in request:
65  refresh_token_id = request[KEY_HASS_REFRESH_TOKEN_ID]
66  else:
67  refresh_token_id = hass.data[STORAGE_KEY]
68 
69  url = URL(path)
70  now_timestamp = int(time.time())
71  expiration_timestamp = now_timestamp + int(expiration.total_seconds())
72  params = [itm for itm in url.query.items() if itm[0] not in SAFE_QUERY_PARAMS]
73  json_payload = json_bytes(
74  {
75  "iss": refresh_token_id,
76  "path": url.path,
77  "params": params,
78  "iat": now_timestamp,
79  "exp": expiration_timestamp,
80  }
81  )
82  encoded = api_jws.encode(json_payload, secret, "HS256")
83  params.append((SIGN_QUERY_PARAM, encoded))
84  url = url.with_query(params)
85  return f"{url.path}?{url.query_string}"
86 
87 
88 @callback
90  hass: HomeAssistant, user: User, request: Request | None = None
91 ) -> str | None:
92  """Validate that user is not allowed to do auth things."""
93  if not user.is_active:
94  return "User is not active"
95 
96  if not user.local_only:
97  return None
98 
99  # User is marked as local only, check if they are allowed to do auth
100  if request is None:
101  request = current_request.get()
102 
103  if not request:
104  return "No request available to validate local access"
105 
106  if is_cloud_connection(hass):
107  return "User is local only"
108 
109  try:
110  remote_address = ip_address(request.remote) # type: ignore[arg-type]
111  except ValueError:
112  return "Invalid remote IP"
113 
114  if is_local(remote_address):
115  return None
116 
117  return "User cannot authenticate remotely"
118 
119 
121  hass: HomeAssistant,
122  app: Application,
123 ) -> None:
124  """Create auth middleware for the app."""
125  store = Store[dict[str, Any]](hass, STORAGE_VERSION, STORAGE_KEY)
126  if (data := await store.async_load()) is None:
127  data = {}
128 
129  refresh_token = None
130  if "content_user" in data:
131  user = await hass.auth.async_get_user(data["content_user"])
132  if user and user.refresh_tokens:
133  refresh_token = list(user.refresh_tokens.values())[0]
134 
135  if refresh_token is None:
136  user = await hass.auth.async_create_system_user(
137  CONTENT_USER_NAME, group_ids=[GROUP_ID_READ_ONLY]
138  )
139  refresh_token = await hass.auth.async_create_refresh_token(user)
140  data["content_user"] = user.id
141  await store.async_save(data)
142 
143  hass.data[STORAGE_KEY] = refresh_token.id
144 
145  @callback
146  def async_validate_auth_header(request: Request) -> bool:
147  """Test authorization header against access token.
148 
149  Basic auth_type is legacy code, should be removed with api_password.
150  """
151  try:
152  auth_type, auth_val = request.headers.get(hdrs.AUTHORIZATION, "").split(
153  " ", 1
154  )
155  except ValueError:
156  # If no space in authorization header
157  return False
158 
159  if auth_type != "Bearer":
160  return False
161 
162  refresh_token = hass.auth.async_validate_access_token(auth_val)
163 
164  if refresh_token is None:
165  return False
166 
167  if async_user_not_allowed_do_auth(hass, refresh_token.user, request):
168  return False
169 
170  request[KEY_HASS_USER] = refresh_token.user
171  request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
172  return True
173 
174  @callback
175  def async_validate_signed_request(request: Request) -> bool:
176  """Validate a signed request."""
177  if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
178  return False
179 
180  if (signature := request.query.get(SIGN_QUERY_PARAM)) is None:
181  return False
182 
183  try:
184  claims = jwt_wrapper.verify_and_decode(
185  signature, secret, algorithms=["HS256"], options={"verify_iss": False}
186  )
187  except jwt.InvalidTokenError:
188  return False
189 
190  if claims["path"] != request.path:
191  return False
192 
193  params = [
194  list(itm) # claims stores tuples as lists
195  for itm in request.query.items()
196  if itm[0] not in SAFE_QUERY_PARAMS and itm[0] != SIGN_QUERY_PARAM
197  ]
198  if claims["params"] != params:
199  return False
200 
201  refresh_token = hass.auth.async_get_refresh_token(claims["iss"])
202 
203  if refresh_token is None:
204  return False
205 
206  request[KEY_HASS_USER] = refresh_token.user
207  request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
208  return True
209 
210  @middleware
211  async def auth_middleware(
212  request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
213  ) -> StreamResponse:
214  """Authenticate as middleware."""
215  authenticated = False
216 
217  if hdrs.AUTHORIZATION in request.headers and async_validate_auth_header(
218  request
219  ):
220  authenticated = True
221  auth_type = "bearer token"
222 
223  # We first start with a string check to avoid parsing query params
224  # for every request.
225  elif (
226  request.method == "GET"
227  and SIGN_QUERY_PARAM in request.query_string
228  and async_validate_signed_request(request)
229  ):
230  authenticated = True
231  auth_type = "signed request"
232 
233  if authenticated and _LOGGER.isEnabledFor(logging.DEBUG):
234  _LOGGER.debug(
235  "Authenticated %s for %s using %s",
236  request.remote,
237  request.path,
238  auth_type,
239  )
240 
241  request[KEY_AUTHENTICATED] = authenticated
242  return await handler(request)
243 
244  app.middlewares.append(auth_middleware)
str|None async_user_not_allowed_do_auth(HomeAssistant hass, User user, Request|None request=None)
Definition: auth.py:91
None async_setup_auth(HomeAssistant hass, Application app)
Definition: auth.py:123
str async_sign_path(HomeAssistant hass, str path, timedelta expiration, *str|None refresh_token_id=None, bool use_content_user=False)
Definition: auth.py:52
bool is_local(ConfigEntry entry)
Definition: __init__.py:58
bool is_cloud_connection(HomeAssistant hass)
Definition: network.py:338