Home Assistant Unofficial Reference 2024.12.1
ingress.py
Go to the documentation of this file.
1 """Hass.io Add-on ingress service."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Iterable
7 from functools import lru_cache
8 from ipaddress import ip_address
9 import logging
10 from urllib.parse import quote
11 
12 import aiohttp
13 from aiohttp import ClientTimeout, ClientWebSocketResponse, hdrs, web
14 from aiohttp.web_exceptions import HTTPBadGateway, HTTPBadRequest
15 from multidict import CIMultiDict
16 from yarl import URL
17 
18 from homeassistant.components.http import HomeAssistantView
19 from homeassistant.core import HomeAssistant, callback
20 from homeassistant.helpers.aiohttp_client import async_get_clientsession
21 from homeassistant.helpers.typing import UNDEFINED
22 from homeassistant.util.async_ import create_eager_task
23 
24 from .const import X_HASS_SOURCE, X_INGRESS_PATH
25 from .http import should_compress
26 
27 _LOGGER = logging.getLogger(__name__)
28 
# Incoming request headers that are dropped before the request is relayed
# to the add-on (see _init_header).
INIT_HEADERS_FILTER = {
    # Body framing / encoding headers
    hdrs.CONTENT_LENGTH,
    hdrs.CONTENT_ENCODING,
    hdrs.TRANSFER_ENCODING,
    hdrs.ACCEPT_ENCODING,  # Avoid local compression, as we will compress at the border
    # WebSocket handshake headers
    hdrs.SEC_WEBSOCKET_EXTENSIONS,
    hdrs.SEC_WEBSOCKET_PROTOCOL,
    hdrs.SEC_WEBSOCKET_VERSION,
    hdrs.SEC_WEBSOCKET_KEY,
}

# Add-on response headers that are dropped before the response is relayed
# back to the caller (see _response_header).
RESPONSE_HEADERS_FILTER = {
    hdrs.CONTENT_TYPE,
    hdrs.CONTENT_LENGTH,
    hdrs.CONTENT_ENCODING,
    hdrs.TRANSFER_ENCODING,
}

# Buffered responses are only compressed when their Content-Length
# exceeds this many bytes.
MIN_COMPRESSED_SIZE = 128

# Responses whose Content-Length is at most this many bytes are read fully
# and returned as a plain Response; anything else is streamed.
MAX_SIMPLE_RESPONSE_SIZE = 4_194_000
48 
49 
@callback
def async_setup_ingress_view(hass: HomeAssistant, host: str) -> None:
    """Create the ingress proxy view for *host* and register it on hass.http.

    The view uses the client session returned by async_get_clientsession
    for its outgoing requests.
    """
    ingress_view = HassIOIngress(host, async_get_clientsession(hass))
    hass.http.register_view(ingress_view)
57 
58 
class HassIOIngress(HomeAssistantView):
    """Proxy view that relays HTTP and WebSocket traffic to Hass.io ingress.

    Requests arriving at ``/api/hassio_ingress/{token}/{path}`` are forwarded
    to ``http://{host}/ingress/{token}/{path}`` and the add-on's answer is
    passed back to the caller.
    """

    name = "api:hassio:ingress"
    url = "/api/hassio_ingress/{token}/{path:.*}"
    # This view does not ask the HTTP component to authenticate requests.
    requires_auth = False

    def __init__(self, host: str, websession: aiohttp.ClientSession) -> None:
        """Initialize a Hass.io ingress view.

        Args:
            host: Host (and optional port) the ingress requests are sent to.
            websession: Client session used for all outgoing requests.
        """
        self._host = host
        self._websession = websession
        self._url = URL(f"http://{host}")

    # NOTE: lru_cache on a method keys on ``self`` and keeps the instance
    # alive for the lifetime of the cache. That is only acceptable while the
    # view is a long-lived object (async_setup_ingress_view creates one per
    # call). Calls that raise are not cached.
    @lru_cache
    def _create_url(self, token: str, path: str) -> URL:
        """Create URL to service.

        Args:
            token: Ingress token taken from the request URL.
            path: Remaining request path; it is percent-quoted before use.

        Returns:
            The absolute URL below ``/ingress/{token}/`` on the target host.

        Raises:
            HTTPBadRequest: If the URL cannot be built, or if the resulting
                path no longer starts with ``/ingress/{token}/``.
        """
        base_path = f"/ingress/{token}/"

        try:
            target_url = self._url.join(URL(f"{base_path}{quote(path)}"))
        except ValueError as err:
            raise HTTPBadRequest from err

        # Reject anything that resolves outside of this token's prefix.
        if not target_url.path.startswith(base_path):
            raise HTTPBadRequest

        return target_url

    async def _handle(
        self, request: web.Request, token: str, path: str
    ) -> web.Response | web.StreamResponse | web.WebSocketResponse:
        """Route data to Hass.io ingress service.

        WebSocket upgrade requests go to _handle_websocket, everything else
        to _handle_request.

        Raises:
            HTTPBadGateway: If talking to the ingress service fails with an
                aiohttp.ClientError (the error is logged at debug level).
        """
        try:
            # Websocket
            if _is_websocket(request):
                return await self._handle_websocket(request, token, path)

            # Request
            return await self._handle_request(request, token, path)

        except aiohttp.ClientError as err:
            _LOGGER.debug("Ingress error with %s / %s: %s", token, path, err)

        # Only reached when the except branch above swallowed a client error.
        raise HTTPBadGateway from None

    # Every supported HTTP method shares the same handler.
    get = _handle
    post = _handle
    put = _handle
    delete = _handle
    patch = _handle
    options = _handle

    async def _handle_websocket(
        self, request: web.Request, token: str, path: str
    ) -> web.WebSocketResponse:
        """Ingress route for websocket.

        Opens a WebSocket to the add-on with the sub-protocols requested by
        the caller and forwards messages in both directions until one of the
        two directions finishes.

        Raises:
            HTTPBadRequest: If the target URL or the forwarded headers cannot
                be built.
        """
        # Validate the target and build the upstream headers before
        # upgrading: both steps can raise HTTPBadRequest, which can only be
        # delivered as a regular HTTP response while the WebSocket handshake
        # has not been sent yet.
        url = self._create_url(token, path)
        source_header = _init_header(request, token)

        # Support GET query
        if request.query_string:
            url = url.with_query(request.query_string)

        req_protocols: Iterable[str]
        if hdrs.SEC_WEBSOCKET_PROTOCOL in request.headers:
            req_protocols = [
                str(proto.strip())
                for proto in request.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
            ]
        else:
            req_protocols = ()

        # autoclose/autoping are disabled on both sockets so that control
        # frames reach _websocket_forward instead of being answered locally.
        ws_server = web.WebSocketResponse(
            protocols=req_protocols, autoclose=False, autoping=False
        )
        await ws_server.prepare(request)

        # Start proxy
        async with self._websession.ws_connect(
            url,
            headers=source_header,
            protocols=req_protocols,
            autoclose=False,
            autoping=False,
        ) as ws_client:
            # Proxy requests: one task per direction, stop as soon as either
            # direction is done.
            await asyncio.wait(
                [
                    create_eager_task(_websocket_forward(ws_server, ws_client)),
                    create_eager_task(_websocket_forward(ws_client, ws_server)),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

        return ws_server

    async def _handle_request(
        self, request: web.Request, token: str, path: str
    ) -> web.Response | web.StreamResponse:
        """Ingress route for request.

        Forwards the request (method, query, body for non-GET) to the add-on
        without following redirects. 204/304 answers and answers with a
        Content-Length of at most MAX_SIMPLE_RESPONSE_SIZE are buffered and
        returned as a Response; everything else is streamed chunk by chunk.

        Raises:
            HTTPBadRequest: If the target URL or the forwarded headers cannot
                be built.
        """
        url = self._create_url(token, path)
        source_header = _init_header(request, token)

        async with self._websession.request(
            request.method,
            url,
            headers=source_header,
            params=request.query,
            allow_redirects=False,
            data=request.content if request.method != "GET" else None,
            # No overall timeout on the proxied request.
            timeout=ClientTimeout(total=None),
            # Do not let the client add a Content-Type of its own.
            skip_auto_headers={hdrs.CONTENT_TYPE},
        ) as result:
            headers = _response_header(result)
            # Stays 0 unless a Content-Length is parsed below, so responses
            # without a parsed length are never compressed on the simple path.
            content_length_int = 0
            content_length = result.headers.get(hdrs.CONTENT_LENGTH, UNDEFINED)
            # Avoid parsing content_type in simple cases for better performance
            if maybe_content_type := result.headers.get(hdrs.CONTENT_TYPE):
                content_type: str = (maybe_content_type.partition(";"))[0].strip()
            else:
                # default value according to RFC 2616
                content_type = "application/octet-stream"

            # Simple request
            if result.status in (204, 304) or (
                content_length is not UNDEFINED
                and (content_length_int := int(content_length))
                <= MAX_SIMPLE_RESPONSE_SIZE
            ):
                # Return Response
                body = await result.read()
                simple_response = web.Response(
                    headers=headers,
                    status=result.status,
                    content_type=content_type,
                    body=body,
                    zlib_executor_size=32768,
                )
                if content_length_int > MIN_COMPRESSED_SIZE and should_compress(
                    content_type
                ):
                    simple_response.enable_compression()
                return simple_response

            # Stream response
            response = web.StreamResponse(status=result.status, headers=headers)
            response.content_type = content_type

            try:
                if should_compress(content_type):
                    response.enable_compression()
                await response.prepare(request)
                # In testing iter_chunked, iter_any, and iter_chunks:
                # iter_chunks was the best performing option since
                # it does not have to do as much re-assembly
                async for data, _ in result.content.iter_chunks():
                    await response.write(data)

            except (
                aiohttp.ClientError,
                aiohttp.ClientPayloadError,
                ConnectionResetError,
            ) as err:
                # Best effort: the (possibly partial) response is still
                # returned to the caller below.
                _LOGGER.debug("Stream error %s / %s: %s", token, path, err)

            return response
226 
227 
228 @lru_cache(maxsize=32)
229 def _forwarded_for_header(forward_for: str | None, peer_name: str) -> str:
230  """Create X-Forwarded-For header."""
231  connected_ip = ip_address(peer_name)
232  return f"{forward_for}, {connected_ip!s}" if forward_for else f"{connected_ip!s}"
233 
234 
def _init_header(request: web.Request, token: str) -> CIMultiDict | dict[str, str]:
    """Build the headers that accompany a request relayed to the add-on.

    Copies the incoming headers except those in INIT_HEADERS_FILTER, adds the
    ingress source/path markers and sets the X-Forwarded-For/-Host/-Proto
    headers, keeping incoming values where present.

    Raises:
        HTTPBadRequest: If the transport does not report a peer name.
    """
    incoming = request.headers

    headers: dict[str, str] = {}
    for name, value in incoming.items():
        if name in INIT_HEADERS_FILTER:
            continue
        headers[name] = value

    # Ingress information
    headers[X_HASS_SOURCE] = "core.ingress"
    headers[X_INGRESS_PATH] = f"/api/hassio_ingress/{token}"

    # X-Forwarded-For: append the address of the directly connected peer
    assert request.transport
    peername = request.transport.get_extra_info("peername")
    if peername is None:
        _LOGGER.error("Can't set forward_for header, missing peername")
        raise HTTPBadRequest
    headers[hdrs.X_FORWARDED_FOR] = _forwarded_for_header(
        incoming.get(hdrs.X_FORWARDED_FOR), peername[0]
    )

    # X-Forwarded-Host: keep an incoming value, else use the request host
    headers[hdrs.X_FORWARDED_HOST] = (
        incoming.get(hdrs.X_FORWARDED_HOST) or request.host
    )

    # X-Forwarded-Proto: keep an incoming value, else use the request scheme
    headers[hdrs.X_FORWARDED_PROTO] = (
        incoming.get(hdrs.X_FORWARDED_PROTO) or request.scheme
    )

    return headers
267 
268 
def _response_header(response: aiohttp.ClientResponse) -> dict[str, str]:
    """Return the add-on response headers that are passed on to the caller.

    Every header whose name is listed in RESPONSE_HEADERS_FILTER is left out.
    """
    passed_on: dict[str, str] = {}
    for name, value in response.headers.items():
        if name not in RESPONSE_HEADERS_FILTER:
            passed_on[name] = value
    return passed_on
276 
277 
def _is_websocket(request: web.Request) -> bool:
    """Tell whether *request* asks for a WebSocket upgrade.

    True when the Connection header contains "upgrade" and the Upgrade header
    equals "websocket", both compared case-insensitively.
    """
    headers = request.headers
    connection = headers.get(hdrs.CONNECTION, "").lower()
    if "upgrade" not in connection:
        return False
    return headers.get(hdrs.UPGRADE, "").lower() == "websocket"
285 
286 
async def _websocket_forward(
    ws_from: web.WebSocketResponse | ClientWebSocketResponse,
    ws_to: web.WebSocketResponse | ClientWebSocketResponse,
) -> None:
    """Handle websocket message directly.

    Reads messages from *ws_from* until its iterator ends and relays each one
    to *ws_to*: text and binary frames with their payload, ping and pong as a
    fresh ping/pong. RuntimeError and ConnectionResetError end the forwarding
    and are only logged at debug level.
    """
    try:
        async for msg in ws_from:
            if msg.type is aiohttp.WSMsgType.TEXT:
                await ws_to.send_str(msg.data)
            elif msg.type is aiohttp.WSMsgType.BINARY:
                await ws_to.send_bytes(msg.data)
            elif msg.type is aiohttp.WSMsgType.PING:
                await ws_to.ping()
            elif msg.type is aiohttp.WSMsgType.PONG:
                await ws_to.pong()
            # NOTE(review): any other message type is only acted on when the
            # *target* socket is already closed - confirm this is intended.
            elif ws_to.closed:
                await ws_to.close(code=ws_to.close_code, message=msg.extra)  # type: ignore[arg-type]
    except RuntimeError:
        _LOGGER.debug("Ingress Websocket runtime error")
    except ConnectionResetError:
        _LOGGER.debug("Ingress Websocket Connection Reset")
URL _create_url(self, str token, str path)
Definition: ingress.py:73
web.WebSocketResponse _handle_websocket(self, web.Request request, str token, str path)
Definition: ingress.py:113
web.Response|web.StreamResponse|web.WebSocketResponse _handle(self, web.Request request, str token, str path)
Definition: ingress.py:89
web.Response|web.StreamResponse _handle_request(self, web.Request request, str token, str path)
Definition: ingress.py:158
None __init__(self, str host, aiohttp.ClientSession websession)
Definition: ingress.py:66
bool should_compress(str content_type, str|None path=None)
Definition: http.py:265
None _websocket_forward(web.WebSocketResponse|ClientWebSocketResponse ws_from, web.WebSocketResponse|ClientWebSocketResponse ws_to)
Definition: ingress.py:290
CIMultiDict|dict[str, str] _init_header(web.Request request, str token)
Definition: ingress.py:235
dict[str, str] _response_header(aiohttp.ClientResponse response)
Definition: ingress.py:269
str _forwarded_for_header(str|None forward_for, str peer_name)
Definition: ingress.py:229
bool _is_websocket(web.Request request)
Definition: ingress.py:278
None async_setup_ingress_view(HomeAssistant hass, str host)
Definition: ingress.py:51
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)