# Home Assistant stream integration — homeassistant/components/stream/__init__.py
# (recovered from "Home Assistant Unofficial Reference 2024.12.1" documentation dump)
"""Provide functionality to stream video source.

Components use create_stream with a stream source (e.g. an rtsp url) to create
a new Stream object. Stream manages:
 - Background work to fetch and decode a stream
 - Desired output formats
 - Home Assistant URLs for viewing a stream
 - Access tokens for URLs for viewing a stream

A Stream consists of a background worker, and one or more output formats each
with their own idle timeout managed by the stream component. When an output
format is no longer in use, the stream component will expire it. When there
are no active output formats, the background worker is shut down and access
tokens are expired. Alternatively, a Stream can be configured with keepalive
to always keep workers active.
"""
17 
from __future__ import annotations

import asyncio
from collections.abc import Callable, Mapping
import copy
import logging
import secrets
import threading
import time
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Final, cast

import voluptuous as vol
from yarl import URL

from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_LOGGING_CHANGED
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import SetupPhases, async_pause_setup
from homeassistant.util.async_ import create_eager_task

from .const import (
    ATTR_ENDPOINTS,
    ATTR_SETTINGS,
    ATTR_STREAMS,
    CONF_EXTRA_PART_WAIT_TIME,
    CONF_LL_HLS,
    CONF_PART_DURATION,
    CONF_RTSP_TRANSPORT,
    CONF_SEGMENT_DURATION,
    CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
    DOMAIN,
    FORMAT_CONTENT_TYPE,
    HLS_PROVIDER,
    MAX_SEGMENTS,
    OUTPUT_FORMATS,
    OUTPUT_IDLE_TIMEOUT,
    RECORDER_PROVIDER,
    RTSP_TRANSPORTS,
    SEGMENT_DURATION_ADJUSTER,
    SOURCE_TIMEOUT,
    STREAM_RESTART_INCREMENT,
    STREAM_RESTART_RESET_TIME,
)
from .core import (
    PROVIDERS,
    STREAM_SETTINGS_NON_LL_HLS,
    IdleTimer,
    KeyFrameConverter,
    Orientation,
    StreamOutput,
    StreamSettings,
)
from .diagnostics import Diagnostics
from .hls import HlsStreamOutput, async_setup_hls

if TYPE_CHECKING:
    from homeassistant.components.camera import DynamicStreamSettings
78 
# Public API of the stream integration; anything not listed here is internal.
__all__ = [
    "ATTR_SETTINGS",
    "CONF_EXTRA_PART_WAIT_TIME",
    "CONF_RTSP_TRANSPORT",
    "CONF_USE_WALLCLOCK_AS_TIMESTAMPS",
    "DOMAIN",
    "FORMAT_CONTENT_TYPE",
    "HLS_PROVIDER",
    "OUTPUT_FORMATS",
    "Orientation",
    "RTSP_TRANSPORTS",
    "SOURCE_TIMEOUT",
    "Stream",
    "create_stream",
]

# Module-level logger; per-stream child loggers are derived from it when a
# stream_label is supplied (see Stream.__init__).
_LOGGER = logging.getLogger(__name__)
96 
97 
def redact_credentials(url: str) -> str:
    """Redact credentials from string data.

    Masks the userinfo portion of the url (user and password) and any
    query parameters named auth, user or password with "****" so the url
    is safe to write to logs.
    """
    yurl = URL(url)
    if yurl.user is not None:
        yurl = yurl.with_user("****")
    if yurl.password is not None:
        yurl = yurl.with_password("****")
    # Only replace values for the sensitive keys actually present in the query
    redacted_query_params = dict.fromkeys(
        {"auth", "user", "password"} & yurl.query.keys(), "****"
    )
    return str(yurl.update_query(redacted_query_params))
109 
110 
def create_stream(
    hass: HomeAssistant,
    stream_source: str,
    options: Mapping[str, str | bool | float],
    dynamic_stream_settings: DynamicStreamSettings,
    stream_label: str | None = None,
) -> Stream:
    """Create a stream with the specified identifier based on the source url.

    The stream_source is typically an rtsp url (though any url accepted by ffmpeg is fine) and
    options (see STREAM_OPTIONS_SCHEMA) are converted and passed into pyav / ffmpeg.

    The stream_label is a string used as an additional message in logging.

    Raises HomeAssistantError if the stream integration is not set up or the
    supplied options fail validation.
    """

    def convert_stream_options(
        hass: HomeAssistant, stream_options: Mapping[str, str | bool | float]
    ) -> tuple[dict[str, str], StreamSettings]:
        """Convert options from stream options into PyAV options and stream settings."""
        # Start from the integration-wide settings, then apply per-stream tweaks
        stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
        pyav_options: dict[str, str] = {}
        try:
            STREAM_OPTIONS_SCHEMA(stream_options)
        except vol.Invalid as exc:
            raise HomeAssistantError("Invalid stream options") from exc

        if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
            stream_settings.hls_part_timeout += extra_wait_time
        if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
            assert isinstance(rtsp_transport, str)
            # The PyAV options currently match the stream CONF constants, but this
            # will not necessarily always be the case, so they are hard coded here
            pyav_options["rtsp_transport"] = rtsp_transport
        if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
            pyav_options["use_wallclock_as_timestamps"] = "1"

        return pyav_options, stream_settings

    if DOMAIN not in hass.config.components:
        raise HomeAssistantError("Stream integration is not set up.")

    # Convert extra stream options into PyAV options and stream settings
    pyav_options, stream_settings = convert_stream_options(hass, options)
    # For RTSP streams, prefer TCP; caller-supplied options take precedence
    if isinstance(stream_source, str) and stream_source.startswith("rtsp://"):
        pyav_options = {
            "rtsp_flags": "prefer_tcp",
            "stimeout": "5000000",
            **pyav_options,
        }

    stream = Stream(
        hass,
        stream_source,
        pyav_options=pyav_options,
        stream_settings=stream_settings,
        dynamic_stream_settings=dynamic_stream_settings,
        stream_label=stream_label,
    )
    # Track the stream so async_setup's shutdown handler can stop it
    hass.data[DOMAIN][ATTR_STREAMS].append(stream)
    return stream
172 
173 
# YAML configuration schema for the stream: domain entry. LL-HLS is enabled by
# default; segment/part durations are bounded to keep latency and CPU sane.
DOMAIN_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_LL_HLS, default=True): cv.boolean,
        vol.Optional(CONF_SEGMENT_DURATION, default=6): vol.All(
            cv.positive_float, vol.Range(min=2, max=10)
        ),
        vol.Optional(CONF_PART_DURATION, default=1): vol.All(
            cv.positive_float, vol.Range(min=0.2, max=1.5)
        ),
    }
)

# Top-level config schema; ALLOW_EXTRA so other domains in the config pass through.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: DOMAIN_SCHEMA,
    },
    extra=vol.ALLOW_EXTRA,
)
192 
193 
def set_pyav_logging(enable: bool) -> None:
    """Turn PyAV logging on or off.

    Imported lazily because importing av is expensive; callers run this in an
    executor the first time (see async_setup).
    """
    import av  # pylint: disable=import-outside-toplevel

    av.logging.set_level(av.logging.VERBOSE if enable else av.logging.FATAL)
199 
200 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up stream.

    Configures PyAV logging, computes the shared StreamSettings from YAML
    config, registers the HLS endpoint and recorder, and installs a shutdown
    hook that stops all stream workers.
    """
    debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)

    @callback
    def update_pyav_logging(_event: Event | None = None) -> None:
        """Adjust libav logging to only log when the stream logger is at DEBUG."""
        nonlocal debug_enabled
        if (new_debug_enabled := _LOGGER.isEnabledFor(logging.DEBUG)) == debug_enabled:
            return
        debug_enabled = new_debug_enabled
        # enable PyAV logging iff Stream logger is set to debug
        set_pyav_logging(new_debug_enabled)

    # Only pass through PyAV log messages if stream logging is above DEBUG
    cancel_logging_listener = hass.bus.async_listen(
        EVENT_LOGGING_CHANGED, update_pyav_logging
    )
    # libav.mp4 and libav.swscaler have a few unimportant messages that are logged
    # at logging.WARNING. Set those Logger levels to logging.ERROR
    for logging_namespace in ("libav.mp4", "libav.swscaler"):
        logging.getLogger(logging_namespace).setLevel(logging.ERROR)

    # This will load av so we run it in the executor
    with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
        await hass.async_add_executor_job(set_pyav_logging, debug_enabled)

    # Keep import here so that we can import stream integration without installing reqs
    # pylint: disable-next=import-outside-toplevel
    from .recorder import async_setup_recorder

    hass.data[DOMAIN] = {}
    hass.data[DOMAIN][ATTR_ENDPOINTS] = {}
    hass.data[DOMAIN][ATTR_STREAMS] = []
    conf = DOMAIN_SCHEMA(config.get(DOMAIN, {}))
    if conf[CONF_LL_HLS]:
        assert isinstance(conf[CONF_SEGMENT_DURATION], float)
        assert isinstance(conf[CONF_PART_DURATION], float)
        hass.data[DOMAIN][ATTR_SETTINGS] = StreamSettings(
            ll_hls=True,
            min_segment_duration=conf[CONF_SEGMENT_DURATION]
            - SEGMENT_DURATION_ADJUSTER,
            part_target_duration=conf[CONF_PART_DURATION],
            hls_advance_part_limit=max(int(3 / conf[CONF_PART_DURATION]), 3),
            hls_part_timeout=2 * conf[CONF_PART_DURATION],
        )
    else:
        hass.data[DOMAIN][ATTR_SETTINGS] = STREAM_SETTINGS_NON_LL_HLS

    # Setup HLS
    hls_endpoint = async_setup_hls(hass)
    hass.data[DOMAIN][ATTR_ENDPOINTS][HLS_PROVIDER] = hls_endpoint

    # Setup Recorder (call restored from upstream; the original line was lost
    # in the documentation extraction — signature: async_setup_recorder(hass))
    async_setup_recorder(hass)

    async def shutdown(event: Event) -> None:
        """Stop all stream workers."""
        for stream in hass.data[DOMAIN][ATTR_STREAMS]:
            stream.dynamic_stream_settings.preload_stream = False
        if awaitables := [
            create_eager_task(stream.stop())
            for stream in hass.data[DOMAIN][ATTR_STREAMS]
        ]:
            await asyncio.wait(awaitables)
        _LOGGER.debug("Stopped stream workers")
        cancel_logging_listener()

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)

    return True
272 
273 
class Stream:
    """Represents a single stream."""

    def __init__(
        self,
        hass: HomeAssistant,
        source: str,
        pyav_options: dict[str, str],
        stream_settings: StreamSettings,
        dynamic_stream_settings: DynamicStreamSettings,
        stream_label: str | None = None,
    ) -> None:
        """Initialize a stream.

        source is any url accepted by ffmpeg; pyav_options are passed through
        to PyAV; stream_label, when set, names a dedicated child logger.
        """
        self.hass = hass
        self.source = source
        self.pyav_options = pyav_options
        self._stream_settings = stream_settings
        self._stream_label = stream_label
        self.dynamic_stream_settings = dynamic_stream_settings
        # Token guarding the HLS endpoint urls; reset when all outputs idle
        self.access_token: str | None = None
        # Serializes start() against _stop()
        self._start_stop_lock = asyncio.Lock()
        self._thread: threading.Thread | None = None
        self._thread_quit = threading.Event()
        self._outputs: dict[str, StreamOutput] = {}
        # Set by update_source() to restart the worker without backoff
        self._fast_restart_once = False
        self._keyframe_converter = KeyFrameConverter(
            hass, stream_settings, dynamic_stream_settings
        )
        self._available: bool = True
        self._update_callback: Callable[[], None] | None = None
        self._logger = (
            logging.getLogger(f"{__package__}.stream.{stream_label}")
            if stream_label
            else _LOGGER
        )
        self._diagnostics = Diagnostics()

    def endpoint_url(self, fmt: str) -> str:
        """Start the stream and returns a url for the output format."""
        if fmt not in self._outputs:
            raise ValueError(f"Stream is not configured for format '{fmt}'")
        if not self.access_token:
            self.access_token = secrets.token_hex()
        endpoint_fmt: str = self.hass.data[DOMAIN][ATTR_ENDPOINTS][fmt]
        return endpoint_fmt.format(self.access_token)

    def outputs(self) -> Mapping[str, StreamOutput]:
        """Return a copy of the stream outputs."""
        # A copy is returned so the caller can iterate through the outputs
        # without concern about self._outputs being modified from another thread.
        return MappingProxyType(self._outputs.copy())

    def add_provider(
        self, fmt: str, timeout: int = OUTPUT_IDLE_TIMEOUT
    ) -> StreamOutput:
        """Add provider output stream.

        Returns the existing provider for fmt when present; otherwise creates
        one with an IdleTimer that removes it after timeout seconds of no use.
        """
        if not (provider := self._outputs.get(fmt)):

            async def idle_callback() -> None:
                # Recorder outputs are always expired on idle; other outputs
                # survive idle when the stream is configured to preload.
                if (
                    not self.dynamic_stream_settings.preload_stream
                    or fmt == RECORDER_PROVIDER
                ) and fmt in self._outputs:
                    await self.remove_provider(self._outputs[fmt])
                self.check_idle()

            provider = PROVIDERS[fmt](
                self.hass,
                IdleTimer(self.hass, timeout, idle_callback),
                self._stream_settings,
                self.dynamic_stream_settings,
            )
            self._outputs[fmt] = provider

        return provider

    async def remove_provider(self, provider: StreamOutput) -> None:
        """Remove provider output stream."""
        if provider.name in self._outputs:
            self._outputs[provider.name].cleanup()
            del self._outputs[provider.name]

        # Last output removed: shut the worker down
        if not self._outputs:
            await self.stop()

    def check_idle(self) -> None:
        """Reset access token if all providers are idle."""
        if all(p.idle for p in self._outputs.values()):
            self.access_token = None

    @property
    def available(self) -> bool:
        """Return False if the stream is started and known to be unavailable."""
        return self._available

    def set_update_callback(self, update_callback: Callable[[], None]) -> None:
        """Set callback to run when state changes."""
        self._update_callback = update_callback

    @callback
    def _async_update_state(self, available: bool) -> None:
        """Set state and Run callback to notify state has been updated."""
        self._available = available
        if self._update_callback:
            self._update_callback()

    async def start(self) -> None:
        """Start a stream.

        Uses an asyncio.Lock to avoid conflicts with _stop().
        """
        async with self._start_stop_lock:
            if self._thread and self._thread.is_alive():
                return
            if self._thread is not None:
                # The thread must have crashed/exited. Join to clean up the
                # previous thread.
                self._thread.join(timeout=0)
            self._thread_quit.clear()
            self._thread = threading.Thread(
                name="stream_worker",
                target=self._run_worker,
            )
            self._thread.start()
            self._logger.debug(
                "Started stream: %s", redact_credentials(str(self.source))
            )

    def update_source(self, new_source: str) -> None:
        """Restart the stream with a new stream source."""
        self._diagnostics.increment("update_source")
        self._logger.debug(
            "Updating stream source %s", redact_credentials(str(new_source))
        )
        self.source = new_source
        self._fast_restart_once = True
        self._thread_quit.set()

    def _set_state(self, available: bool) -> None:
        """Set the stream state by updating the callback."""
        # Call with call_soon_threadsafe since we know _async_update_state is always
        # all callback function instead of using add_job which would have to work
        # it out each time
        self.hass.loop.call_soon_threadsafe(self._async_update_state, available)

    def _run_worker(self) -> None:
        """Handle consuming streams and restart keepalive streams.

        Runs on the worker thread; loops over stream_worker with an increasing
        restart backoff until _thread_quit is set or retries are exhausted.
        """
        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable-next=import-outside-toplevel
        from .worker import StreamState, StreamWorkerError, stream_worker

        stream_state = StreamState(self.hass, self.outputs, self._diagnostics)
        wait_timeout = 0
        while not self._thread_quit.wait(timeout=wait_timeout):
            start_time = time.time()
            self._set_state(True)
            self._diagnostics.set_value(
                "keepalive", self.dynamic_stream_settings.preload_stream
            )
            self._diagnostics.set_value(
                "orientation", self.dynamic_stream_settings.orientation
            )
            self._diagnostics.increment("start_worker")
            try:
                # Call restored from upstream; the original line was lost in
                # the documentation extraction (signature: worker.py:522)
                stream_worker(
                    self.source,
                    self.pyav_options,
                    self._stream_settings,
                    stream_state,
                    self._keyframe_converter,
                    self._thread_quit,
                )
            except StreamWorkerError as err:
                self._diagnostics.increment("worker_error")
                self._logger.error("Error from stream worker: %s", str(err))

            stream_state.discontinuity()
            if not _should_retry() or self._thread_quit.is_set():
                if self._fast_restart_once:
                    # The stream source is updated, restart without any delay and reset the retry
                    # backoff for the new url.
                    wait_timeout = 0
                    self._fast_restart_once = False
                    self._thread_quit.clear()
                    continue
                break

            self._set_state(False)
            # To avoid excessive restarts, wait before restarting
            # As the required recovery time may be different for different setups, start
            # with trying a short wait_timeout and increase it on each reconnection attempt.
            # Reset the wait_timeout after the worker has been up for several minutes
            if time.time() - start_time > STREAM_RESTART_RESET_TIME:
                wait_timeout = 0
            wait_timeout += STREAM_RESTART_INCREMENT
            self._diagnostics.set_value("retry_timeout", wait_timeout)
            self._logger.debug(
                "Restarting stream worker in %d seconds: %s",
                wait_timeout,
                redact_credentials(str(self.source)),
            )

        async def worker_finished() -> None:
            # The worker is no checking availability of the stream and can no longer track
            # availability so mark it as available, otherwise the frontend may not be able to
            # interact with the stream.
            if not self.available:
                self._async_update_state(True)
            # We can call remove_provider() sequentially as the wrapped _stop() function
            # which blocks internally is only called when the last provider is removed.
            for provider in self.outputs().values():
                await self.remove_provider(provider)

        self.hass.create_task(worker_finished())

    async def stop(self) -> None:
        """Remove outputs and access token."""
        self._outputs = {}
        self.access_token = None

        if not self.dynamic_stream_settings.preload_stream:
            await self._stop()

    async def _stop(self) -> None:
        """Stop worker thread.

        Uses an asyncio.Lock to avoid conflicts with start().
        """
        async with self._start_stop_lock:
            if self._thread is None:
                return
            self._thread_quit.set()
            # join blocks, so run it in the executor
            await self.hass.async_add_executor_job(self._thread.join)
            self._thread = None
            self._logger.debug(
                "Stopped stream: %s", redact_credentials(str(self.source))
            )

    async def async_record(
        self, video_path: str, duration: int = 30, lookback: int = 5
    ) -> None:
        """Make a .mp4 recording from a provided stream.

        Raises HomeAssistantError when video_path is not an allowed path or a
        recording is already in progress.
        """

        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable-next=import-outside-toplevel
        from .recorder import RecorderOutput

        # Check for file access
        if not self.hass.config.is_allowed_path(video_path):
            raise HomeAssistantError(f"Can't write {video_path}, no access to path!")

        # Add recorder
        if recorder := self.outputs().get(RECORDER_PROVIDER):
            assert isinstance(recorder, RecorderOutput)
            raise HomeAssistantError(
                f"Stream already recording to {recorder.video_path}!"
            )
        recorder = cast(
            RecorderOutput, self.add_provider(RECORDER_PROVIDER, timeout=duration)
        )
        recorder.video_path = video_path

        await self.start()

        self._logger.debug("Started a stream recording of %s seconds", duration)

        # Take advantage of lookback
        hls: HlsStreamOutput = cast(HlsStreamOutput, self.outputs().get(HLS_PROVIDER))
        if hls:
            num_segments = min(int(lookback / hls.target_duration) + 1, MAX_SEGMENTS)
            # Wait for latest segment, then add the lookback
            await hls.recv()
            recorder.prepend(list(hls.get_segments())[-num_segments - 1 : -1])

        await recorder.async_record()

    async def async_get_image(
        self,
        width: int | None = None,
        height: int | None = None,
        wait_for_next_keyframe: bool = False,
    ) -> bytes | None:
        """Fetch an image from the Stream and return it as a jpeg in bytes.

        Calls async_get_image from KeyFrameConverter. async_get_image should only be
        called directly from the main loop and not from an executor thread as it uses
        hass.add_executor_job underneath the hood.
        """

        self.add_provider(HLS_PROVIDER)
        await self.start()
        return await self._keyframe_converter.async_get_image(
            width=width,
            height=height,
            wait_for_next_keyframe=wait_for_next_keyframe,
        )

    def get_diagnostics(self) -> dict[str, Any]:
        """Return diagnostics information for the stream."""
        return self._diagnostics.as_dict()
574 
575 
576 def _should_retry() -> bool:
577  """Return true if worker failures should be retried, for disabling during tests."""
578  return True
579 
580 
# Per-stream options accepted by create_stream(); validated before being
# converted into PyAV options / StreamSettings tweaks.
STREAM_OPTIONS_SCHEMA: Final = vol.Schema(
    {
        vol.Optional(CONF_RTSP_TRANSPORT): vol.In(RTSP_TRANSPORTS),
        vol.Optional(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): bool,
        vol.Optional(CONF_EXTRA_PART_WAIT_TIME): cv.positive_float,
    }
)
None set_update_callback(self, Callable[[], None] update_callback)
Definition: __init__.py:369
bytes|None async_get_image(self, int|None width=None, int|None height=None, bool wait_for_next_keyframe=False)
Definition: __init__.py:555
None async_record(self, str video_path, int duration=30, int lookback=5)
Definition: __init__.py:514
None _set_state(self, bool available)
Definition: __init__.py:412
None __init__(self, HomeAssistant hass, str source, dict[str, str] pyav_options, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings, str|None stream_label=None)
Definition: __init__.py:285
StreamOutput add_provider(self, str fmt, int timeout=OUTPUT_IDLE_TIMEOUT)
Definition: __init__.py:328
Mapping[str, StreamOutput] outputs(self)
Definition: __init__.py:320
dict[str, Any] get_diagnostics(self)
Definition: __init__.py:571
None update_source(self, str new_source)
Definition: __init__.py:402
None _async_update_state(self, bool available)
Definition: __init__.py:374
None remove_provider(self, StreamOutput provider)
Definition: __init__.py:350
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
str async_setup_hls(HomeAssistant hass)
Definition: hls.py:37
None async_setup_recorder(HomeAssistant hass)
Definition: recorder.py:31
None stream_worker(str source, dict[str, str] pyav_options, StreamSettings stream_settings, StreamState stream_state, KeyFrameConverter keyframe_converter, Event quit_event)
Definition: worker.py:522
Stream create_stream(HomeAssistant hass, str stream_source, Mapping[str, str|bool|float] options, DynamicStreamSettings dynamic_stream_settings, str|None stream_label=None)
Definition: __init__.py:117
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:201
None set_pyav_logging(bool enable)
Definition: __init__.py:194
str redact_credentials(str url)
Definition: __init__.py:98
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)
Definition: setup.py:691