1 """Provide functionality to stream video source.
3 Components use create_stream with a stream source (e.g. an rtsp url) to create
4 a new Stream object. Stream manages:
5 - Background work to fetch and decode a stream
6 - Desired output formats
7 - Home Assistant URLs for viewing a stream
8 - Access tokens for URLs for viewing a stream
10 A Stream consists of a background worker, and one or more output formats each
11 with their own idle timeout managed by the stream component. When an output
12 format is no longer in use, the stream component will expire it. When there
13 are no active output formats, the background worker is shut down and access
14 tokens are expired. Alternatively, a Stream can be configured with keepalive
15 to always keep workers active.
18 from __future__
import annotations
21 from collections.abc
import Callable, Mapping
27 from types
import MappingProxyType
28 from typing
import TYPE_CHECKING, Any, Final, cast
30 import voluptuous
as vol
45 CONF_EXTRA_PART_WAIT_TIME,
49 CONF_SEGMENT_DURATION,
50 CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
59 SEGMENT_DURATION_ADJUSTER,
61 STREAM_RESTART_INCREMENT,
62 STREAM_RESTART_RESET_TIME,
66 STREAM_SETTINGS_NON_LL_HLS,
73 from .diagnostics
import Diagnostics
74 from .hls
import HlsStreamOutput, async_setup_hls
81 "CONF_EXTRA_PART_WAIT_TIME",
82 "CONF_RTSP_TRANSPORT",
83 "CONF_USE_WALLCLOCK_AS_TIMESTAMPS",
85 "FORMAT_CONTENT_TYPE",
95 _LOGGER = logging.getLogger(__name__)
99 """Redact credentials from string data."""
101 if yurl.user
is not None:
102 yurl = yurl.with_user(
"****")
103 if yurl.password
is not None:
104 yurl = yurl.with_password(
"****")
105 redacted_query_params = dict.fromkeys(
106 {
"auth",
"user",
"password"} & yurl.query.keys(),
"****"
108 return str(yurl.update_query(redacted_query_params))
114 options: Mapping[str, str | bool | float],
115 dynamic_stream_settings: DynamicStreamSettings,
116 stream_label: str |
None =
None,
118 """Create a stream with the specified identifier based on the source url.
120 The stream_source is typically an rtsp url (though any url accepted by ffmpeg is fine) and
121 options (see STREAM_OPTIONS_SCHEMA) are converted and passed into pyav / ffmpeg.
123 The stream_label is a string used as an additional message in logging.
126 def convert_stream_options(
127 hass: HomeAssistant, stream_options: Mapping[str, str | bool | float]
128 ) -> tuple[dict[str, str], StreamSettings]:
129 """Convert options from stream options into PyAV options and stream settings."""
130 stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
131 pyav_options: dict[str, str] = {}
133 STREAM_OPTIONS_SCHEMA(stream_options)
134 except vol.Invalid
as exc:
137 if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
138 stream_settings.hls_part_timeout += extra_wait_time
139 if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
140 assert isinstance(rtsp_transport, str)
143 pyav_options[
"rtsp_transport"] = rtsp_transport
144 if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
145 pyav_options[
"use_wallclock_as_timestamps"] =
"1"
147 return pyav_options, stream_settings
149 if DOMAIN
not in hass.config.components:
153 pyav_options, stream_settings = convert_stream_options(hass, options)
155 if isinstance(stream_source, str)
and stream_source[:7] ==
"rtsp://":
157 "rtsp_flags":
"prefer_tcp",
158 "stimeout":
"5000000",
165 pyav_options=pyav_options,
166 stream_settings=stream_settings,
167 dynamic_stream_settings=dynamic_stream_settings,
168 stream_label=stream_label,
170 hass.data[DOMAIN][ATTR_STREAMS].append(stream)
174 DOMAIN_SCHEMA = vol.Schema(
176 vol.Optional(CONF_LL_HLS, default=
True): cv.boolean,
177 vol.Optional(CONF_SEGMENT_DURATION, default=6): vol.All(
178 cv.positive_float, vol.Range(min=2, max=10)
180 vol.Optional(CONF_PART_DURATION, default=1): vol.All(
181 cv.positive_float, vol.Range(min=0.2, max=1.5)
186 CONFIG_SCHEMA = vol.Schema(
188 DOMAIN: DOMAIN_SCHEMA,
190 extra=vol.ALLOW_EXTRA,
195 """Turn PyAV logging on or off."""
198 av.logging.set_level(av.logging.VERBOSE
if enable
else av.logging.FATAL)
201 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
203 debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
206 def update_pyav_logging(_event: Event |
None =
None) ->
None:
207 """Adjust libav logging to only log when the stream logger is at DEBUG."""
208 nonlocal debug_enabled
209 if (new_debug_enabled := _LOGGER.isEnabledFor(logging.DEBUG)) == debug_enabled:
211 debug_enabled = new_debug_enabled
216 cancel_logging_listener = hass.bus.async_listen(
217 EVENT_LOGGING_CHANGED, update_pyav_logging
221 for logging_namespace
in (
"libav.mp4",
"libav.swscaler"):
222 logging.getLogger(logging_namespace).setLevel(logging.ERROR)
226 await hass.async_add_executor_job(set_pyav_logging, debug_enabled)
230 from .recorder
import async_setup_recorder
232 hass.data[DOMAIN] = {}
233 hass.data[DOMAIN][ATTR_ENDPOINTS] = {}
234 hass.data[DOMAIN][ATTR_STREAMS] = []
236 if conf[CONF_LL_HLS]:
237 assert isinstance(conf[CONF_SEGMENT_DURATION], float)
238 assert isinstance(conf[CONF_PART_DURATION], float)
241 min_segment_duration=conf[CONF_SEGMENT_DURATION]
242 - SEGMENT_DURATION_ADJUSTER,
243 part_target_duration=conf[CONF_PART_DURATION],
244 hls_advance_part_limit=
max(
int(3 / conf[CONF_PART_DURATION]), 3),
245 hls_part_timeout=2 * conf[CONF_PART_DURATION],
248 hass.data[DOMAIN][ATTR_SETTINGS] = STREAM_SETTINGS_NON_LL_HLS
252 hass.data[DOMAIN][ATTR_ENDPOINTS][HLS_PROVIDER] = hls_endpoint
257 async
def shutdown(event: Event) ->
None:
258 """Stop all stream workers."""
259 for stream
in hass.data[DOMAIN][ATTR_STREAMS]:
260 stream.dynamic_stream_settings.preload_stream =
False
262 create_eager_task(stream.stop())
263 for stream
in hass.data[DOMAIN][ATTR_STREAMS]
265 await asyncio.wait(awaitables)
266 _LOGGER.debug(
"Stopped stream workers")
267 cancel_logging_listener()
269 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
275 """Represents a single stream."""
281 pyav_options: dict[str, str],
282 stream_settings: StreamSettings,
283 dynamic_stream_settings: DynamicStreamSettings,
284 stream_label: str |
None =
None,
286 """Initialize a stream."""
295 self.
_thread_thread: threading.Thread |
None =
None
297 self.
_outputs_outputs: dict[str, StreamOutput] = {}
300 hass, stream_settings, dynamic_stream_settings
305 logging.getLogger(f
"{__package__}.stream.{stream_label}")
312 """Start the stream and returns a url for the output format."""
313 if fmt
not in self.
_outputs_outputs:
314 raise ValueError(f
"Stream is not configured for format '{fmt}'")
317 endpoint_fmt: str = self.
hasshass.data[DOMAIN][ATTR_ENDPOINTS][fmt]
318 return endpoint_fmt.format(self.
access_tokenaccess_token)
320 def outputs(self) -> Mapping[str, StreamOutput]:
321 """Return a copy of the stream outputs."""
324 return MappingProxyType(self.
_outputs_outputs.copy())
327 self, fmt: str, timeout: int = OUTPUT_IDLE_TIMEOUT
329 """Add provider output stream."""
330 if not (provider := self.
_outputs_outputs.
get(fmt)):
332 async
def idle_callback() -> None:
335 or fmt == RECORDER_PROVIDER
340 provider = PROVIDERS[fmt](
346 self.
_outputs_outputs[fmt] = provider
351 """Remove provider output stream."""
352 if provider.name
in self.
_outputs_outputs:
353 self.
_outputs_outputs[provider.name].cleanup()
354 del self.
_outputs_outputs[provider.name]
357 await self.
stopstop()
360 """Reset access token if all providers are idle."""
361 if all(p.idle
for p
in self.
_outputs_outputs.values()):
366 """Return False if the stream is started and known to be unavailable."""
370 """Set callback to run when state changes."""
375 """Set state and Run callback to notify state has been updated."""
383 Uses an asyncio.Lock to avoid conflicts with _stop().
388 if self.
_thread_thread
is not None:
391 self.
_thread_thread.join(timeout=0)
394 name=
"stream_worker",
403 """Restart the stream with a new stream source."""
404 self.
_diagnostics_diagnostics.increment(
"update_source")
408 self.
sourcesource = new_source
413 """Set the stream state by updating the callback."""
420 """Handle consuming streams and restart keepalive streams."""
423 from .worker
import StreamState, StreamWorkerError, stream_worker
427 while not self.
_thread_quit_thread_quit.wait(timeout=wait_timeout):
428 start_time = time.time()
446 except StreamWorkerError
as err:
448 self.
_logger_logger.error(
"Error from stream worker: %s",
str(err))
450 stream_state.discontinuity()
466 if time.time() - start_time > STREAM_RESTART_RESET_TIME:
468 wait_timeout += STREAM_RESTART_INCREMENT
469 self.
_diagnostics_diagnostics.set_value(
"retry_timeout", wait_timeout)
471 "Restarting stream worker in %d seconds: %s",
476 async
def worker_finished() -> None:
484 for provider
in self.
outputsoutputs().values():
487 self.
hasshass.create_task(worker_finished())
490 """Remove outputs and access token."""
495 await self.
_stop_stop()
498 """Stop worker thread.
500 Uses an asyncio.Lock to avoid conflicts with start().
503 if self.
_thread_thread
is None:
506 await self.
hasshass.async_add_executor_job(self.
_thread_thread.join)
513 self, video_path: str, duration: int = 30, lookback: int = 5
515 """Make a .mp4 recording from a provided stream."""
519 from .recorder
import RecorderOutput
522 if not self.
hasshass.config.is_allowed_path(video_path):
526 if recorder := self.
outputsoutputs().
get(RECORDER_PROVIDER):
527 assert isinstance(recorder, RecorderOutput)
529 f
"Stream already recording to {recorder.video_path}!"
532 RecorderOutput, self.
add_provideradd_provider(RECORDER_PROVIDER, timeout=duration)
534 recorder.video_path = video_path
536 await self.
startstart()
538 self.
_logger_logger.debug(
"Started a stream recording of %s seconds", duration)
541 hls: HlsStreamOutput = cast(HlsStreamOutput, self.
outputsoutputs().
get(HLS_PROVIDER))
543 num_segments =
min(
int(lookback / hls.target_duration) + 1, MAX_SEGMENTS)
546 recorder.prepend(
list(hls.get_segments())[-num_segments - 1 : -1])
548 await recorder.async_record()
552 width: int |
None =
None,
553 height: int |
None =
None,
554 wait_for_next_keyframe: bool =
False,
556 """Fetch an image from the Stream and return it as a jpeg in bytes.
558 Calls async_get_image from KeyFrameConverter. async_get_image should only be
559 called directly from the main loop and not from an executor thread as it uses
560 hass.add_executor_job underneath the hood.
564 await self.
startstart()
568 wait_for_next_keyframe=wait_for_next_keyframe,
572 """Return diagnostics information for the stream."""
577 """Return true if worker failures should be retried, for disabling during tests."""
581 STREAM_OPTIONS_SCHEMA: Final = vol.Schema(
583 vol.Optional(CONF_RTSP_TRANSPORT): vol.In(RTSP_TRANSPORTS),
584 vol.Optional(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): bool,
585 vol.Optional(CONF_EXTRA_PART_WAIT_TIME): cv.positive_float,
None set_update_callback(self, Callable[[], None] update_callback)
bytes|None async_get_image(self, int|None width=None, int|None height=None, bool wait_for_next_keyframe=False)
None async_record(self, str video_path, int duration=30, int lookback=5)
None _set_state(self, bool available)
None __init__(self, HomeAssistant hass, str source, dict[str, str] pyav_options, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings, str|None stream_label=None)
StreamOutput add_provider(self, str fmt, int timeout=OUTPUT_IDLE_TIMEOUT)
Mapping[str, StreamOutput] outputs(self)
str endpoint_url(self, str fmt)
dict[str, Any] get_diagnostics(self)
None update_source(self, str new_source)
None _async_update_state(self, bool available)
None remove_provider(self, StreamOutput provider)
web.Response get(self, web.Request request, str config_key)
str async_setup_hls(HomeAssistant hass)
None async_setup_recorder(HomeAssistant hass)
None stream_worker(str source, dict[str, str] pyav_options, StreamSettings stream_settings, StreamState stream_state, KeyFrameConverter keyframe_converter, Event quit_event)
Stream create_stream(HomeAssistant hass, str stream_source, Mapping[str, str|bool|float] options, DynamicStreamSettings dynamic_stream_settings, str|None stream_label=None)
bool async_setup(HomeAssistant hass, ConfigType config)
None set_pyav_logging(bool enable)
str redact_credentials(str url)
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)