# Home Assistant Unofficial Reference 2024.12.1 -- source listing of auth.py
# (extracted from the generated documentation page for this file).
"""Implement the auth feature from Hass.io for Add-ons."""
2 
from http import HTTPStatus
from ipaddress import ip_address
import logging
import os

from aiohttp import web
from aiohttp.web_exceptions import HTTPNotFound, HTTPUnauthorized
import voluptuous as vol

from homeassistant.auth.models import User
from homeassistant.auth.providers import homeassistant as auth_ha
from homeassistant.components.http import KEY_HASS, KEY_HASS_USER, HomeAssistantView
from homeassistant.components.http.data_validator import RequestDataValidator
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv

from .const import ATTR_ADDON, ATTR_PASSWORD, ATTR_USERNAME
20 
# Module-level logger named after this module; used by the access check below
# to report rejected requests.
_LOGGER = logging.getLogger(__name__)
22 
23 
@callback
def async_setup_auth_view(hass: HomeAssistant, user: User) -> None:
    """Create the Hass.io auth views and register them with the HTTP component.

    One ``HassIOAuth`` and one ``HassIOPasswordReset`` view are built, each
    from the same ``hass`` and ``user``, and registered in that order.
    """
    # Both views are constructed before either one is registered.
    views = (HassIOAuth(hass, user), HassIOPasswordReset(hass, user))
    for view in views:
        hass.http.register_view(view)
32 
33 
class HassIOBaseAuth(HomeAssistantView):
    """Hass.io view to handle auth requests.

    Stores the Home Assistant instance and the expected calling user, and
    provides ``_check_access`` to verify where a request comes from.
    """

    def __init__(self, hass: HomeAssistant, user: User) -> None:
        """Initialize WebView.

        Args:
            hass: Home Assistant instance; stored on the view.
            user: The user a request must be authenticated as; compared by
                ``id`` in ``_check_access``.
        """
        self.hass = hass
        self.user = user

    def _check_access(self, request: web.Request) -> None:
        """Check if this call is from Supervisor.

        Two checks run in order: the peer address of the connection must equal
        the host part of the ``SUPERVISOR`` environment variable, and the user
        attached to the request must have the same id as ``self.user``.

        Raises:
            HTTPUnauthorized: If either check fails (the failure is logged at
                error level first).
            KeyError: If the ``SUPERVISOR`` environment variable is not set.
        """
        # Check caller IP
        # SUPERVISOR is read as "host[:port]"; only the part before the first
        # colon is compared.
        hassio_ip = os.environ["SUPERVISOR"].split(":")[0]
        # NOTE(review): presumably narrows an Optional transport for type
        # checkers; it is not a runtime guard when asserts are disabled.
        assert request.transport
        if ip_address(request.transport.get_extra_info("peername")[0]) != ip_address(
            hassio_ip
        ):
            _LOGGER.error("Invalid auth request from %s", request.remote)
            raise HTTPUnauthorized

        # Check caller token
        # The request's authenticated user must be the one given at construction.
        if request[KEY_HASS_USER].id != self.user.id:
            _LOGGER.error("Invalid auth request from %s", request[KEY_HASS_USER].name)
            raise HTTPUnauthorized
57 
58 
class HassIOAuth(HassIOBaseAuth):
    """Hass.io view to handle auth requests."""

    name = "api:hassio:auth"
    url = "/api/hassio_auth"

    @RequestDataValidator(
        vol.Schema(
            {
                vol.Required(ATTR_USERNAME): cv.string,
                vol.Required(ATTR_PASSWORD): cv.string,
                # Required by the schema, but not read by the handler below.
                vol.Required(ATTR_ADDON): cv.string,
            },
            extra=vol.ALLOW_EXTRA,
        )
    )
    async def post(self, request: web.Request, data: dict[str, str]) -> web.Response:
        """Handle auth requests.

        Verifies the caller with ``_check_access`` and then validates the
        submitted username and password against the Home Assistant auth
        provider.

        Args:
            request: Incoming request.
            data: Body validated by the schema above; must contain the
                username, password and add-on keys, and may contain others.

        Returns:
            An empty response with status 200 when the login validates.

        Raises:
            HTTPUnauthorized: If the access check rejects the caller.
            HTTPNotFound: If the provider raises ``InvalidAuth``.
        """
        self._check_access(request)
        provider = auth_ha.async_get_provider(request.app[KEY_HASS])

        try:
            await provider.async_validate_login(
                data[ATTR_USERNAME], data[ATTR_PASSWORD]
            )
        except auth_ha.InvalidAuth:
            # "from None" drops the InvalidAuth context from the raised error.
            raise HTTPNotFound from None

        return web.Response(status=HTTPStatus.OK)
85 
86 
class HassIOPasswordReset(HassIOBaseAuth):
    """Hass.io view to handle password reset requests."""

    name = "api:hassio:auth:password:reset"
    url = "/api/hassio_auth/password_reset"

    @RequestDataValidator(
        vol.Schema(
            {
                vol.Required(ATTR_USERNAME): cv.string,
                vol.Required(ATTR_PASSWORD): cv.string,
            },
            extra=vol.ALLOW_EXTRA,
        )
    )
    async def post(self, request: web.Request, data: dict[str, str]) -> web.Response:
        """Handle password reset requests.

        Verifies the caller with ``_check_access`` and then asks the Home
        Assistant auth provider to change the password of the given user.

        Args:
            request: Incoming request.
            data: Body validated by the schema above; must contain the
                username and password keys, and may contain others.

        Returns:
            An empty response with status 200 when the password was changed.

        Raises:
            HTTPUnauthorized: If the access check rejects the caller.
            HTTPNotFound: If the provider raises ``InvalidUser``; the original
                error is kept as the cause.
        """
        self._check_access(request)
        provider = auth_ha.async_get_provider(request.app[KEY_HASS])

        try:
            await provider.async_change_password(
                data[ATTR_USERNAME], data[ATTR_PASSWORD]
            )
        except auth_ha.InvalidUser as err:
            raise HTTPNotFound from err

        return web.Response(status=HTTPStatus.OK)