1 """Hass.io Add-on ingress service."""
3 from __future__
import annotations
6 from collections.abc
import Iterable
7 from functools
import lru_cache
8 from ipaddress
import ip_address
10 from urllib.parse
import quote
13 from aiohttp
import ClientTimeout, ClientWebSocketResponse, hdrs, web
14 from aiohttp.web_exceptions
import HTTPBadGateway, HTTPBadRequest
15 from multidict
import CIMultiDict
24 from .const
import X_HASS_SOURCE, X_INGRESS_PATH
25 from .http
import should_compress
27 _LOGGER = logging.getLogger(__name__)
29 INIT_HEADERS_FILTER = {
31 hdrs.CONTENT_ENCODING,
32 hdrs.TRANSFER_ENCODING,
34 hdrs.SEC_WEBSOCKET_EXTENSIONS,
35 hdrs.SEC_WEBSOCKET_PROTOCOL,
36 hdrs.SEC_WEBSOCKET_VERSION,
37 hdrs.SEC_WEBSOCKET_KEY,
39 RESPONSE_HEADERS_FILTER = {
40 hdrs.TRANSFER_ENCODING,
43 hdrs.CONTENT_ENCODING,
46 MIN_COMPRESSED_SIZE = 128
47 MAX_SIMPLE_RESPONSE_SIZE = 4194000
56 hass.http.register_view(hassio_ingress)
60 """Hass.io view to handle base part."""
62 name =
"api:hassio:ingress"
63 url =
"/api/hassio_ingress/{token}/{path:.*}"
66 def __init__(self, host: str, websession: aiohttp.ClientSession) ->
None:
67 """Initialize a Hass.io ingress view."""
74 """Create URL to service."""
75 base_path = f
"/ingress/{token}/"
78 target_url = self.
_url_url.join(
URL(f
"{base_path}{quote(path)}"))
79 except ValueError
as err:
80 raise HTTPBadRequest
from err
82 if not target_url.path.startswith(base_path):
88 self, request: web.Request, token: str, path: str
89 ) -> web.Response | web.StreamResponse | web.WebSocketResponse:
90 """Route data to Hass.io ingress service."""
99 except aiohttp.ClientError
as err:
100 _LOGGER.debug(
"Ingress error with %s / %s: %s", token, path, err)
102 raise HTTPBadGateway
from None
112 self, request: web.Request, token: str, path: str
113 ) -> web.WebSocketResponse:
114 """Ingress route for websocket."""
115 req_protocols: Iterable[str]
116 if hdrs.SEC_WEBSOCKET_PROTOCOL
in request.headers:
119 for proto
in request.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(
",")
124 ws_server = web.WebSocketResponse(
125 protocols=req_protocols, autoclose=
False, autoping=
False
127 await ws_server.prepare(request)
134 if request.query_string:
135 url = url.with_query(request.query_string)
140 headers=source_header,
141 protocols=req_protocols,
151 return_when=asyncio.FIRST_COMPLETED,
157 self, request: web.Request, token: str, path: str
158 ) -> web.Response | web.StreamResponse:
159 """Ingress route for request."""
166 headers=source_header,
167 params=request.query,
168 allow_redirects=
False,
169 data=request.content
if request.method !=
"GET" else None,
170 timeout=ClientTimeout(total=
None),
171 skip_auto_headers={hdrs.CONTENT_TYPE},
174 content_length_int = 0
175 content_length = result.headers.get(hdrs.CONTENT_LENGTH, UNDEFINED)
177 if maybe_content_type := result.headers.get(hdrs.CONTENT_TYPE):
178 content_type: str = (maybe_content_type.partition(
";"))[0].strip()
181 content_type =
"application/octet-stream"
184 if result.status
in (204, 304)
or (
185 content_length
is not UNDEFINED
186 and (content_length_int :=
int(content_length))
187 <= MAX_SIMPLE_RESPONSE_SIZE
190 body = await result.read()
191 simple_response = web.Response(
193 status=result.status,
194 content_type=content_type,
196 zlib_executor_size=32768,
201 simple_response.enable_compression()
202 return simple_response
205 response = web.StreamResponse(status=result.status, headers=headers)
206 response.content_type = content_type
210 response.enable_compression()
211 await response.prepare(request)
215 async
for data, _
in result.content.iter_chunks():
216 await response.write(data)
220 aiohttp.ClientPayloadError,
221 ConnectionResetError,
223 _LOGGER.debug(
"Stream error %s / %s: %s", token, path, err)
228 @lru_cache(maxsize=32)
230 """Create X-Forwarded-For header."""
231 connected_ip = ip_address(peer_name)
232 return f
"{forward_for}, {connected_ip!s}" if forward_for
else f
"{connected_ip!s}"
235 def _init_header(request: web.Request, token: str) -> CIMultiDict | dict[str, str]:
236 """Create initial header."""
239 for name, value
in request.headers.items()
240 if name
not in INIT_HEADERS_FILTER
243 headers[X_HASS_SOURCE] =
"core.ingress"
244 headers[X_INGRESS_PATH] = f
"/api/hassio_ingress/{token}"
247 forward_for = request.headers.get(hdrs.X_FORWARDED_FOR)
248 assert request.transport
249 if (peername := request.transport.get_extra_info(
"peername"))
is None:
250 _LOGGER.error(
"Can't set forward_for header, missing peername")
256 if not (forward_host := request.headers.get(hdrs.X_FORWARDED_HOST)):
257 forward_host = request.host
258 headers[hdrs.X_FORWARDED_HOST] = forward_host
261 forward_proto = request.headers.get(hdrs.X_FORWARDED_PROTO)
262 if not forward_proto:
263 forward_proto = request.scheme
264 headers[hdrs.X_FORWARDED_PROTO] = forward_proto
270 """Create response header."""
273 for name, value
in response.headers.items()
274 if name
not in RESPONSE_HEADERS_FILTER
279 """Return True if request is a websocket."""
280 headers = request.headers
282 "upgrade" in headers.get(hdrs.CONNECTION,
"").lower()
283 and headers.get(hdrs.UPGRADE,
"").lower() ==
"websocket"
288 ws_from: web.WebSocketResponse | ClientWebSocketResponse,
289 ws_to: web.WebSocketResponse | ClientWebSocketResponse,
291 """Handle websocket message directly."""
293 async
for msg
in ws_from:
294 if msg.type
is aiohttp.WSMsgType.TEXT:
295 await ws_to.send_str(msg.data)
296 elif msg.type
is aiohttp.WSMsgType.BINARY:
297 await ws_to.send_bytes(msg.data)
298 elif msg.type
is aiohttp.WSMsgType.PING:
300 elif msg.type
is aiohttp.WSMsgType.PONG:
303 await ws_to.close(code=ws_to.close_code, message=msg.extra)
305 _LOGGER.debug(
"Ingress Websocket runtime error")
306 except ConnectionResetError:
307 _LOGGER.debug(
"Ingress Websocket Connection Reset")
URL _create_url(self, str token, str path)
web.WebSocketResponse _handle_websocket(self, web.Request request, str token, str path)
web.Response|web.StreamResponse|web.WebSocketResponse _handle(self, web.Request request, str token, str path)
web.Response|web.StreamResponse _handle_request(self, web.Request request, str token, str path)
None __init__(self, str host, aiohttp.ClientSession websession)
bool should_compress(str content_type, str|None path=None)
None _websocket_forward(web.WebSocketResponse|ClientWebSocketResponse ws_from, web.WebSocketResponse|ClientWebSocketResponse ws_to)
CIMultiDict|dict[str, str] _init_header(web.Request request, str token)
dict[str, str] _response_header(aiohttp.ClientResponse response)
str _forwarded_for_header(str|None forward_for, str peer_name)
bool _is_websocket(web.Request request)
None async_setup_ingress_view(HomeAssistant hass, str host)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)