Home Assistant Unofficial Reference 2024.12.1
auth.py
Go to the documentation of this file.
1 """Handle the auth of a connection."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Coroutine
6 from typing import TYPE_CHECKING, Any, Final
7 
8 from aiohttp.web import Request
9 import voluptuous as vol
10 from voluptuous.humanize import humanize_error
11 
12 from homeassistant.components.http.ban import process_success_login, process_wrong_login
13 from homeassistant.const import __version__
14 from homeassistant.core import CALLBACK_TYPE, HomeAssistant
15 from homeassistant.helpers.json import json_bytes
16 from homeassistant.util.json import JsonValueType
17 
18 from .connection import ActiveConnection
19 from .error import Disconnect
20 
21 if TYPE_CHECKING:
22  from .http import WebSocketAdapter
23 
24 
# Values of the "type" field used by the messages of the auth handshake.
TYPE_AUTH: Final = "auth"
TYPE_AUTH_INVALID: Final = "auth_invalid"
TYPE_AUTH_OK: Final = "auth_ok"
TYPE_AUTH_REQUIRED: Final = "auth_required"

# Shape of the client's auth message: "type" is mandatory and must equal
# TYPE_AUTH; "api_password" and "access_token" share the exclusion group
# "auth", so a message may carry at most one of them.
AUTH_MESSAGE_SCHEMA: Final = vol.Schema(
    {
        vol.Required("type"): TYPE_AUTH,
        vol.Exclusive("api_password", "auth"): str,
        vol.Exclusive("access_token", "auth"): str,
    }
)

# Replies that do not vary per connection are serialized once at import time.
AUTH_OK_MESSAGE = json_bytes(
    {
        "type": TYPE_AUTH_OK,
        "ha_version": __version__,
    }
)
AUTH_REQUIRED_MESSAGE = json_bytes(
    {
        "type": TYPE_AUTH_REQUIRED,
        "ha_version": __version__,
    }
)
42 
43 
def auth_invalid_message(message: str) -> bytes:
    """Build an auth_invalid reply carrying ``message``, encoded by json_bytes."""
    payload = {"type": TYPE_AUTH_INVALID, "message": message}
    return json_bytes(payload)
47 
48 
class AuthPhase:
    """Connection that requires client to authenticate first."""

    def __init__(
        self,
        logger: WebSocketAdapter,
        hass: HomeAssistant,
        send_message: Callable[[bytes | str | dict[str, Any]], None],
        cancel_ws: CALLBACK_TYPE,
        request: Request,
        send_bytes_text: Callable[[bytes], Coroutine[Any, Any, None]],
    ) -> None:
        """Initialize the auth phase of a connection.

        The arguments are only stored here; nothing is sent to the client
        until async_handle is called.
        """
        self._hass = hass
        # send_message will send a message to the client via the queue.
        self._send_message = send_message
        self._cancel_ws = cancel_ws
        self._logger = logger
        self._request = request
        # send_bytes_text will directly send a message to the client.
        self._send_bytes_text = send_bytes_text

    async def async_handle(self, msg: JsonValueType) -> ActiveConnection:
        """Handle authentication.

        Validate ``msg`` against AUTH_MESSAGE_SCHEMA and, if it carries an
        access token accepted by ``hass.auth``, return an ActiveConnection
        bound to that token's user.

        Raises:
            Disconnect: if ``msg`` fails schema validation, or if it has no
                access token or the token does not validate. In both cases
                an auth_invalid message is sent to the client first.
        """
        try:
            valid_msg = AUTH_MESSAGE_SCHEMA(msg)
        except vol.Invalid as err:
            error_msg = (
                f"Auth message incorrectly formatted: {humanize_error(msg, err)}"
            )
            self._logger.warning(error_msg)
            await self._send_bytes_text(auth_invalid_message(error_msg))
            raise Disconnect from err

        # Both conditions must hold: a non-empty access_token was supplied
        # and async_validate_access_token returned a truthy refresh token.
        if (access_token := valid_msg.get("access_token")) and (
            refresh_token := self._hass.auth.async_validate_access_token(access_token)
        ):
            conn = ActiveConnection(
                self._logger,
                self._hass,
                self._send_message,
                refresh_token.user,
                refresh_token,
            )
            # Register cancel_ws as the revoke callback for this refresh
            # token and keep whatever the registration returns under the
            # "auth" subscription key (presumably an unsubscribe handle;
            # confirm against hass.auth).
            conn.subscriptions["auth"] = (
                self._hass.auth.async_register_revoke_token_callback(
                    refresh_token.id, self._cancel_ws
                )
            )
            await self._send_bytes_text(AUTH_OK_MESSAGE)
            self._logger.debug("Auth OK")
            # Report the successful login to the http ban component.
            process_success_login(self._request)
            return conn

        # No usable access token: tell the client, report the failed attempt
        # to the http ban component, then drop the connection.
        await self._send_bytes_text(
            auth_invalid_message("Invalid access token or password")
        )
        await process_wrong_login(self._request)
        raise Disconnect
ActiveConnection async_handle(self, JsonValueType msg)
Definition: auth.py:71
None __init__(self, WebSocketAdapter logger, HomeAssistant hass, Callable[[bytes|str|dict[str, Any]], None] send_message, CALLBACK_TYPE cancel_ws, Request request, Callable[[bytes], Coroutine[Any, Any, None]] send_bytes_text)
Definition: auth.py:60
None process_wrong_login(Request request)
Definition: ban.py:109
None process_success_login(Request request)
Definition: ban.py:173
bytes auth_invalid_message(str message)
Definition: auth.py:44