1 """Helper for WebRTC support."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
7 from collections.abc
import Awaitable, Callable, Iterable
8 from dataclasses
import asdict, dataclass, field
9 from functools
import cache, partial, wraps
11 from typing
import TYPE_CHECKING, Any, Protocol
13 from mashumaro
import MissingField
14 import voluptuous
as vol
15 from webrtc_models
import (
30 from .const
import DATA_COMPONENT, DOMAIN, StreamType
31 from .helper
import get_camera_from_entity_id
36 _LOGGER = logging.getLogger(__name__)
39 DATA_WEBRTC_PROVIDERS: HassKey[set[CameraWebRTCProvider]] =
HassKey(
40 "camera_webrtc_providers"
42 DATA_WEBRTC_LEGACY_PROVIDERS: HassKey[dict[str, CameraWebRTCLegacyProvider]] =
HassKey(
43 "camera_webrtc_legacy_providers"
45 DATA_ICE_SERVERS: HassKey[list[Callable[[], Iterable[RTCIceServer]]]] =
HassKey(
46 "camera_webrtc_ice_servers"
53 @dataclass(frozen=True)
55 """Base class for WebRTC messages."""
60 _, _, name = cls.__name__.partition(_WEBRTC)
64 """Return a dict representation of the message."""
70 @dataclass(frozen=True)
77 @dataclass(frozen=True)
84 @dataclass(frozen=True)
86 """WebRTC candidate."""
88 candidate: RTCIceCandidate | RTCIceCandidateInit
91 """Return a dict representation of the message."""
94 "candidate": self.candidate.to_dict(),
98 @dataclass(frozen=True)
106 type WebRTCSendMessage = Callable[[WebRTCMessage],
None]
109 @dataclass(kw_only=True)
111 """WebRTC configuration for the client.
113 Not part of the spec, but required to configure client.
116 configuration: RTCConfiguration = field(default_factory=RTCConfiguration)
117 data_channel: str |
None =
None
118 get_candidates_upfront: bool =
False
121 """Return a dict that can be used by the frontend."""
122 data: dict[str, Any] = {
123 "configuration": self.configuration.to_dict(),
124 "getCandidatesUpfront": self.get_candidates_upfront,
126 if self.data_channel
is not None:
127 data[
"dataChannel"] = self.data_channel
132 """WebRTC provider."""
137 """Return the integration domain of the provider."""
142 """Determine if the provider supports the stream source."""
150 send_message: WebRTCSendMessage,
152 """Handle the WebRTC offer and return the answer via the provided callback."""
156 self, session_id: str, candidate: RTCIceCandidateInit
158 """Handle the WebRTC candidate."""
162 """Close the session."""
167 """WebRTC provider."""
170 """Determine if the provider supports the stream source."""
173 self, camera: Camera, offer_sdp: str
175 """Handle the WebRTC offer and return an answer."""
181 provider: CameraWebRTCProvider,
182 ) -> Callable[[],
None]:
183 """Register a WebRTC provider.
185 The first provider to satisfy the offer will be used.
187 if DOMAIN
not in hass.data:
188 raise ValueError(
"Unexpected state, camera not loaded")
190 providers = hass.data.setdefault(DATA_WEBRTC_PROVIDERS, set())
193 def remove_provider() -> None:
194 providers.remove(provider)
197 if provider
in providers:
198 raise ValueError(
"Provider already registered")
200 providers.add(provider)
202 return remove_provider
206 """Check all cameras for any state changes for registered providers."""
209 component = hass.data[DATA_COMPONENT]
210 await asyncio.gather(
211 *(camera.async_refresh_providers()
for camera
in component.entities)
215 type WsCommandWithCamera = Callable[
223 ) -> Callable[[WsCommandWithCamera], websocket_api.AsyncWebSocketCommandHandler]:
224 """Validate that the camera supports WebRTC."""
227 func: WsCommandWithCamera,
228 ) -> websocket_api.AsyncWebSocketCommandHandler:
237 """Validate that the camera supports WebRTC."""
238 entity_id = msg[
"entity_id"]
240 if StreamType.WEB_RTC
not in (
241 stream_types := camera.camera_capabilities.frontend_stream_types
243 connection.send_error(
247 "Camera does not support WebRTC,"
248 f
" frontend_stream_types={stream_types}"
253 await func(connection, msg, camera)
260 @websocket_api.websocket_command(
{
vol.Required("type"):
"camera/webrtc/offer",
261 vol.Required(
"entity_id"): cv.entity_id,
262 vol.Required(
"offer"): str,
265 @websocket_api.async_response
266 @require_webrtc_support("webrtc_offer_failed")
268 connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
270 """Handle the signal path for a WebRTC stream.
272 This signal path is used to route the offer created by the client to the
273 camera device through the integration for negotiation on initial setup.
274 The ws endpoint returns a subscription id, where ice candidates and the
275 final answer will be returned.
276 The actual streaming is handled entirely between the client and camera device.
282 connection.subscriptions[msg[
"id"]] = partial(
283 camera.close_webrtc_session, session_id
286 connection.send_message(websocket_api.result_message(msg[
"id"]))
289 def send_message(message: WebRTCMessage) ->
None:
290 """Push a value to websocket."""
291 connection.send_message(
292 websocket_api.event_message(
301 await camera.async_handle_async_webrtc_offer(offer, session_id, send_message)
302 except HomeAssistantError
as ex:
303 _LOGGER.error(
"Error handling WebRTC offer: %s", ex)
306 "webrtc_offer_failed",
312 @websocket_api.websocket_command(
{
vol.Required("type"):
"camera/webrtc/get_client_config",
313 vol.Required(
"entity_id"): cv.entity_id,
316 @websocket_api.async_response
317 @require_webrtc_support("webrtc_get_client_config_failed")
319 connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
321 """Handle get WebRTC client config websocket command."""
322 config = camera.async_get_webrtc_client_configuration().to_frontend_dict()
323 connection.send_result(
330 """Validate and parse a WebRTCCandidateInit dict."""
332 return RTCIceCandidateInit.from_dict(value)
333 except (MissingField, ValueError)
as ex:
334 raise vol.Invalid(
str(ex))
from ex
337 @websocket_api.websocket_command(
{
vol.Required("type"):
"camera/webrtc/candidate",
338 vol.Required(
"entity_id"): cv.entity_id,
339 vol.Required(
"session_id"): str,
340 vol.Required(
"candidate"): _parse_webrtc_candidate_init,
343 @websocket_api.async_response
344 @require_webrtc_support("webrtc_candidate_failed")
346 connection: websocket_api.ActiveConnection, msg: dict[str, Any], camera: Camera
348 """Handle WebRTC candidate websocket command."""
349 await camera.async_on_webrtc_candidate(msg[
"session_id"], msg[
"candidate"])
350 connection.send_message(websocket_api.result_message(msg[
"id"]))
355 """Register camera webrtc ws endpoints."""
357 websocket_api.async_register_command(hass, ws_webrtc_offer)
358 websocket_api.async_register_command(hass, ws_get_client_config)
359 websocket_api.async_register_command(hass, ws_candidate)
363 hass: HomeAssistant, camera: Camera
364 ) -> CameraWebRTCProvider |
None:
365 """Return the first supported provider for the camera."""
366 providers = hass.data.get(DATA_WEBRTC_PROVIDERS)
367 if not providers
or not (stream_source := await camera.stream_source()):
370 for provider
in providers:
371 if provider.async_is_supported(stream_source):
378 hass: HomeAssistant, camera: Camera
379 ) -> CameraWebRTCLegacyProvider |
None:
380 """Return the first supported provider for the camera."""
381 providers = hass.data.get(DATA_WEBRTC_LEGACY_PROVIDERS)
382 if not providers
or not (stream_source := await camera.stream_source()):
385 for provider
in providers.values():
386 if await provider.async_is_supported(stream_source):
395 get_ice_server_fn: Callable[[], Iterable[RTCIceServer]],
396 ) -> Callable[[],
None]:
397 """Register a ICE server.
399 The registering integration is responsible to implement caching if needed.
401 servers = hass.data.setdefault(DATA_ICE_SERVERS, [])
404 servers.remove(get_ice_server_fn)
406 servers.append(get_ice_server_fn)
413 _RTSP_PREFIXES = {
"rtsp://",
"rtsps://",
"rtmp://"}
421 type RtspToWebRtcProviderType = Callable[[str, str, str], Awaitable[str |
None]]
425 def __init__(self, fn: RtspToWebRtcProviderType) ->
None:
426 """Initialize the RTSP to WebRTC provider."""
429 async
def async_is_supported(self, stream_source: str) -> bool:
430 """Return if this provider is supports the Camera as source."""
431 return any(stream_source.startswith(prefix)
for prefix
in _RTSP_PREFIXES)
434 self, camera: Camera, offer_sdp: str
436 """Handle the WebRTC offer and return an answer."""
437 if not (stream_source := await camera.stream_source()):
440 return await self.
_fn_fn(stream_source, offer_sdp, camera.entity_id)
443 @deprecated_function("async_register_webrtc_provider", breaks_in_ha_version="2025.6")
447 provider: RtspToWebRtcProviderType,
448 ) -> Callable[[],
None]:
449 """Register an RTSP to WebRTC provider.
451 The first provider to satisfy the offer will be used.
453 if DOMAIN
not in hass.data:
454 raise ValueError(
"Unexpected state, camera not loaded")
456 legacy_providers = hass.data.setdefault(DATA_WEBRTC_LEGACY_PROVIDERS, {})
458 if domain
in legacy_providers:
459 raise ValueError(
"Provider already registered")
464 def remove_provider() -> None:
465 legacy_providers.pop(domain)
468 legacy_providers[domain] = provider_instance
471 return remove_provider
476 """Check if a legacy provider is registered together with the builtin provider."""
477 builtin_provider_domain =
"go2rtc"
479 (legacy_providers := hass.data.get(DATA_WEBRTC_LEGACY_PROVIDERS))
480 and (providers := hass.data.get(DATA_WEBRTC_PROVIDERS))
481 and any(provider.domain == builtin_provider_domain
for provider
in providers)
483 for domain
in legacy_providers:
484 ir.async_create_issue(
487 f
"legacy_webrtc_provider_{domain}",
491 learn_more_url=
"https://www.home-assistant.io/integrations/go2rtc/",
492 severity=ir.IssueSeverity.WARNING,
493 translation_key=
"legacy_webrtc_provider",
494 translation_placeholders={
495 "legacy_integration": domain,
496 "builtin_integration": builtin_provider_domain,
499
bool async_is_supported(self, str stream_source)
str|None async_handle_web_rtc_offer(self, Camera camera, str offer_sdp)
None async_on_webrtc_candidate(self, str session_id, RTCIceCandidateInit candidate)
None async_close_session(self, str session_id)
None async_handle_async_webrtc_offer(self, Camera camera, str offer_sdp, str session_id, WebRTCSendMessage send_message)
bool async_is_supported(self, str stream_source)
dict[str, Any] as_dict(self)
dict[str, Any] to_frontend_dict(self)
dict[str, Any] as_dict(self)
str|None async_handle_web_rtc_offer(self, Camera camera, str offer_sdp)
None __init__(self, _AOSmithCoordinatorT coordinator, str junction_id)
bool remove(self, _T matcher)
Camera get_camera_from_entity_id(HomeAssistant hass, str entity_id)
Callable[[], None] async_register_rtsp_to_web_rtc_provider(HomeAssistant hass, str domain, RtspToWebRtcProviderType provider)
Callable[[], None] async_register_webrtc_provider(HomeAssistant hass, CameraWebRTCProvider provider)
None ws_webrtc_offer(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
None ws_get_client_config(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
Callable[[], None] async_register_ice_servers(HomeAssistant hass, Callable[[], Iterable[RTCIceServer]] get_ice_server_fn)
Callable[[WsCommandWithCamera], websocket_api.AsyncWebSocketCommandHandler] require_webrtc_support(str error_code)
CameraWebRTCProvider|None async_get_supported_provider(HomeAssistant hass, Camera camera)
None _async_refresh_providers(HomeAssistant hass)
None _async_check_conflicting_legacy_provider(HomeAssistant hass)
None async_register_ws(HomeAssistant hass)
CameraWebRTCLegacyProvider|None async_get_supported_legacy_provider(HomeAssistant hass, Camera camera)
RTCIceCandidateInit _parse_webrtc_candidate_init(Any value)
None ws_candidate(websocket_api.ActiveConnection connection, dict[str, Any] msg, Camera camera)
dict[str, Any] validate(SchemaCommonFlowHandler handler, dict[str, Any] user_input)
str ulid(float|None timestamp=None)