1 """Provides the worker thread needed for processing streams."""
3 from __future__
import annotations
5 from collections
import defaultdict, deque
6 from collections.abc
import Callable, Generator, Iterator, Mapping
8 from dataclasses
import fields
10 from io
import SEEK_END, BytesIO
12 from threading
import Event
13 from typing
import Any, Self, cast
23 from .
import redact_credentials
29 PACKETS_TO_WAIT_FOR_AUDIO,
30 SEGMENT_CONTAINER_FORMAT,
34 STREAM_SETTINGS_NON_LL_HLS,
41 from .diagnostics
import Diagnostics
42 from .fmp4utils
import read_init
43 from .hls
import HlsStreamOutput
45 _LOGGER = logging.getLogger(__name__)
50 """An exception thrown while processing a stream."""
54 """Return an error string with credentials redacted from the url."""
55 parts = [
str(err.type), err.strerror]
58 return ", ".join(parts)
62 """Raised when the stream is complete, exposed for facilitating testing."""
66 """Responsible for tracking output and playback state for a stream.
68 Holds state used for playback to interpret a decoded stream. A source stream
69 may be reset (e.g. reconnecting to an rtsp stream) and this object tracks
70 the state to inform the player.
76 outputs_callback: Callable[[], Mapping[str, StreamOutput]],
77 diagnostics: Diagnostics,
79 """Initialize StreamState."""
80 self._stream_id: int = 0
82 self._outputs_callback: Callable[[], Mapping[str, StreamOutput]] = (
92 """Return the current sequence for the latest segment."""
96 """Increment the sequence number."""
102 """Return the readonly stream_id attribute."""
103 return self._stream_id
106 """Mark the stream as having been restarted."""
112 if hls_output := self._outputs_callback().
get(HLS_PROVIDER):
117 """Return the active stream outputs."""
118 return list(self._outputs_callback().values())
122 """Return diagnostics object."""
127 """StreamMuxer re-packages video/audio packets for output."""
129 _segment_start_dts: int
130 _memory_file: BytesIO
131 _av_output: av.container.OutputContainer
132 _output_video_stream: av.VideoStream
133 _output_audio_stream: av.audio.AudioStream |
None
134 _segment: Segment |
None
136 _memory_file_pos: int
137 _part_start_dts: float
142 video_stream: av.VideoStream,
143 audio_stream: av.audio.AudioStream |
None,
144 audio_bsf: str |
None,
145 stream_state: StreamState,
146 stream_settings: StreamSettings,
148 """Initialize StreamMuxer."""
161 memory_file: BytesIO,
163 input_vstream: av.VideoStream,
164 input_astream: av.audio.AudioStream |
None,
166 av.container.OutputContainer,
168 av.audio.AudioStream |
None,
170 """Make a new av OutputContainer and add output streams."""
171 container_options: dict[str, str] = {
176 "movflags":
"frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
182 "avoid_negative_ts":
"make_non_negative",
183 "fragment_index":
str(sequence + 1),
184 "video_track_timescale":
str(
int(1 / input_vstream.time_base)),
190 "movflags":
"empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
216 "frag_duration":
str(
227 format=SEGMENT_CONTAINER_FORMAT,
228 container_options=container_options,
230 output_vstream = container.add_stream(template=input_vstream)
232 output_astream =
None
238 output_astream = container.add_stream(template=input_astream)
239 return container, output_vstream, output_astream
241 def reset(self, video_dts: int) ->
None:
242 """Initialize a new stream segment."""
249 self._output_video_stream,
250 self._output_audio_stream,
257 if self._output_video_stream.name ==
"hevc":
258 self._output_video_stream.codec_context.codec_tag =
"hvc1"
261 """Mux a packet to the appropriate output stream."""
271 self.
flushflush(packet, last_part=
True)
274 packet.stream = self._output_video_stream
275 self._av_output.mux(packet)
280 assert self._output_audio_stream
283 audio_packet.stream = self._output_audio_stream
284 self._av_output.mux(audio_packet)
286 packet.stream = self._output_audio_stream
287 self._av_output.mux(packet)
290 """Create a segment when the moov is ready."""
304 """Check for and mark a part segment boundary and record its duration."""
313 self.
flushflush(packet, last_part=
False)
315 self.
flushflush(packet, last_part=
False)
317 def flush(self, packet: av.Packet, last_part: bool) ->
None:
318 """Output a part from the most recent bytes in the memory_file.
320 If last_part is True, also close the segment, give it a duration,
321 and clean up the av_output and memory_file.
322 There are two different ways to enter this function, and when
323 last_part is True, packet has not yet been muxed, while when
324 last_part is False, the packet has already been muxed. However,
325 in both cases, packet is the next packet and is not included in
327 This function writes the duration metadata for the Part and
328 for the Segment. However, as the fragmentation done by ffmpeg
329 may result in fragment durations which fall outside the
330 [0.85x,1.0x] tolerance band allowed by LL-HLS, we need to fudge
331 some durations a bit by reporting them as being within that
333 Note that repeated adjustments may cause drift between the part
334 durations in the metadata and those in the media and result in
335 playback issues in some clients.
341 + self.
_stream_settings_stream_settings.part_target_duration / packet.time_base,
346 self._av_output.
close()
357 + 0.85 * self.
_stream_settings_stream_settings.part_target_duration / packet.time_base,
361 adjusted_dts = packet.dts
364 self.
_hass_hass.loop.call_soon_threadsafe(
365 self.
_segment_segment.async_add_part,
368 (adjusted_dts - self.
_part_start_dts_part_start_dts) * packet.time_base
375 segment_duration :=
float(
386 self.
_start_time_start_time += datetime.timedelta(seconds=segment_duration)
388 self.
resetreset(packet.dts)
397 """Close stream buffer."""
398 self._av_output.
close()
403 """An Iterator that may allow multiple passes.
405 This may be consumed like a normal Iterator, however also supports a
406 peek() method that buffers consumed items from the iterator.
409 def __init__(self, iterator: Iterator[av.Packet]) ->
None:
410 """Initialize PeekIterator."""
412 self._buffer: deque[av.Packet] = deque()
417 """Return an iterator."""
421 """Return and consume the next item available."""
422 return self.
_next_next()
425 """Consume items from the buffer until exhausted."""
427 return self._buffer.popleft()
430 return self.
_next_next()
432 def peek(self) -> Generator[av.Packet]:
433 """Return items without consuming from the iterator."""
437 yield from self._buffer
439 self._buffer.append(packet)
444 """Validate ordering of timestamps for packets in a stream."""
446 def __init__(self, inv_video_time_base: int, inv_audio_time_base: int) ->
None:
447 """Initialize the TimestampValidator."""
449 self._last_dts: dict[av.stream.Stream, int | float] = defaultdict(
459 inv_video_time_base, inv_audio_time_base
463 """Validate the packet timestamp based on ordering within the stream."""
465 if packet.dts
is None:
468 f
"No dts in {MAX_MISSING_DTS+1} consecutive packets"
474 prev_dts = self._last_dts[packet.stream]
475 if abs(prev_dts - packet.dts) > self.
_max_dts_gap_max_dts_gap
and prev_dts != NEGATIVE_INF:
477 f
"Timestamp discontinuity detected: last dts = {prev_dts}, dts ="
480 if packet.dts <= prev_dts:
482 self._last_dts[packet.stream] = packet.dts
487 """Return true if the packet is a keyframe."""
488 return packet.is_keyframe
492 packets: Iterator[av.Packet], audio_stream: Any
494 """Return the aac_adtstoasc bitstream filter if ADTS AAC is detected."""
497 for count, packet
in enumerate(packets):
498 if count >= PACKETS_TO_WAIT_FOR_AUDIO:
500 _LOGGER.warning(
"Audio stream not found")
502 if packet.stream == audio_stream:
504 if audio_stream.codec.name ==
"aac" and packet.size > 2:
505 with memoryview(packet)
as packet_view:
506 if packet_view[0] == 0xFF
and packet_view[1] & 0xF0 == 0xF0:
508 "ADTS AAC detected. Adding aac_adtstoaac bitstream filter"
510 return "aac_adtstoasc"
517 pyav_options: dict[str, str],
518 stream_settings: StreamSettings,
519 stream_state: StreamState,
520 keyframe_converter: KeyFrameConverter,
523 """Handle consuming streams."""
525 if av.library_versions[
"libavformat"][0] >= 59
and "stimeout" in pyav_options:
527 pyav_options[
"timeout"] = pyav_options[
"stimeout"]
528 del pyav_options[
"stimeout"]
530 container = av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT)
531 except av.FFmpegError
as err:
533 f
"Error opening stream ({redact_av_error_string(err)})"
536 video_stream = container.streams.video[0]
537 except (KeyError, IndexError)
as ex:
539 keyframe_converter.create_codec_context(codec_context=video_stream.codec_context)
541 audio_stream = container.streams.audio[0]
542 except (KeyError, IndexError):
544 if audio_stream
and audio_stream.name
not in AUDIO_CODECS:
547 if audio_stream
and audio_stream.profile
is None:
550 if container.format.name ==
"hls":
551 for field
in fields(StreamSettings):
555 getattr(STREAM_SETTINGS_NON_LL_HLS, field.name),
557 stream_state.diagnostics.set_value(
"container_format", container.format.name)
558 stream_state.diagnostics.set_value(
"video_codec", video_stream.name)
560 stream_state.diagnostics.set_value(
"audio_codec", audio_stream.name)
563 int(1 / video_stream.time_base),
564 int(1 / audio_stream.time_base)
if audio_stream
else 1,
567 filter(dts_validator.is_valid, container.demux((video_stream, audio_stream)))
570 def is_video(packet: av.Packet) -> Any:
571 """Return true if the packet is for the video stream."""
572 return packet.stream.type ==
"video"
585 first_keyframe = next(
586 filter(
lambda pkt:
is_keyframe(pkt)
and is_video(pkt), container_packets)
592 next_video_packet = next(filter(is_video, container_packets.peek()))
597 start_dts = next_video_packet.dts - (next_video_packet.duration
or 1)
598 first_keyframe.dts = first_keyframe.pts = start_dts
599 except StreamWorkerError:
602 except StopIteration
as ex:
605 except av.FFmpegError
as ex:
608 f
"Error demuxing stream while finding first packet ({redact_av_error_string(ex)})"
619 muxer.reset(start_dts)
622 muxer.mux_packet(first_keyframe)
624 with contextlib.closing(container), contextlib.closing(muxer):
625 while not quit_event.is_set():
627 packet = next(container_packets)
628 except StreamWorkerError:
630 except StopIteration
as ex:
632 except av.FFmpegError
as ex:
634 f
"Error demuxing stream ({redact_av_error_string(ex)})"
637 muxer.mux_packet(packet)
639 if packet.is_keyframe
and is_video(packet):
640 keyframe_converter.stash_keyframe_packet(packet)
Generator[av.Packet] peek(self)
av.Packet _pop_buffer(self)
None __init__(self, Iterator[av.Packet] iterator)
None mux_packet(self, av.Packet packet)
None flush(self, av.Packet packet, bool last_part)
None __init__(self, HomeAssistant hass, av.VideoStream video_stream, av.audio.AudioStream|None audio_stream, str|None audio_bsf, StreamState stream_state, StreamSettings stream_settings)
tuple[ av.container.OutputContainer, av.VideoStream, av.audio.AudioStream|None,] make_new_av(self, BytesIO memory_file, int sequence, av.VideoStream input_vstream, av.audio.AudioStream|None input_astream)
None reset(self, int video_dts)
None create_segment(self)
None check_flush_part(self, av.Packet packet)
None __init__(self, HomeAssistant hass, Callable[[], Mapping[str, StreamOutput]] outputs_callback, Diagnostics diagnostics)
Diagnostics diagnostics(self)
list[StreamOutput] outputs(self)
None __init__(self, int inv_video_time_base, int inv_audio_time_base)
bool is_valid(self, av.Packet packet)
web.Response get(self, web.Request request, str config_key)
bytes read_init(BufferedIOBase bytes_io)
str|None get_audio_bitstream_filter(Iterator[av.Packet] packets, Any audio_stream)
str redact_av_error_string(av.FFmpegError err)
Any is_keyframe(av.Packet packet)
None stream_worker(str source, dict[str, str] pyav_options, StreamSettings stream_settings, StreamState stream_state, KeyFrameConverter keyframe_converter, Event quit_event)
str redact_credentials(str url)