1 """Provides core stream functionality."""
3 from __future__
import annotations
6 from collections
import deque
7 from collections.abc
import Callable, Coroutine, Iterable
8 from dataclasses
import dataclass, field
10 from enum
import IntEnum
12 from typing
import TYPE_CHECKING, Any, cast
14 from aiohttp
import web
25 SEGMENT_DURATION_ADJUSTER,
26 TARGET_SEGMENT_DURATION_NON_LL_HLS,
30 from av
import Packet, VideoCodecContext
36 _LOGGER = logging.getLogger(__name__)
38 PROVIDERS: Registry[str, type[StreamOutput]] =
Registry()
42 """Orientations for stream transforms. These are based on EXIF orientation tags."""
48 ROTATE_LEFT_AND_FLIP = 5
50 ROTATE_RIGHT_AND_FLIP = 7
54 @dataclass(slots=True)
56 """Stream settings."""
59 min_segment_duration: float
60 part_target_duration: float
61 hls_advance_part_limit: int
62 hls_part_timeout: float
67 min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
68 part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
69 hls_advance_part_limit=3,
70 hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
74 @dataclass(slots=True)
76 """Represent a segment part."""
84 @dataclass(slots=True)
86 """Represent a segment."""
93 start_time: datetime.datetime
94 _stream_outputs: Iterable[StreamOutput]
96 parts: list[Part] = field(default_factory=list)
99 hls_playlist_template: list[str] = field(default_factory=list)
100 hls_playlist_parts: list[str] = field(default_factory=list)
102 hls_num_parts_rendered: int = 0
104 hls_playlist_complete: bool =
False
107 """Run after init."""
108 for output
in self._stream_outputs:
113 """Return whether the Segment is complete."""
118 """Return the size of all part data + init in bytes."""
119 return len(self.init) + self.
data_sizedata_size
123 """Return the size of all part data without init in bytes."""
124 return sum(len(part.data)
for part
in self.parts)
132 """Add a part to the Segment.
134 Duration is non zero only for the last part.
136 self.parts.append(part)
138 for output
in self._stream_outputs:
142 """Return reconstructed data for all parts as bytes, without init."""
143 return b
"".join([part.data
for part
in self.parts])
146 """Render the HLS playlist section for the Segment.
148 The Segment may still be in progress.
149 This method stores intermediate data in hls_playlist_parts,
150 hls_num_parts_rendered, and hls_playlist_complete to avoid redoing
151 work on subsequent calls.
158 if last_stream_id != self.stream_id:
163 for part_num, part
in enumerate(
167 f
"#EXT-X-PART:DURATION={part.duration:.3f},URI="
168 f
'"./segment/{self.sequence}.{part_num}.m4s"'
169 f
'{",INDEPENDENT=YES" if part.has_keyframe else ""}'
178 []
if last_stream_id == self.stream_id
else [
"#EXT-X-DISCONTINUITY"]
184 "{}#EXT-X-PROGRAM-DATE-TIME:"
185 + self.start_time.strftime(
"%Y-%m-%dT%H:%M:%S.%f")[:-3]
187 f
"#EXTINF:{self.duration:.3f},\n./segment/{self.sequence}.m4s",
201 self, last_stream_id: int, render_parts: bool, add_hint: bool
203 """Render the HLS playlist section for the Segment including a hint if requested."""
205 playlist = playlist_template.format(
214 sequence = self.sequence + 1
217 sequence = self.sequence
218 part_num = len(self.parts)
220 f
'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.{part_num}.m4s"'
222 return (playlist +
"\n" + hint)
if playlist
else hint
226 """Invoke a callback after an inactivity timeout.
228 The IdleTimer invokes the callback after some timeout has passed. The awake() method
229 resets the internal alarm, extending the inactivity time.
236 idle_callback: Callable[[], Coroutine[Any, Any,
None]],
238 """Initialize IdleTimer."""
242 self.
_unsub_unsub: CALLBACK_TYPE |
None =
None
246 """Start the idle timer if not already started."""
247 self.
idleidle =
False
248 if self.
_unsub_unsub
is None:
252 """Keep the idle time alive by resetting the timeout."""
253 self.
idleidle =
False
259 """Clear and disable the timer if it has not already fired."""
260 if self.
_unsub_unsub
is not None:
264 def fire(self, _now: datetime.datetime) ->
None:
265 """Invoke the idle timeout callback, called when the alarm fires."""
272 """Represents a stream output."""
277 idle_timer: IdleTimer,
278 stream_settings: StreamSettings,
279 dynamic_stream_settings: DynamicStreamSettings,
280 deque_maxlen: int |
None =
None,
282 """Initialize a stream output."""
289 self._segments: deque[Segment] = deque(maxlen=deque_maxlen)
293 """Return provider name."""
298 """Return True if the output is idle."""
303 """Return the last sequence number without iterating."""
305 return self._segments[-1].sequence
310 """Return current sequence from segments."""
311 return [s.sequence
for s
in self._segments]
315 """Return the last segment without iterating."""
317 return self._segments[-1]
321 """Retrieve a specific segment."""
323 for segment
in reversed(self._segments):
324 if segment.sequence == sequence:
329 """Retrieve all segments."""
330 return self._segments
332 async
def part_recv(self, timeout: float |
None =
None) -> bool:
333 """Wait for an event signalling the latest part segment."""
335 async
with asyncio.timeout(timeout):
342 """Set event signalling the latest part segment."""
348 """Wait for the latest segment."""
349 await self.
_event_event.wait()
352 def put(self, segment: Segment) ->
None:
354 self.
_hass_hass.loop.call_soon_threadsafe(self.
_async_put_async_put, segment)
358 """Store output from event loop."""
361 self._segments.append(segment)
366 """Handle cleanup."""
374 For implementation of a new stream format, define `url` and `name`
375 attributes, and implement `handle` method in a child class.
378 requires_auth =
False
381 self, request: web.Request, token: str, sequence: str =
"", part_num: str =
""
382 ) -> web.StreamResponse:
383 """Start a GET request."""
384 hass = request.app[KEY_HASS]
387 (s
for s
in hass.data[DOMAIN][ATTR_STREAMS]
if s.access_token == token),
392 raise web.HTTPNotFound
397 return await self.
handlehandle(request, stream, sequence, part_num)
400 self, request: web.Request, stream: Stream, sequence: str, part_num: str
401 ) -> web.StreamResponse:
402 """Handle the stream request."""
403 raise NotImplementedError
406 TRANSFORM_IMAGE_FUNCTION = (
409 lambda image: np.fliplr(image).copy(),
410 lambda image: np.rot90(image, 2).copy(),
411 lambda image: np.flipud(image).copy(),
412 lambda image: np.flipud(np.rot90(image)).copy(),
413 lambda image: np.rot90(image).copy(),
414 lambda image: np.flipud(np.rot90(image, -1)).copy(),
415 lambda image: np.rot90(image, -1).copy(),
420 """Enables generating and getting an image from the last keyframe seen in the stream.
422 An overview of the thread and state interaction:
423 the worker thread sets a packet
424 get_image is called from the main asyncio loop
425 get_image schedules _generate_image in an executor thread
426 _generate_image will try to create an image from the packet
427 _generate_image will clear the packet, so there will only be one attempt per packet
428 If successful, self._image will be updated and returned by get_image
429 If unsuccessful, get_image will return the previous image
435 stream_settings: StreamSettings,
436 dynamic_stream_settings: DynamicStreamSettings,
445 self.
_packet_packet: Packet |
None =
None
446 self._event: asyncio.Event = asyncio.Event()
448 self.
_image_image: bytes |
None =
None
451 self.
_codec_context_codec_context: VideoCodecContext |
None =
None
456 """Store the keyframe and set the asyncio.Event from the event loop.
458 This is called from the worker thread.
461 self.
_hass_hass.loop.call_soon_threadsafe(self._event.set)
464 """Create a codec context to be used for decoding the keyframes.
466 This is run by the worker thread and will only be called once per worker.
475 from av
import CodecContext
478 "VideoCodecContext", CodecContext.create(codec_context.name,
"r")
480 self.
_codec_context_codec_context.extradata = codec_context.extradata
486 """Transform image to a given orientation."""
487 return TRANSFORM_IMAGE_FUNCTION[orientation](image)
490 """Generate the keyframe image.
492 This is run in an executor thread, but since it is called within an
493 the asyncio lock from the main thread, there will only be one entry
494 at a time per instance.
511 _LOGGER.debug(
"Codec context needs flushing")
514 _LOGGER.debug(
"Unable to decode keyframe")
520 frame = frame.reformat(width=height, height=width)
522 frame = frame.reformat(width=width, height=height)
524 frame.to_ndarray(format=
"bgr24"),
531 width: int |
None =
None,
532 height: int |
None =
None,
533 wait_for_next_keyframe: bool =
False,
535 """Fetch an image from the Stream and return it as a jpeg in bytes."""
538 if wait_for_next_keyframe:
540 await self._event.wait()
541 async
with self.
_lock_lock:
542 await self.
_hass_hass.async_add_executor_job(self.
_generate_image_generate_image, width, height)
None __init__(self, HomeAssistant hass, int timeout, Callable[[], Coroutine[Any, Any, None]] idle_callback)
None fire(self, datetime.datetime _now)
None create_codec_context(self, VideoCodecContext codec_context)
bytes|None async_get_image(self, int|None width=None, int|None height=None, bool wait_for_next_keyframe=False)
None __init__(self, HomeAssistant hass, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings)
np.ndarray transform_image(np.ndarray image, int orientation)
None _generate_image(self, int|None width, int|None height)
None stash_keyframe_packet(self, Packet packet)
None async_add_part(self, Part part, float duration)
int data_size_with_init(self)
str render_hls(self, int last_stream_id, bool render_parts, bool add_hint)
str _render_hls_template(self, int last_stream_id, bool render_parts)
None put(self, Segment segment)
Segment|None last_segment(self)
Segment|None get_segment(self, int sequence)
deque[Segment] get_segments(self)
None _async_put(self, Segment segment)
list[int] sequences(self)
bool part_recv(self, float|None timeout=None)
None __init__(self, HomeAssistant hass, IdleTimer idle_timer, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings, int|None deque_maxlen=None)
web.StreamResponse handle(self, web.Request request, Stream stream, str sequence, str part_num)
web.StreamResponse get(self, web.Request request, str token, str sequence="", str part_num="")
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)