Home Assistant Unofficial Reference 2024.12.1
http.py
Go to the documentation of this file.
1 """View to accept incoming websocket connection."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import deque
7 from collections.abc import Callable, Coroutine
8 import datetime as dt
9 from functools import partial
10 import logging
11 from typing import TYPE_CHECKING, Any, Final
12 
13 from aiohttp import WSMsgType, web
14 from aiohttp.http_websocket import WebSocketWriter
15 
16 from homeassistant.components.http import KEY_HASS, HomeAssistantView
17 from homeassistant.const import EVENT_HOMEASSISTANT_STOP
18 from homeassistant.core import Event, HomeAssistant, callback
19 from homeassistant.helpers.dispatcher import async_dispatcher_send
20 from homeassistant.helpers.event import async_call_later
21 from homeassistant.util.async_ import create_eager_task
22 from homeassistant.util.json import json_loads
23 
24 from .auth import AUTH_REQUIRED_MESSAGE, AuthPhase
25 from .const import (
26  DATA_CONNECTIONS,
27  MAX_PENDING_MSG,
28  PENDING_MSG_MAX_FORCE_READY,
29  PENDING_MSG_PEAK,
30  PENDING_MSG_PEAK_TIME,
31  SIGNAL_WEBSOCKET_CONNECTED,
32  SIGNAL_WEBSOCKET_DISCONNECTED,
33  URL,
34 )
35 from .error import Disconnect
36 from .messages import message_to_json_bytes
37 from .util import describe_request
38 
# Message types that signal the peer is closing (or has closed) the socket.
CLOSE_MSG_TYPES = {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING}

if TYPE_CHECKING:
    # Only needed for annotations; guarded so it is not imported at runtime.
    from .connection import ActiveConnection


# Shared base logger; each handler wraps it in a WebSocketAdapter so that
# records are prefixed with the connection id.
_WS_LOGGER: Final = logging.getLogger(f"{__name__}.connection")
46 
47 
class WebsocketAPIView(HomeAssistantView):
    """View to serve a websockets endpoint."""

    name: str = "websocketapi"
    url: str = URL
    # HTTP-level auth is disabled for this view; WebSocketHandler runs its own
    # auth phase (AuthPhase) once the socket has been prepared.
    requires_auth: bool = False

    async def get(self, request: web.Request) -> web.WebSocketResponse:
        """Handle an incoming websocket connection.

        A new WebSocketHandler is created per request and drives the
        connection until it is closed; the prepared (and by then closed)
        WebSocketResponse is returned to aiohttp.
        """
        return await WebSocketHandler(request.app[KEY_HASS], request).async_handle()
58 
59 
class WebSocketAdapter(logging.LoggerAdapter):
    """Add connection id to websocket messages."""

    def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
        """Add connid to websocket log messages.

        Returns the message prefixed with ``[<connid>]`` (taken from the
        adapter's ``extra`` mapping) together with the unmodified kwargs.
        """
        # ``extra`` is optional on LoggerAdapter; this adapter is always
        # constructed with a ``connid`` entry, so narrow the type here.
        assert self.extra is not None
        return f'[{self.extra["connid"]}] {msg}', kwargs
67 
68 
class WebSocketHandler:
    """Handle an active websocket client connection.

    One instance is created per incoming request. ``async_handle`` runs the
    auth phase, then the command phase (reading from the socket), while a
    separate writer task drains ``_message_queue`` to the socket.
    """

    __slots__ = (
        "_hass",
        "_loop",
        "_request",
        "_wsock",
        "_handle_task",
        "_writer_task",
        "_closing",
        "_authenticated",
        "_logger",
        "_peak_checker_unsub",
        "_connection",
        "_message_queue",
        "_ready_future",
        "_release_ready_queue_size",
    )

    def __init__(self, hass: HomeAssistant, request: web.Request) -> None:
        """Initialize an active connection."""
        self._hass = hass
        self._loop = hass.loop
        self._request: web.Request = request
        self._wsock = web.WebSocketResponse(heartbeat=55)
        self._handle_task: asyncio.Task | None = None
        self._writer_task: asyncio.Task | None = None
        self._closing: bool = False
        self._authenticated: bool = False
        self._logger = WebSocketAdapter(_WS_LOGGER, {"connid": id(self)})
        self._peak_checker_unsub: Callable[[], None] | None = None
        self._connection: ActiveConnection | None = None

        # The WebSocketHandler has a single consumer and path
        # to where messages are queued. This allows the implementation
        # to use a deque and an asyncio.Future to avoid the overhead of
        # an asyncio.Queue.
        self._message_queue: deque[bytes] = deque()
        self._ready_future: asyncio.Future[int] | None = None
        # Queue size observed at the last release attempt; 0 means no
        # release callback is currently scheduled.
        self._release_ready_queue_size: int = 0

    def __repr__(self) -> str:
        """Return the representation."""
        return (
            "<WebSocketHandler "
            f"closing={self._closing} "
            f"authenticated={self._authenticated} "
            f"description={self.description}>"
        )

    @property
    def description(self) -> str:
        """Return a description of the connection.

        Prefers the authenticated connection's description, falls back to a
        description of the raw request, and finally to a fixed string once
        both references have been cleared during cleanup.
        """
        if connection := self._connection:
            return connection.get_description(self._request)
        if request := self._request:
            return describe_request(request)
        return "finished connection"

    async def _writer(
        self,
        connection: ActiveConnection,
        send_bytes_text: Callable[[bytes], Coroutine[Any, Any, None]],
    ) -> None:
        """Write outgoing messages.

        Runs as its own task. Waits on ``_ready_future`` whenever the queue
        is empty; the future resolves with the number of queued messages.
        A single message is sent as-is; multiple messages are joined into
        one JSON array when the connection supports coalescing.
        """
        # Variables are set locally to avoid lookups in the loop
        message_queue = self._message_queue
        logger = self._logger
        wsock = self._wsock
        loop = self._loop
        is_debug_log_enabled = partial(logger.isEnabledFor, logging.DEBUG)
        debug = logger.debug
        can_coalesce = connection.can_coalesce
        ready_message_count = len(message_queue)
        # Exceptions if Socket disconnected or cancelled by connection handler
        try:
            while not wsock.closed:
                if not message_queue:
                    self._ready_future = loop.create_future()
                    ready_message_count = await self._ready_future

                if self._closing:
                    return

                if not can_coalesce:
                    # coalesce may be enabled later in the connection
                    can_coalesce = connection.can_coalesce

                if not can_coalesce or ready_message_count == 1:
                    message = message_queue.popleft()
                    if is_debug_log_enabled():
                        debug("%s: Sending %s", self.description, message)
                    await send_bytes_text(message)
                    continue

                # Send everything queued so far as a single JSON array.
                coalesced_messages = b"".join((b"[", b",".join(message_queue), b"]"))
                message_queue.clear()
                if is_debug_log_enabled():
                    debug("%s: Sending %s", self.description, coalesced_messages)
                await send_bytes_text(coalesced_messages)
        except asyncio.CancelledError:
            debug("%s: Writer cancelled", self.description)
            raise
        except (RuntimeError, ConnectionResetError) as ex:
            debug("%s: Unexpected error in writer: %s", self.description, ex)
        finally:
            debug("%s: Writer done", self.description)
            # Clean up the peak checker when we shut down the writer
            self._cancel_peak_checker()

    @callback
    def _cancel_peak_checker(self) -> None:
        """Cancel the peak checker."""
        if self._peak_checker_unsub is not None:
            self._peak_checker_unsub()
            self._peak_checker_unsub = None

    @callback
    def _send_message(self, message: str | bytes | dict[str, Any]) -> None:
        """Queue sending a message to the client.

        Closes connection if the client is not reading the messages.

        Async friendly.
        """
        if self._closing:
            # Connection is cancelled, don't flood logs about exceeding
            # max pending messages.
            return

        # Normalize to bytes: dicts are JSON-serialized, strings are encoded.
        if type(message) is not bytes:  # noqa: E721
            if isinstance(message, dict):
                message = message_to_json_bytes(message)
            elif isinstance(message, str):
                message = message.encode("utf-8")

        message_queue = self._message_queue
        message_queue.append(message)
        if (queue_size_after_add := len(message_queue)) >= MAX_PENDING_MSG:
            self._logger.error(
                (
                    "%s: Client unable to keep up with pending messages. Reached %s pending"
                    " messages. The system's load is too high or an integration is"
                    " misbehaving; Last message was: %s"
                ),
                self.description,
                MAX_PENDING_MSG,
                message,
            )
            self._cancel()
            return

        if self._release_ready_queue_size == 0:
            # Try to coalesce more messages to reduce the number of writes
            self._release_ready_queue_size = queue_size_after_add
            self._loop.call_soon(self._release_ready_future_or_reschedule)

        peak_checker_active = self._peak_checker_unsub is not None

        if queue_size_after_add <= PENDING_MSG_PEAK:
            # Back at or under the peak threshold: no need to keep watching.
            if peak_checker_active:
                self._cancel_peak_checker()
            return

        if not peak_checker_active:
            # Over the peak: re-check after PENDING_MSG_PEAK_TIME.
            self._peak_checker_unsub = async_call_later(
                self._hass, PENDING_MSG_PEAK_TIME, self._check_write_peak
            )

    @callback
    def _release_ready_future_or_reschedule(self) -> None:
        """Release the ready future or reschedule.

        We will release the ready future if the queue did not grow since the
        last time we tried to release the ready future.

        If we reach PENDING_MSG_MAX_FORCE_READY, we will release the ready future
        immediately so avoid the coalesced messages from growing too large.
        """
        if not (ready_future := self._ready_future) or not (
            queue_size := len(self._message_queue)
        ):
            # Nothing is waiting, or nothing is queued: clear the marker so
            # the next _send_message schedules a fresh release attempt.
            self._release_ready_queue_size = 0
            return
        # If we are below the max pending to force ready, and there are new messages
        # in the queue since the last time we tried to release the ready future, we
        # try again later so we can coalesce more messages.
        if queue_size > self._release_ready_queue_size < PENDING_MSG_MAX_FORCE_READY:
            self._release_ready_queue_size = queue_size
            self._loop.call_soon(self._release_ready_future_or_reschedule)
            return
        self._release_ready_queue_size = 0
        if not ready_future.done():
            ready_future.set_result(queue_size)

    @callback
    def _check_write_peak(self, _utc_time: dt.datetime) -> None:
        """Check that we are no longer above the write peak."""
        # This callback fired, so the scheduled timer is spent.
        self._peak_checker_unsub = None

        if len(self._message_queue) < PENDING_MSG_PEAK:
            return

        self._logger.error(
            (
                "%s: Client unable to keep up with pending messages. Stayed over %s for %s"
                " seconds. The system's load is too high or an integration is"
                " misbehaving; Last message was: %s"
            ),
            self.description,
            PENDING_MSG_PEAK,
            PENDING_MSG_PEAK_TIME,
            self._message_queue[-1],
        )
        self._cancel()

    @callback
    def _cancel(self) -> None:
        """Cancel the connection.

        Marks the handler as closing and cancels both the handler task and
        the writer task if they exist.
        """
        self._closing = True
        self._cancel_peak_checker()
        if self._handle_task is not None:
            self._handle_task.cancel()
        if self._writer_task is not None:
            self._writer_task.cancel()

    @callback
    def _async_handle_hass_stop(self, event: Event) -> None:
        """Cancel this connection."""
        self._cancel()

    async def async_handle(self) -> web.WebSocketResponse:
        """Handle a websocket response.

        Prepares the socket, runs the auth phase and then the command phase,
        and always performs cleanup (writer shutdown, socket close) before
        returning the WebSocketResponse.
        """
        request = self._request
        wsock = self._wsock
        logger = self._logger
        hass = self._hass

        try:
            async with asyncio.timeout(10):
                await wsock.prepare(request)
        except ConnectionResetError:
            # Likely the client disconnected before we prepared the websocket
            logger.debug(
                "%s: Connection reset by peer while preparing WebSocket",
                self.description,
            )
            return wsock
        except TimeoutError:
            logger.warning("Timeout preparing request from %s", request.remote)
            return wsock

        logger.debug("%s: Connected from %s", self.description, request.remote)
        self._handle_task = asyncio.current_task()

        unsub_stop = hass.bus.async_listen(
            EVENT_HOMEASSISTANT_STOP, self._async_handle_hass_stop
        )

        writer = wsock._writer  # noqa: SLF001
        if TYPE_CHECKING:
            assert writer is not None

        send_bytes_text = partial(writer.send_frame, opcode=WSMsgType.TEXT)
        auth = AuthPhase(
            logger, hass, self._send_message, self._cancel, request, send_bytes_text
        )
        connection: ActiveConnection | None = None
        disconnect_warn: str | None = None

        try:
            connection = await self._async_handle_auth_phase(auth, send_bytes_text)
            self._async_increase_writer_limit(writer)
            await self._async_websocket_command_phase(connection)
        except asyncio.CancelledError:
            logger.debug("%s: Connection cancelled", self.description)
            raise
        except Disconnect as ex:
            # Only a Disconnect carrying a message is surfaced as a warning
            # at cleanup time; an empty one is logged at debug level only.
            if disconnect_msg := str(ex):
                disconnect_warn = disconnect_msg

            logger.debug("%s: Connection closed by client: %s", self.description, ex)
        except Exception:
            logger.exception(
                "%s: Unexpected error inside websocket API", self.description
            )
        finally:
            unsub_stop()

            self._cancel_peak_checker()

            if connection is not None:
                connection.async_handle_close()

            self._closing = True
            # Wake the writer if it is waiting for messages so it can see
            # the closing flag and exit.
            if self._ready_future and not self._ready_future.done():
                self._ready_future.set_result(len(self._message_queue))

            await self._async_cleanup_writer_and_close(disconnect_warn, connection)

        return wsock

    async def _async_handle_auth_phase(
        self,
        auth: AuthPhase,
        send_bytes_text: Callable[[bytes], Coroutine[Any, Any, None]],
    ) -> ActiveConnection:
        """Handle the auth phase of the websocket connection.

        Raises Disconnect if no valid text/JSON auth message arrives within
        10 seconds. On success the writer task is started and the connected
        signal is dispatched.
        """
        await send_bytes_text(AUTH_REQUIRED_MESSAGE)

        # Auth Phase
        try:
            msg = await self._wsock.receive(10)
        except TimeoutError as err:
            raise Disconnect("Did not receive auth message within 10 seconds") from err

        if msg.type in CLOSE_MSG_TYPES:
            raise Disconnect("Received close message during auth phase")

        if msg.type is not WSMsgType.TEXT:
            raise Disconnect("Received non-Text message during auth phase")

        try:
            auth_msg_data = json_loads(msg.data)
        except ValueError as err:
            raise Disconnect("Received invalid JSON during auth phase") from err

        if self._logger.isEnabledFor(logging.DEBUG):
            self._logger.debug("%s: Received %s", self.description, auth_msg_data)
        connection = await auth.async_handle(auth_msg_data)
        # As the webserver is now started before the start
        # event we do not want to block for websocket responses
        #
        # We only start the writer queue after the auth phase is completed
        # since there is no need to queue messages before the auth phase
        self._connection = connection
        self._writer_task = create_eager_task(self._writer(connection, send_bytes_text))
        self._hass.data[DATA_CONNECTIONS] = self._hass.data.get(DATA_CONNECTIONS, 0) + 1
        async_dispatcher_send(self._hass, SIGNAL_WEBSOCKET_CONNECTED)

        self._authenticated = True
        return connection

    @callback
    def _async_increase_writer_limit(self, writer: WebSocketWriter) -> None:
        """Raise the websocket writer's drain limit to 1MiB.

        Called once the auth phase has completed.
        """
        #
        # Our websocket implementation is backed by a deque
        #
        # As back-pressure builds, the queue will back up and use more memory
        # until we disconnect the client when the queue size reaches
        # MAX_PENDING_MSG. When we are generating a high volume of websocket messages,
        # we hit a bottleneck in aiohttp where it will wait for
        # the buffer to drain before sending the next message and messages
        # start backing up in the queue.
        #
        # https://github.com/aio-libs/aiohttp/issues/1367 added drains
        # to the websocket writer to handle malicious clients and network issues.
        # The drain causes multiple problems for us since the buffer cannot be
        # drained fast enough when we deliver a high volume or large messages:
        #
        # - We end up disconnecting the client. The client will then reconnect,
        # and the cycle repeats itself, which results in a significant amount of
        # CPU usage.
        #
        # - Messages latency increases because messages cannot be moved into
        # the TCP buffer because it is blocked waiting for the drain to happen because
        # of the low default limit of 16KiB. By increasing the limit, we instead
        # rely on the underlying TCP buffer and stack to deliver the messages which
        # can typically happen much faster.
        #
        # After the auth phase is completed, and we are not concerned about
        # the user being a malicious client, we set the limit to force a drain
        # to 1MiB. 1MiB is the maximum expected size of the serialized entity
        # registry, which is the largest message we usually send.
        #
        # https://github.com/aio-libs/aiohttp/commit/b3c80ee3f7d5d8f0b8bc27afe52e4d46621eaf99
        # added a way to set the limit, but there is no way to actually
        # reach the code to set the limit, so we have to set it directly.
        #
        writer._limit = 2**20  # noqa: SLF001

    async def _async_websocket_command_phase(
        self, connection: ActiveConnection
    ) -> None:
        """Handle the command phase of the websocket connection.

        Reads messages until the socket closes. Binary messages are routed by
        their first byte; text messages must be JSON. Raises Disconnect on an
        empty binary message, an unsupported message type, or invalid JSON.
        """
        wsock = self._wsock
        async_handle_str = connection.async_handle
        async_handle_binary = connection.async_handle_binary
        _debug_enabled = partial(self._logger.isEnabledFor, logging.DEBUG)

        # Command phase
        while not wsock.closed:
            msg = await wsock.receive()
            msg_type = msg.type
            msg_data = msg.data

            if msg_type in CLOSE_MSG_TYPES:
                break

            if msg_type is WSMsgType.BINARY:
                if len(msg_data) < 1:
                    raise Disconnect("Received invalid binary message.")

                # First byte selects the binary handler; the rest is payload.
                handler = msg_data[0]
                payload = msg_data[1:]
                async_handle_binary(handler, payload)
                continue

            if msg_type is not WSMsgType.TEXT:
                raise Disconnect("Received non-Text message.")

            try:
                command_msg_data = json_loads(msg_data)
            except ValueError as ex:
                raise Disconnect("Received invalid JSON.") from ex

            if _debug_enabled():
                self._logger.debug(
                    "%s: Received %s", self.description, command_msg_data
                )

            # A list payload is a batch of commands in one message; any other
            # payload is dispatched as a single command. The exact type check
            # (rather than isinstance) is deliberate, hence the noqa.
            if type(command_msg_data) is not list:  # noqa: E721
                async_handle_str(command_msg_data)
                continue

            for split_msg in command_msg_data:
                async_handle_str(split_msg)

    async def _async_cleanup_writer_and_close(
        self, disconnect_warn: str | None, connection: ActiveConnection | None
    ) -> None:
        """Cleanup the writer and close the websocket.

        ``disconnect_warn`` is logged as a warning when set. ``connection``
        is None when the auth phase never completed, in which case the
        connection counter is left untouched.
        """
        # If the writer gets canceled we still need to close the websocket
        # so we have another finally block to make sure we close the websocket
        # if the writer gets canceled.
        wsock = self._wsock
        hass = self._hass
        logger = self._logger
        try:
            if self._writer_task:
                await self._writer_task
        finally:
            try:
                # Make sure all error messages are written before closing
                await wsock.close()
            finally:
                if disconnect_warn is None:
                    logger.debug("%s: Disconnected", self.description)
                else:
                    logger.warning(
                        "%s: Disconnected: %s", self.description, disconnect_warn
                    )

                if connection is not None:
                    hass.data[DATA_CONNECTIONS] -= 1
                    self._connection = None

                async_dispatcher_send(hass, SIGNAL_WEBSOCKET_DISCONNECTED)

                # Break reference cycles to make sure GC can happen sooner
                self._wsock = None  # type: ignore[assignment]
                self._request = None  # type: ignore[assignment]
                self._hass = None  # type: ignore[assignment]
                self._logger = None  # type: ignore[assignment]
                self._message_queue = None  # type: ignore[assignment]
                self._handle_task = None
                self._writer_task = None
                self._ready_future = None
tuple[str, Any] process(self, str msg, Any kwargs)
Definition: http.py:63
None _check_write_peak(self, dt.datetime _utc_time)
Definition: http.py:266
None _send_message(self, str|bytes|dict[str, Any] message)
Definition: http.py:188
None _async_cleanup_writer_and_close(self, str|None disconnect_warn, ActiveConnection|None connection)
Definition: http.py:502
ActiveConnection _async_handle_auth_phase(self, AuthPhase auth, Callable[[bytes], Coroutine[Any, Any, None]] send_bytes_text)
Definition: http.py:376
None __init__(self, HomeAssistant hass, web.Request request)
Definition: http.py:89
None _async_increase_writer_limit(self, WebSocketWriter writer)
Definition: http.py:414
None _writer(self, ActiveConnection connection, Callable[[bytes], Coroutine[Any, Any, None]] send_bytes_text)
Definition: http.py:133
None _async_websocket_command_phase(self, ActiveConnection connection)
Definition: http.py:454
web.WebSocketResponse get(self, web.Request request)
Definition: http.py:55
bytes message_to_json_bytes(dict[str, Any] message)
Definition: messages.py:255
str describe_request(web.Request request)
Definition: util.py:8
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597
IntentResponse async_handle(HomeAssistant hass, str platform, str intent_type, _SlotsType|None slots=None, str|None text_input=None, Context|None context=None, str|None language=None, str|None assistant=None, str|None device_id=None, str|None conversation_agent_id=None)
Definition: intent.py:116