Home Assistant Unofficial Reference 2024.12.1
worker.py
Go to the documentation of this file.
1 """Provides the worker thread needed for processing streams."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict, deque
6 from collections.abc import Callable, Generator, Iterator, Mapping
7 import contextlib
8 from dataclasses import fields
9 import datetime
10 from io import SEEK_END, BytesIO
11 import logging
12 from threading import Event
13 from typing import Any, Self, cast
14 
15 import av
16 import av.audio
17 import av.container
18 import av.stream
19 
20 from homeassistant.core import HomeAssistant
21 from homeassistant.util import dt as dt_util
22 
23 from . import redact_credentials
24 from .const import (
25  AUDIO_CODECS,
26  HLS_PROVIDER,
27  MAX_MISSING_DTS,
28  MAX_TIMESTAMP_GAP,
29  PACKETS_TO_WAIT_FOR_AUDIO,
30  SEGMENT_CONTAINER_FORMAT,
31  SOURCE_TIMEOUT,
32 )
33 from .core import (
34  STREAM_SETTINGS_NON_LL_HLS,
35  KeyFrameConverter,
36  Part,
37  Segment,
38  StreamOutput,
39  StreamSettings,
40 )
41 from .diagnostics import Diagnostics
42 from .fmp4utils import read_init
43 from .hls import HlsStreamOutput
44 
# Logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)
# Placeholder "previous dts" that is smaller than any real timestamp.
NEGATIVE_INF: float = float("-inf")
47 
48 
class StreamWorkerError(Exception):
    """Signals a failure encountered while processing a stream."""
51 
52 
def redact_av_error_string(err: av.FFmpegError) -> str:
    """Describe an FFmpeg error without exposing credentials from the url.

    The description is the error type and error text joined by ", ". When the
    error carries a filename, it is appended after being passed through
    ``redact_credentials``.
    """
    described = [str(err.type), err.strerror]  # type: ignore[attr-defined]
    filename = err.filename
    if filename:
        described += [redact_credentials(filename)]
    return ", ".join(described)
59 
60 
class StreamEndedError(StreamWorkerError):
    """Raised when the stream is complete, exposed for facilitating testing."""
63 
64 
class StreamState:
    """Responsible for tracking output and playback state for a stream.

    Holds state used for playback to interpret a decoded stream. A source stream
    may be reset (e.g. reconnecting to an rtsp stream) and this object tracks
    the state to inform the player.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        outputs_callback: Callable[[], Mapping[str, StreamOutput]],
        diagnostics: Diagnostics,
    ) -> None:
        """Initialize StreamState.

        ``outputs_callback`` is invoked every time the outputs are needed, so
        the mapping it returns may change over the lifetime of this object.
        """
        self._stream_id: int = 0
        # Public attribute: the stream worker reads it to build the muxer.
        self.hass = hass
        self._outputs_callback: Callable[[], Mapping[str, StreamOutput]] = (
            outputs_callback
        )
        # sequence gets incremented before the first segment so the first segment
        # has a sequence number of 0.
        self._sequence = -1
        self._diagnostics = diagnostics

    @property
    def sequence(self) -> int:
        """Return the current sequence for the latest segment."""
        return self._sequence

    def next_sequence(self) -> int:
        """Increment the sequence number and return the new value."""
        self._sequence += 1
        return self._sequence

    @property
    def stream_id(self) -> int:
        """Return the readonly stream_id attribute."""
        return self._stream_id

    def discontinuity(self) -> None:
        """Mark the stream as having been restarted."""
        # Preserving sequence and stream_id here keep the HLS playlist logic
        # simple to check for discontinuity at output time, and to determine
        # the discontinuity sequence number.
        self._stream_id += 1
        # Call discontinuity to fix incomplete segment in HLS output
        if hls_output := self._outputs_callback().get(HLS_PROVIDER):
            cast(HlsStreamOutput, hls_output).discontinuity()

    @property
    def outputs(self) -> list[StreamOutput]:
        """Return the active stream outputs."""
        return list(self._outputs_callback().values())

    @property
    def diagnostics(self) -> Diagnostics:
        """Return diagnostics object."""
        return self._diagnostics
124 
125 
class StreamMuxer:
    """StreamMuxer re-packages video/audio packets for output."""

    _segment_start_dts: int
    _memory_file: BytesIO
    _av_output: av.container.OutputContainer
    _output_video_stream: av.VideoStream
    _output_audio_stream: av.audio.AudioStream | None
    _segment: Segment | None
    # the following 2 member variables are used for Part formation
    _memory_file_pos: int
    _part_start_dts: float

    def __init__(
        self,
        hass: HomeAssistant,
        video_stream: av.VideoStream,
        audio_stream: av.audio.AudioStream | None,
        audio_bsf: str | None,
        stream_state: StreamState,
        stream_settings: StreamSettings,
    ) -> None:
        """Initialize StreamMuxer.

        No output container exists until reset() is called; reset() must be
        called before mux_packet() or close().
        """
        self._hass = hass
        self._input_video_stream = video_stream
        self._input_audio_stream = audio_stream
        self._audio_bsf = audio_bsf
        self._audio_bsf_context: av.BitStreamFilterContext | None = None
        self._part_has_keyframe = False
        self._stream_settings = stream_settings
        self._stream_state = stream_state
        self._start_time = dt_util.utcnow()

    def make_new_av(
        self,
        memory_file: BytesIO,
        sequence: int,
        input_vstream: av.VideoStream,
        input_astream: av.audio.AudioStream | None,
    ) -> tuple[
        av.container.OutputContainer,
        av.VideoStream,
        av.audio.AudioStream | None,
    ]:
        """Make a new av OutputContainer and add output streams."""
        container_options: dict[str, str] = {
            # Removed skip_sidx - see:
            # https://github.com/home-assistant/core/pull/39970
            # "cmaf" flag replaces several of the movflags used,
            # but too recent to use for now
            "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
            # Sometimes the first segment begins with negative timestamps,
            # and this setting just
            # adjusts the timestamps in the output from that segment to start
            # from 0. Helps from having to make some adjustments
            # in test_durations
            "avoid_negative_ts": "make_non_negative",
            "fragment_index": str(sequence + 1),
            "video_track_timescale": str(int(1 / input_vstream.time_base)),  # type: ignore[operator]
            # Only do extra fragmenting if we are using ll_hls
            # Let ffmpeg do the work using frag_duration
            # Fragment durations may exceed the 15% allowed variance but it seems ok
            # (the "movflags" entry below replaces the one above for ll_hls)
            **(
                {
                    "movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
                    # Create a fragment every TARGET_PART_DURATION. The data from
                    # each fragment is stored in a "Part" that can be combined with
                    # the data from all the other "Part"s, plus an init section,
                    # to reconstitute the data in a "Segment".
                    #
                    # The LL-HLS spec allows for a fragment's duration to be within
                    # the range [0.85x,1.0x] of the part target duration. We use the
                    # frag_duration option to tell ffmpeg to try to cut the
                    # fragments when they reach frag_duration. However,
                    # the resulting fragments can have variability in their
                    # durations and can end up being too short or too long. With a
                    # video track with no audio, the discrete nature of frames means
                    # that the frame at the end of a fragment will sometimes extend
                    # slightly beyond the desired frag_duration.
                    #
                    # If there are two tracks, as in the case of a video feed with
                    # audio, there is an added wrinkle as the fragment cut seems to
                    # be done on the first track that crosses the desired threshold,
                    # and cutting on the audio track may also result in a shorter
                    # video fragment than desired.
                    #
                    # Given this, our approach is to give ffmpeg a frag_duration
                    # somewhere in the middle of the range, hoping that the parts
                    # stay pretty well bounded, and we adjust the part durations
                    # a bit in the hls metadata so that everything "looks" ok.
                    "frag_duration": str(
                        int(self._stream_settings.part_target_duration * 9e5)
                    ),
                }
                if self._stream_settings.ll_hls
                else {}
            ),
        }
        container = av.open(
            memory_file,
            mode="w",
            format=SEGMENT_CONTAINER_FORMAT,
            container_options=container_options,
        )
        output_vstream = container.add_stream(template=input_vstream)
        # Check if audio is requested
        output_astream = None
        if input_astream:
            if self._audio_bsf:
                self._audio_bsf_context = av.BitStreamFilterContext(
                    self._audio_bsf, input_astream
                )
            output_astream = container.add_stream(template=input_astream)
        return container, output_vstream, output_astream  # type: ignore[return-value]

    def reset(self, video_dts: int) -> None:
        """Initialize a new stream segment starting at video_dts."""
        self._part_start_dts = self._segment_start_dts = video_dts
        self._segment = None
        self._memory_file = BytesIO()
        self._memory_file_pos = 0
        (
            self._av_output,
            self._output_video_stream,
            self._output_audio_stream,
        ) = self.make_new_av(
            memory_file=self._memory_file,
            sequence=self._stream_state.next_sequence(),
            input_vstream=self._input_video_stream,
            input_astream=self._input_audio_stream,
        )
        if self._output_video_stream.name == "hevc":
            self._output_video_stream.codec_context.codec_tag = "hvc1"

    def mux_packet(self, packet: av.Packet) -> None:
        """Mux a packet to the appropriate output stream.

        Packets belonging to neither the input video stream nor the input
        audio stream are ignored.
        """

        # Check for end of segment
        if packet.stream == self._input_video_stream:
            if (
                packet.is_keyframe
                and (packet.dts - self._segment_start_dts) * packet.time_base
                >= self._stream_settings.min_segment_duration
            ):
                # Flush segment (also flushes the stub part segment)
                self.flush(packet, last_part=True)

            # Mux the packet
            packet.stream = self._output_video_stream
            self._av_output.mux(packet)
            self.check_flush_part(packet)
            self._part_has_keyframe |= packet.is_keyframe

        elif packet.stream == self._input_audio_stream:
            assert self._output_audio_stream
            if self._audio_bsf_context:
                # The filter may emit zero or more packets per input packet.
                for audio_packet in self._audio_bsf_context.filter(packet):
                    audio_packet.stream = self._output_audio_stream
                    self._av_output.mux(audio_packet)
                return
            packet.stream = self._output_audio_stream
            self._av_output.mux(packet)

    def create_segment(self) -> None:
        """Create a segment when the moov is ready."""
        self._segment = Segment(
            sequence=self._stream_state.sequence,
            stream_id=self._stream_state.stream_id,
            init=read_init(self._memory_file),
            # Fetch the latest StreamOutputs, which may have changed since the
            # worker started.
            _stream_outputs=self._stream_state.outputs,
            start_time=self._start_time,
        )
        # Remember where read_init left the file position, then move back to
        # the end of the file.
        self._memory_file_pos = self._memory_file.tell()
        self._memory_file.seek(0, SEEK_END)

    def check_flush_part(self, packet: av.Packet) -> None:
        """Check for and mark a part segment boundary and record its duration."""
        if self._memory_file_pos == self._memory_file.tell():
            return
        if self._segment is None:
            # We have our first non-zero byte position. This means the init has just
            # been written. Create a Segment and put it to the queue of each output.
            # When using delay_moov, the moov is not written until a moof is also
            # ready, so the flush below also flushes that moof.
            self.create_segment()
        # Otherwise these are the ends of the part segments
        self.flush(packet, last_part=False)

    def flush(self, packet: av.Packet, last_part: bool) -> None:
        """Output a part from the most recent bytes in the memory_file.

        If last_part is True, also close the segment, give it a duration,
        and clean up the av_output and memory_file.
        There are two different ways to enter this function, and when
        last_part is True, packet has not yet been muxed, while when
        last_part is False, the packet has already been muxed. However,
        in both cases, packet is the next packet and is not included in
        the Part.
        This function writes the duration metadata for the Part and
        for the Segment. However, as the fragmentation done by ffmpeg
        may result in fragment durations which fall outside the
        [0.85x,1.0x] tolerance band allowed by LL-HLS, we need to fudge
        some durations a bit by reporting them as being within that
        range.
        Note that repeated adjustments may cause drift between the part
        durations in the metadata and those in the media and result in
        playback issues in some clients.
        """
        # Part durations should not exceed the part target duration
        adjusted_dts = min(
            packet.dts,
            self._part_start_dts
            + self._stream_settings.part_target_duration / packet.time_base,
        )
        if last_part:
            # Closing the av_output will write the remaining buffered data to the
            # memory_file as a new moof/mdat.
            self._av_output.close()
            # With delay_moov, this may be the first time the file pointer has
            # moved, so the segment may not yet have been created
            if not self._segment:
                self.create_segment()
        elif not self._part_has_keyframe:
            # Parts which are not the last part or an independent part should
            # not have durations below 0.85 of the part target duration.
            adjusted_dts = max(
                adjusted_dts,
                self._part_start_dts
                + 0.85 * self._stream_settings.part_target_duration / packet.time_base,
            )
        # Undo dts adjustments if we don't have ll_hls
        if not self._stream_settings.ll_hls:
            adjusted_dts = packet.dts
        assert self._segment
        self._memory_file.seek(self._memory_file_pos)
        # Only the last part carries the segment duration; other parts pass 0.
        segment_duration = (
            float((adjusted_dts - self._segment_start_dts) * packet.time_base)
            if last_part
            else 0
        )
        self._hass.loop.call_soon_threadsafe(
            self._segment.async_add_part,
            Part(
                duration=float(
                    (adjusted_dts - self._part_start_dts) * packet.time_base
                ),
                has_keyframe=self._part_has_keyframe,
                data=self._memory_file.read(),
            ),
            segment_duration,
        )
        if last_part:
            # If we've written the last part, we can close the memory_file.
            self._memory_file.close()  # We don't need the BytesIO object anymore
            self._start_time += datetime.timedelta(seconds=segment_duration)
            # Reinitialize
            self.reset(packet.dts)
        else:
            # For the last part, these will get set again elsewhere so we can skip
            # setting them here.
            self._memory_file_pos = self._memory_file.tell()
            self._part_start_dts = adjusted_dts
        self._part_has_keyframe = False

    def close(self) -> None:
        """Close stream buffer."""
        self._av_output.close()
        self._memory_file.close()
400 
401 
class PeekIterator(Iterator[av.Packet]):
    """An Iterator that may allow multiple passes.

    This may be consumed like a normal Iterator, however also supports a
    peek() method that buffers consumed items from the iterator.
    """

    def __init__(self, iterator: Iterator[av.Packet]) -> None:
        """Initialize PeekIterator."""
        self._iterator = iterator
        self._buffer: deque[av.Packet] = deque()
        # A pointer to either _iterator or _buffer
        self._next = self._iterator.__next__

    def __iter__(self) -> Self:
        """Return an iterator."""
        return self

    def __next__(self) -> av.Packet:
        """Return and consume the next item available."""
        return self._next()

    def _pop_buffer(self) -> av.Packet:
        """Consume items from the buffer until exhausted."""
        if self._buffer:
            return self._buffer.popleft()
        # The buffer is empty, so change to consume from the iterator
        self._next = self._iterator.__next__
        return self._next()

    def peek(self) -> Generator[av.Packet]:
        """Return items without consuming from the iterator."""
        # Items consumed are added to a buffer for future calls to __next__
        # or peek. First iterate over the buffer from previous calls to peek.
        self._next = self._pop_buffer
        yield from self._buffer
        for packet in self._iterator:
            self._buffer.append(packet)
            yield packet
441 
442 
class TimestampValidator:
    """Validate ordering of timestamps for packets in a stream."""

    def __init__(self, inv_video_time_base: int, inv_audio_time_base: int) -> None:
        """Initialize the TimestampValidator."""
        # Decompression timestamp of last packet in each stream
        self._last_dts: dict[av.stream.Stream, int | float] = defaultdict(
            lambda: NEGATIVE_INF
        )
        # Number of consecutive missing decompression timestamps
        self._missing_dts = 0
        # For the bounds, just use the larger of the two values. If the error
        # is not flagged by one stream, it should just get flagged by the other
        # stream. Either value should result in a value which is much less than
        # a 32 bit INT_MAX, which helps avoid the assertion error from FFmpeg.
        self._max_dts_gap = MAX_TIMESTAMP_GAP * max(
            inv_video_time_base, inv_audio_time_base
        )

    def is_valid(self, packet: av.Packet) -> bool:
        """Validate the packet timestamp based on ordering within the stream.

        Returns False for packets that should be discarded (missing dts, or a
        dts that does not advance past the previous one of the same stream).
        Raises StreamWorkerError when too many consecutive packets lack a dts
        or when the dts jumps by more than the allowed gap.
        """
        # Discard packets missing DTS. Terminate if too many are missing.
        if packet.dts is None:
            if self._missing_dts >= MAX_MISSING_DTS:  # type: ignore[unreachable]
                raise StreamWorkerError(
                    f"No dts in {MAX_MISSING_DTS+1} consecutive packets"
                )
            self._missing_dts += 1
            return False
        self._missing_dts = 0
        # Discard when dts is not monotonic. Terminate if gap is too wide.
        prev_dts = self._last_dts[packet.stream]
        # The first packet of a stream has no predecessor (prev_dts is
        # NEGATIVE_INF), so the gap check does not apply to it.
        if abs(prev_dts - packet.dts) > self._max_dts_gap and prev_dts != NEGATIVE_INF:
            raise StreamWorkerError(
                f"Timestamp discontinuity detected: last dts = {prev_dts}, dts ="
                f" {packet.dts}"
            )
        if packet.dts <= prev_dts:
            return False
        self._last_dts[packet.stream] = packet.dts
        return True
484 
485 
def is_keyframe(packet: av.Packet) -> Any:
    """Report the keyframe flag carried by the given packet."""
    keyframe_flag = packet.is_keyframe
    return keyframe_flag
489 
490 
def get_audio_bitstream_filter(
    packets: Iterator[av.Packet], audio_stream: Any
) -> str | None:
    """Return the aac_adtstoasc bitstream filter if ADTS AAC is detected.

    Scans ``packets`` for the first packet belonging to ``audio_stream`` and
    inspects only that packet. Returns None when there is no audio stream,
    when no audio packet shows up within PACKETS_TO_WAIT_FOR_AUDIO packets,
    or when the first audio packet does not look like ADTS AAC.
    """
    if not audio_stream:
        return None
    for count, packet in enumerate(packets):
        if count >= PACKETS_TO_WAIT_FOR_AUDIO:
            # Some streams declare an audio stream and never send any packets
            _LOGGER.warning("Audio stream not found")
            break
        if packet.stream != audio_stream:
            continue
        # Detect ADTS AAC: the first 12 bits of the packet are all ones.
        if audio_stream.codec.name == "aac" and packet.size > 2:
            with memoryview(packet) as packet_view:
                if packet_view[0] == 0xFF and packet_view[1] & 0xF0 == 0xF0:
                    _LOGGER.debug(
                        "ADTS AAC detected. Adding aac_adtstoasc bitstream filter"
                    )
                    return "aac_adtstoasc"
        # The first audio packet decides the outcome; stop scanning.
        break
    return None
513 
514 
def stream_worker(
    source: str,
    pyav_options: dict[str, str],
    stream_settings: StreamSettings,
    stream_state: StreamState,
    keyframe_converter: KeyFrameConverter,
    quit_event: Event,
) -> None:
    """Handle consuming streams.

    Opens ``source``, picks the first video stream (and the first audio
    stream when it is usable), then muxes packets until ``quit_event`` is set.

    Raises StreamWorkerError when the source cannot be opened, has no video
    stream, or demuxing fails, and StreamEndedError when the source runs out
    of packets. The input container is closed on every exit path once it has
    been opened.

    Note: ``pyav_options`` is modified in place when "stimeout" is renamed,
    and ``stream_settings`` is overwritten for hls inputs.
    """

    if av.library_versions["libavformat"][0] >= 59 and "stimeout" in pyav_options:
        # the stimeout option was renamed to timeout as of ffmpeg 5.0
        pyav_options["timeout"] = pyav_options.pop("stimeout")
    try:
        container = av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT)
    except av.FFmpegError as err:
        raise StreamWorkerError(
            f"Error opening stream ({redact_av_error_string(err)})"
        ) from err

    # From here on the container is open; closing() releases it no matter
    # how this function exits (including the early error paths below).
    with contextlib.closing(container):
        try:
            video_stream = container.streams.video[0]
        except (KeyError, IndexError) as ex:
            raise StreamWorkerError("Stream has no video") from ex
        keyframe_converter.create_codec_context(
            codec_context=video_stream.codec_context
        )
        try:
            audio_stream = container.streams.audio[0]
        except (KeyError, IndexError):
            audio_stream = None
        if audio_stream and audio_stream.name not in AUDIO_CODECS:
            audio_stream = None
        # Some audio streams do not have a profile and throw errors when remuxing
        if audio_stream and audio_stream.profile is None:
            audio_stream = None  # type: ignore[unreachable]
        # Disable ll-hls for hls inputs
        if container.format.name == "hls":
            for field in fields(StreamSettings):
                setattr(
                    stream_settings,
                    field.name,
                    getattr(STREAM_SETTINGS_NON_LL_HLS, field.name),
                )
        stream_state.diagnostics.set_value("container_format", container.format.name)
        stream_state.diagnostics.set_value("video_codec", video_stream.name)
        if audio_stream:
            stream_state.diagnostics.set_value("audio_codec", audio_stream.name)

        dts_validator = TimestampValidator(
            int(1 / video_stream.time_base),  # type: ignore[operator]
            int(1 / audio_stream.time_base) if audio_stream else 1,  # type: ignore[operator]
        )
        container_packets = PeekIterator(
            filter(
                dts_validator.is_valid,
                container.demux((video_stream, audio_stream)),
            )
        )

        def is_video(packet: av.Packet) -> Any:
            """Return true if the packet is for the video stream."""
            return packet.stream.type == "video"

        # Have to work around two problems with RTSP feeds in ffmpeg
        # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
        # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815
        #
        # Use a peeking iterator to peek into the start of the stream, ensuring
        # everything looks good, then go back to the start when muxing below.
        #
        # A StreamWorkerError raised by the dts validator propagates unchanged.
        try:
            # Get the required bitstream filter
            audio_bsf = get_audio_bitstream_filter(
                container_packets.peek(), audio_stream
            )
            # Advance to the first keyframe for muxing, then rewind so the muxing
            # loop below can consume.
            first_keyframe = next(
                filter(
                    lambda pkt: is_keyframe(pkt) and is_video(pkt), container_packets
                )
            )
            # Deal with problem #1 above (bad first packet pts/dts) by recalculating
            # using pts/dts from second packet. Use the peek iterator to advance
            # without consuming from container_packets. Skip over the first keyframe
            # then use the duration from the second video packet to adjust dts.
            next_video_packet = next(filter(is_video, container_packets.peek()))
            # Since the is_valid filter has already been applied before the following
            # adjustment, it does not filter out the case where the duration below is
            # 0 and both the first_keyframe and next_video_packet end up with the same
            # dts. Use "or 1" to deal with this.
            start_dts = next_video_packet.dts - (next_video_packet.duration or 1)
            first_keyframe.dts = first_keyframe.pts = start_dts
        except StopIteration as ex:
            raise StreamEndedError("Stream ended; no additional packets") from ex
        except av.FFmpegError as ex:
            raise StreamWorkerError(
                f"Error demuxing stream while finding first packet ({redact_av_error_string(ex)})"
            ) from ex

        muxer = StreamMuxer(
            stream_state.hass,
            video_stream,
            audio_stream,
            audio_bsf,
            stream_state,
            stream_settings,
        )
        muxer.reset(start_dts)

        # The muxer owns an output container once reset() has run, so close it
        # on every exit path from here. It is closed before the input container.
        with contextlib.closing(muxer):
            # Mux the first keyframe, then proceed through the rest of the packets
            muxer.mux_packet(first_keyframe)

            while not quit_event.is_set():
                try:
                    packet = next(container_packets)
                except StopIteration as ex:
                    raise StreamEndedError(
                        "Stream ended; no additional packets"
                    ) from ex
                except av.FFmpegError as ex:
                    raise StreamWorkerError(
                        f"Error demuxing stream ({redact_av_error_string(ex)})"
                    ) from ex

                muxer.mux_packet(packet)

                if packet.is_keyframe and is_video(packet):
                    keyframe_converter.stash_keyframe_packet(packet)
None __init__(self, Iterator[av.Packet] iterator)
Definition: worker.py:409
None mux_packet(self, av.Packet packet)
Definition: worker.py:260
None flush(self, av.Packet packet, bool last_part)
Definition: worker.py:317
None __init__(self, HomeAssistant hass, av.VideoStream video_stream, av.audio.AudioStream|None audio_stream, str|None audio_bsf, StreamState stream_state, StreamSettings stream_settings)
Definition: worker.py:147
tuple[ av.container.OutputContainer, av.VideoStream, av.audio.AudioStream|None,] make_new_av(self, BytesIO memory_file, int sequence, av.VideoStream input_vstream, av.audio.AudioStream|None input_astream)
Definition: worker.py:169
None check_flush_part(self, av.Packet packet)
Definition: worker.py:303
None __init__(self, HomeAssistant hass, Callable[[], Mapping[str, StreamOutput]] outputs_callback, Diagnostics diagnostics)
Definition: worker.py:78
None __init__(self, int inv_video_time_base, int inv_audio_time_base)
Definition: worker.py:446
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bytes read_init(BufferedIOBase bytes_io)
Definition: fmp4utils.py:163
str|None get_audio_bitstream_filter(Iterator[av.Packet] packets, Any audio_stream)
Definition: worker.py:493
str redact_av_error_string(av.FFmpegError err)
Definition: worker.py:53
Any is_keyframe(av.Packet packet)
Definition: worker.py:486
None stream_worker(str source, dict[str, str] pyav_options, StreamSettings stream_settings, StreamState stream_state, KeyFrameConverter keyframe_converter, Event quit_event)
Definition: worker.py:522
str redact_credentials(str url)
Definition: __init__.py:98