1 """View to accept incoming websocket connection."""
3 from __future__
import annotations
6 from collections
import deque
7 from collections.abc
import Callable, Coroutine
9 from functools
import partial
11 from typing
import TYPE_CHECKING, Any, Final
13 from aiohttp
import WSMsgType, web
14 from aiohttp.http_websocket
import WebSocketWriter
24 from .auth
import AUTH_REQUIRED_MESSAGE, AuthPhase
28 PENDING_MSG_MAX_FORCE_READY,
30 PENDING_MSG_PEAK_TIME,
31 SIGNAL_WEBSOCKET_CONNECTED,
32 SIGNAL_WEBSOCKET_DISCONNECTED,
35 from .error
import Disconnect
36 from .messages
import message_to_json_bytes
37 from .util
import describe_request
39 CLOSE_MSG_TYPES = {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING}
42 from .connection
import ActiveConnection
45 _WS_LOGGER: Final = logging.getLogger(f
"{__name__}.connection")
49 """View to serve a websockets endpoint."""
51 name: str =
"websocketapi"
53 requires_auth: bool =
False
55 async
def get(self, request: web.Request) -> web.WebSocketResponse:
56 """Handle an incoming websocket connection."""
61 """Add connection id to websocket messages."""
63 def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
64 """Add connid to websocket log messages."""
65 assert self.extra
is not None
66 return f
'[{self.extra["connid"]}] {msg}', kwargs
70 """Handle an active websocket client connection."""
82 "_peak_checker_unsub",
86 "_release_ready_queue_size",
89 def __init__(self, hass: HomeAssistant, request: web.Request) ->
None:
90 """Initialize an active connection."""
93 self.
_request_request: web.Request = request
94 self.
_wsock_wsock = web.WebSocketResponse(heartbeat=55)
95 self.
_handle_task_handle_task: asyncio.Task |
None =
None
96 self.
_writer_task_writer_task: asyncio.Task |
None =
None
101 self.
_connection_connection: ActiveConnection |
None =
None
108 self.
_ready_future_ready_future: asyncio.Future[int] |
None =
None
112 """Return the representation."""
115 f
"closing={self._closing} "
116 f
"authenticated={self._authenticated} "
117 f
"description={self.description}>"
122 """Return a description of the connection."""
124 return connection.get_description(self.
_request_request)
125 if request := self.
_request_request:
127 return "finished connection"
131 connection: ActiveConnection,
132 send_bytes_text: Callable[[bytes], Coroutine[Any, Any,
None]],
134 """Write outgoing messages."""
139 loop = self.
_loop_loop
140 is_debug_log_enabled = partial(logger.isEnabledFor, logging.DEBUG)
142 can_coalesce = connection.can_coalesce
143 ready_message_count = len(message_queue)
146 while not wsock.closed:
147 if not message_queue:
156 can_coalesce = connection.can_coalesce
158 if not can_coalesce
or ready_message_count == 1:
159 message = message_queue.popleft()
160 if is_debug_log_enabled():
161 debug(
"%s: Sending %s", self.
descriptiondescription, message)
162 await send_bytes_text(message)
165 coalesced_messages = b
"".join((b
"[", b
",".join(message_queue), b
"]"))
166 message_queue.clear()
167 if is_debug_log_enabled():
168 debug(
"%s: Sending %s", self.
descriptiondescription, coalesced_messages)
169 await send_bytes_text(coalesced_messages)
170 except asyncio.CancelledError:
171 debug(
"%s: Writer cancelled", self.
descriptiondescription)
173 except (RuntimeError, ConnectionResetError)
as ex:
174 debug(
"%s: Unexpected error in writer: %s", self.
descriptiondescription, ex)
176 debug(
"%s: Writer done", self.
descriptiondescription)
182 """Cancel the peak checker."""
189 """Queue sending a message to the client.
191 Closes connection if the client is not reading the messages.
200 if type(message)
is not bytes:
201 if isinstance(message, dict):
203 elif isinstance(message, str):
204 message = message.encode(
"utf-8")
207 message_queue.append(message)
208 if (queue_size_after_add := len(message_queue)) >= MAX_PENDING_MSG:
211 "%s: Client unable to keep up with pending messages. Reached %s pending"
212 " messages. The system's load is too high or an integration is"
213 " misbehaving; Last message was: %s"
229 if queue_size_after_add <= PENDING_MSG_PEAK:
230 if peak_checker_active:
234 if not peak_checker_active:
241 """Release the ready future or reschedule.
243 We will release the ready future if the queue did not grow since the
244 last time we tried to release the ready future.
246 If we reach PENDING_MSG_MAX_FORCE_READY, we will release the ready future
247 immediately so avoid the coalesced messages from growing too large.
249 if not (ready_future := self.
_ready_future_ready_future)
or not (
262 if not ready_future.done():
263 ready_future.set_result(queue_size)
267 """Check that we are no longer above the write peak."""
275 "%s: Client unable to keep up with pending messages. Stayed over %s for %s"
276 " seconds. The system's load is too high or an integration is"
277 " misbehaving; Last message was: %s"
281 PENDING_MSG_PEAK_TIME,
288 """Cancel the connection."""
298 """Cancel this connection."""
302 """Handle a websocket response."""
306 hass = self.
_hass_hass
309 async
with asyncio.timeout(10):
310 await wsock.prepare(request)
311 except ConnectionResetError:
314 "%s: Connection reset by peer while preparing WebSocket",
319 logger.warning(
"Timeout preparing request from %s", request.remote)
322 logger.debug(
"%s: Connected from %s", self.
descriptiondescription, request.remote)
325 unsub_stop = hass.bus.async_listen(
329 writer = wsock._writer
331 assert writer
is not None
333 send_bytes_text = partial(writer.send_frame, opcode=WSMsgType.TEXT)
337 connection: ActiveConnection |
None =
None
338 disconnect_warn: str |
None =
None
344 except asyncio.CancelledError:
345 logger.debug(
"%s: Connection cancelled", self.
descriptiondescription)
347 except Disconnect
as ex:
348 if disconnect_msg :=
str(ex):
349 disconnect_warn = disconnect_msg
351 logger.debug(
"%s: Connection closed by client: %s", self.
descriptiondescription, ex)
354 "%s: Unexpected error inside websocket API", self.
descriptiondescription
361 if connection
is not None:
362 connection.async_handle_close()
375 send_bytes_text: Callable[[bytes], Coroutine[Any, Any,
None]],
376 ) -> ActiveConnection:
377 """Handle the auth phase of the websocket connection."""
378 await send_bytes_text(AUTH_REQUIRED_MESSAGE)
382 msg = await self.
_wsock_wsock.receive(10)
383 except TimeoutError
as err:
384 raise Disconnect(
"Did not receive auth message within 10 seconds")
from err
386 if msg.type
in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
387 raise Disconnect(
"Received close message during auth phase")
389 if msg.type
is not WSMsgType.TEXT:
390 raise Disconnect(
"Received non-Text message during auth phase")
394 except ValueError
as err:
395 raise Disconnect(
"Received invalid JSON during auth phase")
from err
397 if self.
_logger_logger.isEnabledFor(logging.DEBUG):
398 self.
_logger_logger.debug(
"%s: Received %s", self.
descriptiondescription, auth_msg_data)
399 connection = await auth.async_handle(auth_msg_data)
407 self.
_hass_hass.data[DATA_CONNECTIONS] = self.
_hass_hass.data.get(DATA_CONNECTIONS, 0) + 1
450 writer._limit = 2**20
453 self, connection: ActiveConnection
455 """Handle the command phase of the websocket connection."""
457 async_handle_str = connection.async_handle
458 async_handle_binary = connection.async_handle_binary
459 _debug_enabled = partial(self.
_logger_logger.isEnabledFor, logging.DEBUG)
462 while not wsock.closed:
463 msg = await wsock.receive()
467 if msg_type
in CLOSE_MSG_TYPES:
470 if msg_type
is WSMsgType.BINARY:
471 if len(msg_data) < 1:
472 raise Disconnect(
"Received invalid binary message.")
474 handler = msg_data[0]
475 payload = msg_data[1:]
476 async_handle_binary(handler, payload)
479 if msg_type
is not WSMsgType.TEXT:
480 raise Disconnect(
"Received non-Text message.")
484 except ValueError
as ex:
485 raise Disconnect(
"Received invalid JSON.")
from ex
489 "%s: Received %s", self.
descriptiondescription, command_msg_data
493 if type(command_msg_data)
is not list:
494 async_handle_str(command_msg_data)
497 for split_msg
in command_msg_data:
498 async_handle_str(split_msg)
501 self, disconnect_warn: str |
None, connection: ActiveConnection |
None
503 """Cleanup the writer and close the websocket."""
508 hass = self.
_hass_hass
518 if disconnect_warn
is None:
519 logger.debug(
"%s: Disconnected", self.
descriptiondescription)
522 "%s: Disconnected: %s", self.
descriptiondescription, disconnect_warn
525 if connection
is not None:
526 hass.data[DATA_CONNECTIONS] -= 1
534 self.
_hass_hass =
None
tuple[str, Any] process(self, str msg, Any kwargs)
None _check_write_peak(self, dt.datetime _utc_time)
None _send_message(self, str|bytes|dict[str, Any] message)
None _release_ready_future_or_reschedule(self)
None _async_cleanup_writer_and_close(self, str|None disconnect_warn, ActiveConnection|None connection)
_release_ready_queue_size
ActiveConnection _async_handle_auth_phase(self, AuthPhase auth, Callable[[bytes], Coroutine[Any, Any, None]] send_bytes_text)
None __init__(self, HomeAssistant hass, web.Request request)
None _cancel_peak_checker(self)
None _async_increase_writer_limit(self, WebSocketWriter writer)
None _writer(self, ActiveConnection connection, Callable[[bytes], Coroutine[Any, Any, None]] send_bytes_text)
web.WebSocketResponse async_handle(self)
None _async_handle_hass_stop(self, Event event)
None _async_websocket_command_phase(self, ActiveConnection connection)
web.WebSocketResponse get(self, web.Request request)
bytes message_to_json_bytes(dict[str, Any] message)
str describe_request(web.Request request)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
IntentResponse async_handle(HomeAssistant hass, str platform, str intent_type, _SlotsType|None slots=None, str|None text_input=None, Context|None context=None, str|None language=None, str|None assistant=None, str|None device_id=None, str|None conversation_agent_id=None)