1 """Authentication for HTTP component."""
3 from __future__
import annotations
5 from collections.abc
import Awaitable, Callable
6 from datetime
import timedelta
7 from ipaddress
import ip_address
11 from typing
import Any, Final
13 from aiohttp
import hdrs
14 from aiohttp.web
import Application, Request, StreamResponse, middleware
16 from jwt
import api_jws
30 from .const
import KEY_AUTHENTICATED, KEY_HASS_REFRESH_TOKEN_ID, KEY_HASS_USER
32 _LOGGER = logging.getLogger(__name__)
34 DATA_API_PASSWORD: Final =
"api_password"
35 DATA_SIGN_SECRET: Final =
"http.auth.sign_secret"
36 SIGN_QUERY_PARAM: Final =
"authSig"
37 SAFE_QUERY_PARAMS: Final = frozenset((
"height",
"width"))
40 STORAGE_KEY =
"http.auth"
41 CONTENT_USER_NAME =
"Home Assistant Content"
48 expiration: timedelta,
50 refresh_token_id: str |
None =
None,
51 use_content_user: bool =
False,
53 """Sign a path for temporary access without auth header."""
54 if (secret := hass.data.get(DATA_SIGN_SECRET))
is None:
55 secret = hass.data[DATA_SIGN_SECRET] = secrets.token_hex()
57 if refresh_token_id
is None:
59 refresh_token_id = hass.data[STORAGE_KEY]
60 elif connection := websocket_api.current_connection.get():
61 refresh_token_id = connection.refresh_token_id
63 request := current_request.get()
64 )
and KEY_HASS_REFRESH_TOKEN_ID
in request:
65 refresh_token_id = request[KEY_HASS_REFRESH_TOKEN_ID]
67 refresh_token_id = hass.data[STORAGE_KEY]
70 now_timestamp =
int(time.time())
71 expiration_timestamp = now_timestamp +
int(expiration.total_seconds())
72 params = [itm
for itm
in url.query.items()
if itm[0]
not in SAFE_QUERY_PARAMS]
75 "iss": refresh_token_id,
79 "exp": expiration_timestamp,
82 encoded = api_jws.encode(json_payload, secret,
"HS256")
83 params.append((SIGN_QUERY_PARAM, encoded))
84 url = url.with_query(params)
85 return f
"{url.path}?{url.query_string}"
90 hass: HomeAssistant, user: User, request: Request |
None =
None
92 """Validate that user is not allowed to do auth things."""
93 if not user.is_active:
94 return "User is not active"
96 if not user.local_only:
101 request = current_request.get()
104 return "No request available to validate local access"
107 return "User is local only"
110 remote_address = ip_address(request.remote)
112 return "Invalid remote IP"
117 return "User cannot authenticate remotely"
124 """Create auth middleware for the app."""
125 store = Store[dict[str, Any]](hass, STORAGE_VERSION, STORAGE_KEY)
126 if (data := await store.async_load())
is None:
130 if "content_user" in data:
131 user = await hass.auth.async_get_user(data[
"content_user"])
132 if user
and user.refresh_tokens:
133 refresh_token =
list(user.refresh_tokens.values())[0]
135 if refresh_token
is None:
136 user = await hass.auth.async_create_system_user(
137 CONTENT_USER_NAME, group_ids=[GROUP_ID_READ_ONLY]
139 refresh_token = await hass.auth.async_create_refresh_token(user)
140 data[
"content_user"] = user.id
141 await store.async_save(data)
143 hass.data[STORAGE_KEY] = refresh_token.id
146 def async_validate_auth_header(request: Request) -> bool:
147 """Test authorization header against access token.
149 Basic auth_type is legacy code, should be removed with api_password.
152 auth_type, auth_val = request.headers.get(hdrs.AUTHORIZATION,
"").split(
159 if auth_type !=
"Bearer":
162 refresh_token = hass.auth.async_validate_access_token(auth_val)
164 if refresh_token
is None:
170 request[KEY_HASS_USER] = refresh_token.user
171 request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
175 def async_validate_signed_request(request: Request) -> bool:
176 """Validate a signed request."""
177 if (secret := hass.data.get(DATA_SIGN_SECRET))
is None:
180 if (signature := request.query.get(SIGN_QUERY_PARAM))
is None:
184 claims = jwt_wrapper.verify_and_decode(
185 signature, secret, algorithms=[
"HS256"], options={
"verify_iss":
False}
187 except jwt.InvalidTokenError:
190 if claims[
"path"] != request.path:
195 for itm
in request.query.items()
196 if itm[0]
not in SAFE_QUERY_PARAMS
and itm[0] != SIGN_QUERY_PARAM
198 if claims[
"params"] != params:
201 refresh_token = hass.auth.async_get_refresh_token(claims[
"iss"])
203 if refresh_token
is None:
206 request[KEY_HASS_USER] = refresh_token.user
207 request[KEY_HASS_REFRESH_TOKEN_ID] = refresh_token.id
211 async
def auth_middleware(
212 request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
214 """Authenticate as middleware."""
215 authenticated =
False
217 if hdrs.AUTHORIZATION
in request.headers
and async_validate_auth_header(
221 auth_type =
"bearer token"
226 request.method ==
"GET"
227 and SIGN_QUERY_PARAM
in request.query_string
228 and async_validate_signed_request(request)
231 auth_type =
"signed request"
233 if authenticated
and _LOGGER.isEnabledFor(logging.DEBUG):
235 "Authenticated %s for %s using %s",
241 request[KEY_AUTHENTICATED] = authenticated
242 return await handler(request)
244 app.middlewares.append(auth_middleware)
str|None async_user_not_allowed_do_auth(HomeAssistant hass, User user, Request|None request=None)
None async_setup_auth(HomeAssistant hass, Application app)
str async_sign_path(HomeAssistant hass, str path, timedelta expiration, *str|None refresh_token_id=None, bool use_content_user=False)
bool is_local(ConfigEntry entry)
bool is_cloud_connection(HomeAssistant hass)