Home Assistant Unofficial Reference 2024.12.1
webrtc.py
Go to the documentation of this file.
1 """Helper for WebRTC support."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 import asyncio
7 from collections.abc import Awaitable, Callable, Iterable
8 from dataclasses import asdict, dataclass, field
9 from functools import cache, partial, wraps
10 import logging
11 from typing import TYPE_CHECKING, Any, Protocol
12 
13 from mashumaro import MissingField
14 import voluptuous as vol
15 from webrtc_models import (
16  RTCConfiguration,
17  RTCIceCandidate,
18  RTCIceCandidateInit,
19  RTCIceServer,
20 )
21 
22 from homeassistant.components import websocket_api
23 from homeassistant.core import HomeAssistant, callback
24 from homeassistant.exceptions import HomeAssistantError
25 from homeassistant.helpers import config_validation as cv, issue_registry as ir
26 from homeassistant.helpers.deprecation import deprecated_function
27 from homeassistant.util.hass_dict import HassKey
28 from homeassistant.util.ulid import ulid
29 
30 from .const import DATA_COMPONENT, DOMAIN, StreamType
31 from .helper import get_camera_from_entity_id
32 
33 if TYPE_CHECKING:
34  from . import Camera
35 
36 _LOGGER = logging.getLogger(__name__)
37 
38 
39 DATA_WEBRTC_PROVIDERS: HassKey[set[CameraWebRTCProvider]] = HassKey(
40  "camera_webrtc_providers"
41 )
42 DATA_WEBRTC_LEGACY_PROVIDERS: HassKey[dict[str, CameraWebRTCLegacyProvider]] = HassKey(
43  "camera_webrtc_legacy_providers"
44 )
45 DATA_ICE_SERVERS: HassKey[list[Callable[[], Iterable[RTCIceServer]]]] = HassKey(
46  "camera_webrtc_ice_servers"
47 )
48 
49 
50 _WEBRTC = "WebRTC"
51 
52 
53 @dataclass(frozen=True)
55  """Base class for WebRTC messages."""
56 
57  @classmethod
58  @cache
59  def _get_type(cls) -> str:
60  _, _, name = cls.__name__.partition(_WEBRTC)
61  return name.lower()
62 
63  def as_dict(self) -> dict[str, Any]:
64  """Return a dict representation of the message."""
65  data = asdict(self)
66  data["type"] = self._get_type_get_type()
67  return data
68 
69 
70 @dataclass(frozen=True)
72  """WebRTC session."""
73 
74  session_id: str
75 
76 
77 @dataclass(frozen=True)
79  """WebRTC answer."""
80 
81  answer: str
82 
83 
84 @dataclass(frozen=True)
86  """WebRTC candidate."""
87 
88  candidate: RTCIceCandidate | RTCIceCandidateInit
89 
90  def as_dict(self) -> dict[str, Any]:
91  """Return a dict representation of the message."""
92  return {
93  "type": self._get_type_get_type(),
94  "candidate": self.candidate.to_dict(),
95  }
96 
97 
98 @dataclass(frozen=True)
100  """WebRTC error."""
101 
102  code: str
103  message: str
104 
105 
106 type WebRTCSendMessage = Callable[[WebRTCMessage], None]
107 
108 
109 @dataclass(kw_only=True)
111  """WebRTC configuration for the client.
112 
113  Not part of the spec, but required to configure client.
114  """
115 
116  configuration: RTCConfiguration = field(default_factory=RTCConfiguration)
117  data_channel: str | None = None
118  get_candidates_upfront: bool = False
119 
120  def to_frontend_dict(self) -> dict[str, Any]:
121  """Return a dict that can be used by the frontend."""
122  data: dict[str, Any] = {
123  "configuration": self.configuration.to_dict(),
124  "getCandidatesUpfront": self.get_candidates_upfront,
125  }
126  if self.data_channel is not None:
127  data["dataChannel"] = self.data_channel
128  return data
129 
130 
132  """WebRTC provider."""
133 
134  @property
135  @abstractmethod
136  def domain(self) -> str:
137  """Return the integration domain of the provider."""
138 
139  @callback
140  @abstractmethod
141  def async_is_supported(self, stream_source: str) -> bool:
142  """Determine if the provider supports the stream source."""
143 
144  @abstractmethod
146  self,
147  camera: Camera,
148  offer_sdp: str,
149  session_id: str,
150  send_message: WebRTCSendMessage,
151  ) -> None:
152  """Handle the WebRTC offer and return the answer via the provided callback."""
153 
154  @abstractmethod
156  self, session_id: str, candidate: RTCIceCandidateInit
157  ) -> None:
158  """Handle the WebRTC candidate."""
159 
160  @callback
161  def async_close_session(self, session_id: str) -> None:
162  """Close the session."""
163  return ## This is an optional method so we need a default here.
164 
165 
167  """WebRTC provider."""
168 
169  async def async_is_supported(self, stream_source: str) -> bool:
170  """Determine if the provider supports the stream source."""
171 
173  self, camera: Camera, offer_sdp: str
174  ) -> str | None:
175  """Handle the WebRTC offer and return an answer."""
176 
177 
@callback
def async_register_webrtc_provider(
    hass: HomeAssistant,
    provider: CameraWebRTCProvider,
) -> Callable[[], None]:
    """Register a WebRTC provider.

    The first provider to satisfy the offer will be used.

    Registering and unregistering both schedule a provider refresh task.

    Returns:
        A callback that removes the provider again.

    Raises:
        ValueError: If the camera domain is not present in ``hass.data`` or
            the provider is already registered.
    """
    if DOMAIN not in hass.data:
        raise ValueError("Unexpected state, camera not loaded")

    providers = hass.data.setdefault(DATA_WEBRTC_PROVIDERS, set())

    @callback
    def remove_provider() -> None:
        providers.remove(provider)
        hass.async_create_task(_async_refresh_providers(hass))

    if provider in providers:
        raise ValueError("Provider already registered")

    providers.add(provider)
    hass.async_create_task(_async_refresh_providers(hass))
    return remove_provider
203 
204 
async def _async_refresh_providers(hass: HomeAssistant) -> None:
    """Ask every camera entity to refresh its providers, all at once."""
    # NOTE(review): this listing has no source line 207 at this point -
    # confirm against the upstream file that no statement was lost here.
    cameras = hass.data[DATA_COMPONENT].entities
    refreshes = [camera.async_refresh_providers() for camera in cameras]
    await asyncio.gather(*refreshes)
213 
214 
215 type WsCommandWithCamera = Callable[
216  [websocket_api.ActiveConnection, dict[str, Any], Camera],
217  Awaitable[None],
218 ]
219 
220 
def require_webrtc_support(
    error_code: str,
) -> Callable[[WsCommandWithCamera], websocket_api.AsyncWebSocketCommandHandler]:
    """Validate that the camera supports WebRTC.

    Builds a decorator for websocket handlers. The decorated handler is
    called as ``func(connection, msg, camera)`` with the camera looked up
    from ``msg["entity_id"]``. When ``StreamType.WEB_RTC`` is not among the
    camera's frontend stream types, an error carrying ``error_code`` is sent
    on the connection and the handler is not called.
    """

    def decorate(
        func: WsCommandWithCamera,
    ) -> websocket_api.AsyncWebSocketCommandHandler:
        """Decorate func."""

        @wraps(func)
        async def validate(
            hass: HomeAssistant,
            connection: websocket_api.ActiveConnection,
            msg: dict[str, Any],
        ) -> None:
            """Validate that the camera supports WebRTC."""
            entity_id = msg["entity_id"]
            camera = get_camera_from_entity_id(hass, entity_id)
            if StreamType.WEB_RTC not in (
                stream_types := camera.camera_capabilities.frontend_stream_types
            ):
                connection.send_error(
                    msg["id"],
                    error_code,
                    (
                        "Camera does not support WebRTC,"
                        f" frontend_stream_types={stream_types}"
                    ),
                )
                return

            await func(connection, msg, camera)

        return validate

    return decorate
258 
259 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "camera/webrtc/offer",
        vol.Required("entity_id"): cv.entity_id,
        vol.Required("offer"): str,
    }
)
@websocket_api.async_response
@require_webrtc_support("webrtc_offer_failed")
async def ws_webrtc_offer(
    connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
) -> None:
    """Relay a client's WebRTC offer to the camera during initial setup.

    The offer created by the client is routed through the integration to the
    camera device for negotiation. The command result acts as a subscription:
    the session id, ICE candidates and the final answer are delivered as
    events on the same message id. Streaming itself happens directly between
    the client and the camera device.

    Async friendly.
    """
    offer_sdp = msg["offer"]
    msg_id = msg["id"]
    session_id = ulid()

    # Stored under the message id so the session can be closed through the
    # connection's subscription table.
    close_session = partial(camera.close_webrtc_session, session_id)
    connection.subscriptions[msg_id] = close_session

    connection.send_message(websocket_api.result_message(msg_id))

    @callback
    def send_message(message: WebRTCMessage) -> None:
        """Push a value to websocket."""
        event = websocket_api.event_message(msg_id, message.as_dict())
        connection.send_message(event)

    # The client learns the session id before negotiation starts.
    send_message(WebRTCSession(session_id))

    try:
        await camera.async_handle_async_webrtc_offer(
            offer_sdp, session_id, send_message
        )
    except HomeAssistantError as err:
        _LOGGER.error("Error handling WebRTC offer: %s", err)
        send_message(WebRTCError("webrtc_offer_failed", str(err)))
310 
311 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "camera/webrtc/get_client_config",
        vol.Required("entity_id"): cv.entity_id,
    }
)
@websocket_api.async_response
@require_webrtc_support("webrtc_get_client_config_failed")
async def ws_get_client_config(
    connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
) -> None:
    """Reply with the camera's WebRTC client configuration in frontend form."""
    client_config = camera.async_get_webrtc_client_configuration()
    connection.send_result(msg["id"], client_config.to_frontend_dict())
327 
328 
def _parse_webrtc_candidate_init(value: Any) -> RTCIceCandidateInit:
    """Convert a raw candidate dict into an RTCIceCandidateInit.

    Used as a schema validator: a MissingField or ValueError raised while
    parsing is re-raised as vol.Invalid with the same message.
    """
    try:
        candidate = RTCIceCandidateInit.from_dict(value)
    except (MissingField, ValueError) as err:
        raise vol.Invalid(str(err)) from err
    return candidate
335 
336 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "camera/webrtc/candidate",
        vol.Required("entity_id"): cv.entity_id,
        vol.Required("session_id"): str,
        vol.Required("candidate"): _parse_webrtc_candidate_init,
    }
)
@websocket_api.async_response
@require_webrtc_support("webrtc_candidate_failed")
async def ws_candidate(
    connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
) -> None:
    """Forward a client ICE candidate to the camera, then acknowledge it."""
    session_id, candidate = msg["session_id"], msg["candidate"]
    await camera.async_on_webrtc_candidate(session_id, candidate)

    # The acknowledgement is only sent once the camera has taken the candidate.
    ack = websocket_api.result_message(msg["id"])
    connection.send_message(ack)
351 
352 
@callback
def async_register_ws(hass: HomeAssistant) -> None:
    """Register the camera WebRTC websocket commands."""
    for handler in (ws_webrtc_offer, ws_get_client_config, ws_candidate):
        websocket_api.async_register_command(hass, handler)
360 
361 
async def async_get_supported_provider(
    hass: HomeAssistant, camera: Camera
) -> CameraWebRTCProvider | None:
    """Return the first supported provider for the camera.

    Returns None when no providers are registered, when the camera reports
    no stream source, or when no registered provider supports the source.
    """
    providers = hass.data.get(DATA_WEBRTC_PROVIDERS)
    if not providers or not (stream_source := await camera.stream_source()):
        return None

    # Providers are kept in a set, so "first" follows set iteration order.
    for provider in providers:
        if provider.async_is_supported(stream_source):
            return provider

    return None
375 
376 
async def async_get_supported_legacy_provider(
    hass: HomeAssistant, camera: Camera
) -> CameraWebRTCLegacyProvider | None:
    """Return the first supported legacy provider for the camera.

    Returns None when no legacy providers are registered, when the camera
    reports no stream source, or when no legacy provider supports the source.
    """
    providers = hass.data.get(DATA_WEBRTC_LEGACY_PROVIDERS)
    if not providers or not (stream_source := await camera.stream_source()):
        return None

    # Legacy providers expose an awaitable support check, so they are asked
    # one after another in dict order.
    for provider in providers.values():
        if await provider.async_is_supported(stream_source):
            return provider

    return None
390 
391 
@callback
def async_register_ice_servers(
    hass: HomeAssistant,
    get_ice_server_fn: Callable[[], Iterable[RTCIceServer]],
) -> Callable[[], None]:
    """Register an ICE server.

    The registering integration is responsible to implement caching if needed.

    Returns:
        A function that removes ``get_ice_server_fn`` from the registered
        ICE server callables again.
    """
    servers = hass.data.setdefault(DATA_ICE_SERVERS, [])

    def remove() -> None:
        servers.remove(get_ice_server_fn)

    servers.append(get_ice_server_fn)
    return remove
408 
409 
# The following code is legacy code that was introduced with rtsp_to_webrtc and
# will be deprecated/removed in the future. It is kept so that custom
# integrations can still use it.
412 
413 _RTSP_PREFIXES = {"rtsp://", "rtsps://", "rtmp://"}
414 
415 # An RtspToWebRtcProvider accepts these inputs:
416 # stream_source: The RTSP url
417 # offer_sdp: The WebRTC SDP offer
418 # stream_id: A unique id for the stream, used to update an existing source
419 # The output is the SDP answer, or None if the source or offer is not eligible.
420 # The Callable may throw HomeAssistantError on failure.
421 type RtspToWebRtcProviderType = Callable[[str, str, str], Awaitable[str | None]]
422 
423 
425  def __init__(self, fn: RtspToWebRtcProviderType) -> None:
426  """Initialize the RTSP to WebRTC provider."""
427  self._fn = fn
428 
429  async def async_is_supported(self, stream_source: str) -> bool:
430  """Return if this provider is supports the Camera as source."""
431  return any(stream_source.startswith(prefix) for prefix in _RTSP_PREFIXES)
432 
434  self, camera: Camera, offer_sdp: str
435  ) -> str | None:
436  """Handle the WebRTC offer and return an answer."""
437  if not (stream_source := await camera.stream_source()):
438  return None
439 
440  return await self._fn_fn(stream_source, offer_sdp, camera.entity_id)
441 
442 
@deprecated_function("async_register_webrtc_provider", breaks_in_ha_version="2025.6")
def async_register_rtsp_to_web_rtc_provider(
    hass: HomeAssistant,
    domain: str,
    provider: RtspToWebRtcProviderType,
) -> Callable[[], None]:
    """Register an RTSP to WebRTC provider.

    The first provider to satisfy the offer will be used.

    The provider function is wrapped in a ``_CameraRtspToWebRTCProvider`` and
    stored under ``domain``. Registering and unregistering both schedule a
    provider refresh task.

    Returns:
        A callback that removes the provider registered for ``domain``.

    Raises:
        ValueError: If the camera domain is not present in ``hass.data`` or a
            provider is already registered for ``domain``.
    """
    if DOMAIN not in hass.data:
        raise ValueError("Unexpected state, camera not loaded")

    legacy_providers = hass.data.setdefault(DATA_WEBRTC_LEGACY_PROVIDERS, {})

    if domain in legacy_providers:
        raise ValueError("Provider already registered")

    provider_instance = _CameraRtspToWebRTCProvider(provider)

    @callback
    def remove_provider() -> None:
        legacy_providers.pop(domain)
        hass.async_create_task(_async_refresh_providers(hass))

    legacy_providers[domain] = provider_instance
    hass.async_create_task(_async_refresh_providers(hass))

    return remove_provider
472 
473 
@callback
def _async_check_conflicting_legacy_provider(hass: HomeAssistant) -> None:
    """Create a warning issue per legacy provider registered next to go2rtc.

    Nothing happens unless legacy providers exist, regular providers exist,
    and one of the regular providers has the builtin "go2rtc" domain.
    """
    builtin_provider_domain = "go2rtc"

    legacy_providers = hass.data.get(DATA_WEBRTC_LEGACY_PROVIDERS)
    if not legacy_providers:
        return

    providers = hass.data.get(DATA_WEBRTC_PROVIDERS)
    if not providers:
        return

    builtin_registered = any(
        provider.domain == builtin_provider_domain for provider in providers
    )
    if not builtin_registered:
        return

    # One issue per legacy integration, keyed by its domain.
    for domain in legacy_providers:
        ir.async_create_issue(
            hass,
            DOMAIN,
            f"legacy_webrtc_provider_{domain}",
            is_fixable=False,
            is_persistent=False,
            issue_domain=domain,
            learn_more_url="https://www.home-assistant.io/integrations/go2rtc/",
            severity=ir.IssueSeverity.WARNING,
            translation_key="legacy_webrtc_provider",
            translation_placeholders={
                "legacy_integration": domain,
                "builtin_integration": builtin_provider_domain,
            },
        )
499 
str|None async_handle_web_rtc_offer(self, Camera camera, str offer_sdp)
Definition: webrtc.py:174
None async_on_webrtc_candidate(self, str session_id, RTCIceCandidateInit candidate)
Definition: webrtc.py:157
None async_handle_async_webrtc_offer(self, Camera camera, str offer_sdp, str session_id, WebRTCSendMessage send_message)
Definition: webrtc.py:151
str|None async_handle_web_rtc_offer(self, Camera camera, str offer_sdp)
Definition: webrtc.py:441
None __init__(self, _AOSmithCoordinatorT coordinator, str junction_id)
Definition: entity.py:20
bool remove(self, _T matcher)
Definition: match.py:214
Camera get_camera_from_entity_id(HomeAssistant hass, str entity_id)
Definition: helper.py:16
Callable[[], None] async_register_rtsp_to_web_rtc_provider(HomeAssistant hass, str domain, RtspToWebRtcProviderType provider)
Definition: webrtc.py:454
Callable[[], None] async_register_webrtc_provider(HomeAssistant hass, CameraWebRTCProvider provider)
Definition: webrtc.py:182
None ws_webrtc_offer(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
Definition: webrtc.py:271
None ws_get_client_config(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
Definition: webrtc.py:324
Callable[[], None] async_register_ice_servers(HomeAssistant hass, Callable[[], Iterable[RTCIceServer]] get_ice_server_fn)
Definition: webrtc.py:402
Callable[[WsCommandWithCamera], websocket_api.AsyncWebSocketCommandHandler] require_webrtc_support(str error_code)
Definition: webrtc.py:223
CameraWebRTCProvider|None async_get_supported_provider(HomeAssistant hass, Camera camera)
Definition: webrtc.py:370
None _async_refresh_providers(HomeAssistant hass)
Definition: webrtc.py:205
None _async_check_conflicting_legacy_provider(HomeAssistant hass)
Definition: webrtc.py:481
None async_register_ws(HomeAssistant hass)
Definition: webrtc.py:360
CameraWebRTCLegacyProvider|None async_get_supported_legacy_provider(HomeAssistant hass, Camera camera)
Definition: webrtc.py:385
RTCIceCandidateInit _parse_webrtc_candidate_init(Any value)
Definition: webrtc.py:333
None ws_candidate(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
Definition: webrtc.py:353
dict[str, Any] validate(SchemaCommonFlowHandler handler, dict[str, Any] user_input)
Definition: config_flow.py:27
str ulid(float|None timestamp=None)
Definition: ulid.py:27