Home Assistant Unofficial Reference 2024.12.1
core.py
Go to the documentation of this file.
1 """Provides core stream functionality."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import deque
7 from collections.abc import Callable, Coroutine, Iterable
8 from dataclasses import dataclass, field
9 import datetime
10 from enum import IntEnum
11 import logging
12 from typing import TYPE_CHECKING, Any, cast
13 
14 from aiohttp import web
15 import numpy as np
16 
17 from homeassistant.components.http import KEY_HASS, HomeAssistantView
18 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
19 from homeassistant.helpers.event import async_call_later
20 from homeassistant.util.decorator import Registry
21 
22 from .const import (
23  ATTR_STREAMS,
24  DOMAIN,
25  SEGMENT_DURATION_ADJUSTER,
26  TARGET_SEGMENT_DURATION_NON_LL_HLS,
27 )
28 
29 if TYPE_CHECKING:
30  from av import Packet, VideoCodecContext
31 
32  from homeassistant.components.camera import DynamicStreamSettings
33 
34  from . import Stream
35 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Registry mapping a provider name to the StreamOutput subclass implementing it.
PROVIDERS: Registry[str, type[StreamOutput]] = Registry()
39 
40 
class Orientation(IntEnum):
    """Orientations for stream transforms. These are based on EXIF orientation tags."""

    NO_TRANSFORM = 1
    MIRROR = 2
    ROTATE_180 = 3
    FLIP = 4
    ROTATE_LEFT_AND_FLIP = 5
    ROTATE_LEFT = 6
    ROTATE_RIGHT_AND_FLIP = 7
    ROTATE_RIGHT = 8
52 
53 
54 @dataclass(slots=True)
56  """Stream settings."""
57 
58  ll_hls: bool
59  min_segment_duration: float
60  part_target_duration: float
61  hls_advance_part_limit: int
62  hls_part_timeout: float
63 
64 
# Default settings used when low-latency HLS is disabled. All durations are
# derived from the non-LL-HLS target segment duration constant.
STREAM_SETTINGS_NON_LL_HLS = StreamSettings(
    ll_hls=False,
    min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
    part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
    hls_advance_part_limit=3,
    hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
72 
73 
74 @dataclass(slots=True)
75 class Part:
76  """Represent a segment part."""
77 
78  duration: float
79  has_keyframe: bool
80  # video data (moof+mdat)
81  data: bytes
82 
83 
@dataclass(slots=True)
class Segment:
    """Represent a segment."""

    sequence: int
    # the init of the mp4 the segment is based on
    init: bytes
    # For detecting discontinuities across stream restarts
    stream_id: int
    start_time: datetime.datetime
    _stream_outputs: Iterable[StreamOutput]
    # Stays 0 until the last part is added; see `complete`
    duration: float = 0
    parts: list[Part] = field(default_factory=list)
    # Store text of this segment's hls playlist for reuse
    # Use list[str] for easy appends
    hls_playlist_template: list[str] = field(default_factory=list)
    hls_playlist_parts: list[str] = field(default_factory=list)
    # Number of playlist parts rendered so far
    hls_num_parts_rendered: int = 0
    # Set to true when all the parts are rendered
    hls_playlist_complete: bool = False

    def __post_init__(self) -> None:
        """Run after init.

        Hands the newly created segment to every stream output.
        """
        for output in self._stream_outputs:
            output.put(self)

    @property
    def complete(self) -> bool:
        """Return whether the Segment is complete."""
        return self.duration > 0

    @property
    def data_size_with_init(self) -> int:
        """Return the size of all part data + init in bytes."""
        return len(self.init) + self.data_size

    @property
    def data_size(self) -> int:
        """Return the size of all part data without init in bytes."""
        return sum(len(part.data) for part in self.parts)

    @callback
    def async_add_part(
        self,
        part: Part,
        duration: float,
    ) -> None:
        """Add a part to the Segment.

        Duration is non zero only for the last part.
        """
        self.parts.append(part)
        self.duration = duration
        # Signal every output that a new part is available
        for output in self._stream_outputs:
            output.part_put()

    def get_data(self) -> bytes:
        """Return reconstructed data for all parts as bytes, without init."""
        return b"".join([part.data for part in self.parts])

    def _render_hls_template(self, last_stream_id: int, render_parts: bool) -> str:
        """Render the HLS playlist section for the Segment.

        The Segment may still be in progress.
        This method stores intermediate data in hls_playlist_parts,
        hls_num_parts_rendered, and hls_playlist_complete to avoid redoing
        work on subsequent calls.

        Returns a template string containing a single "{}" placeholder where
        the rendered parts are to be substituted.
        """
        if self.hls_playlist_complete:
            return self.hls_playlist_template[0]
        if not self.hls_playlist_template:
            # Logically EXT-X-DISCONTINUITY makes sense above the parts, but Apple's
            # media stream validator seems to only want it before the segment
            if last_stream_id != self.stream_id:
                self.hls_playlist_template.append("#EXT-X-DISCONTINUITY")
            # This is a placeholder where the rendered parts will be inserted
            self.hls_playlist_template.append("{}")
        if render_parts:
            # Only render the parts added since the previous call
            for part_num, part in enumerate(
                self.parts[self.hls_num_parts_rendered :], self.hls_num_parts_rendered
            ):
                self.hls_playlist_parts.append(
                    f"#EXT-X-PART:DURATION={part.duration:.3f},URI="
                    f'"./segment/{self.sequence}.{part_num}.m4s"'
                    f'{",INDEPENDENT=YES" if part.has_keyframe else ""}'
                )
        if self.complete:
            # Construct the final playlist_template. The placeholder will share a
            # line with the first element to avoid an extra newline when we don't
            # render any parts. Append an empty string to create a trailing newline
            # when we do render parts
            self.hls_playlist_parts.append("")
            self.hls_playlist_template = (
                [] if last_stream_id == self.stream_id else ["#EXT-X-DISCONTINUITY"]
            )
            # Add the remaining segment metadata
            # The placeholder goes on the same line as the next element
            # NOTE(review): the literal "Z" suffix presumes start_time is UTC - confirm
            self.hls_playlist_template.extend(
                [
                    "{}#EXT-X-PROGRAM-DATE-TIME:"
                    + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
                    + "Z",
                    f"#EXTINF:{self.duration:.3f},\n./segment/{self.sequence}.m4s",
                ]
            )

        # Store intermediate playlist data in member variables for reuse
        self.hls_playlist_template = ["\n".join(self.hls_playlist_template)]
        # lstrip discards extra preceding newline in case first render was empty
        self.hls_playlist_parts = ["\n".join(self.hls_playlist_parts).lstrip()]
        self.hls_num_parts_rendered = len(self.parts)
        self.hls_playlist_complete = self.complete

        return self.hls_playlist_template[0]

    def render_hls(
        self, last_stream_id: int, render_parts: bool, add_hint: bool
    ) -> str:
        """Render the HLS playlist section for the Segment including a hint if requested."""
        playlist_template = self._render_hls_template(last_stream_id, render_parts)
        playlist = playlist_template.format(
            self.hls_playlist_parts[0] if render_parts else ""
        )
        if not add_hint:
            return playlist
        # Preload hints help save round trips by informing the client about the
        # next part. The next part will usually be in this segment but will be
        # first part of the next segment if this segment is already complete.
        if self.complete:  # Next part belongs to next segment
            sequence = self.sequence + 1
            part_num = 0
        else:  # Next part is in the same segment
            sequence = self.sequence
            part_num = len(self.parts)
        hint = (
            f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.{part_num}.m4s"'
        )
        return (playlist + "\n" + hint) if playlist else hint
223 
224 
class IdleTimer:
    """Invoke a callback after an inactivity timeout.

    The IdleTimer invokes the callback after some timeout has passed. The awake() method
    resets the internal alarm, extending the inactivity time.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        timeout: int,
        idle_callback: Callable[[], Coroutine[Any, Any, None]],
    ) -> None:
        """Initialize IdleTimer."""
        self._hass = hass
        self._timeout = timeout
        self._callback = idle_callback
        # Cancel handle of the pending timer; None when no timer is scheduled
        self._unsub: CALLBACK_TYPE | None = None
        self.idle = False

    def start(self) -> None:
        """Start the idle timer if not already started."""
        self.idle = False
        if self._unsub is None:
            self._unsub = async_call_later(self._hass, self._timeout, self.fire)

    def awake(self) -> None:
        """Keep the idle time alive by resetting the timeout."""
        self.idle = False
        # Reset idle timeout
        self.clear()
        self._unsub = async_call_later(self._hass, self._timeout, self.fire)

    def clear(self) -> None:
        """Clear and disable the timer if it has not already fired."""
        # NOTE(review): _unsub is not reset to None here, so a later start()
        # will not schedule a new timer; only awake() re-arms after clear().
        if self._unsub is not None:
            self._unsub()

    @callback
    def fire(self, _now: datetime.datetime) -> None:
        """Invoke the idle timeout callback, called when the alarm fires."""
        self.idle = True
        self._unsub = None
        self._hass.async_create_task(self._callback())
269 
270 
class StreamOutput:
    """Represents a stream output."""

    def __init__(
        self,
        hass: HomeAssistant,
        idle_timer: IdleTimer,
        stream_settings: StreamSettings,
        dynamic_stream_settings: DynamicStreamSettings,
        deque_maxlen: int | None = None,
    ) -> None:
        """Initialize a stream output."""
        self._hass = hass
        self.idle_timer = idle_timer
        self.stream_settings = stream_settings
        self.dynamic_stream_settings = dynamic_stream_settings
        # Pulsed when a new segment is stored (see _async_put)
        self._event = asyncio.Event()
        # Pulsed when a new part is added to a segment (see part_put)
        self._part_event = asyncio.Event()
        # With a maxlen set, the deque drops the oldest segment when full
        self._segments: deque[Segment] = deque(maxlen=deque_maxlen)

    @property
    def name(self) -> str | None:
        """Return provider name."""
        return None

    @property
    def idle(self) -> bool:
        """Return True if the output is idle."""
        return self.idle_timer.idle

    @property
    def last_sequence(self) -> int:
        """Return the last sequence number without iterating.

        Returns -1 when no segment has been stored.
        """
        if self._segments:
            return self._segments[-1].sequence
        return -1

    @property
    def sequences(self) -> list[int]:
        """Return current sequence from segments."""
        return [s.sequence for s in self._segments]

    @property
    def last_segment(self) -> Segment | None:
        """Return the last segment without iterating."""
        if self._segments:
            return self._segments[-1]
        return None

    def get_segment(self, sequence: int) -> Segment | None:
        """Retrieve a specific segment."""
        # Most hits will come in the most recent segments, so iterate reversed
        for segment in reversed(self._segments):
            if segment.sequence == sequence:
                return segment
        return None

    def get_segments(self) -> deque[Segment]:
        """Retrieve all segments."""
        return self._segments

    async def part_recv(self, timeout: float | None = None) -> bool:
        """Wait for an event signalling the latest part segment.

        Returns False if the timeout expired before a part arrived,
        True otherwise.
        """
        try:
            async with asyncio.timeout(timeout):
                await self._part_event.wait()
        except TimeoutError:
            return False
        return True

    def part_put(self) -> None:
        """Set event signalling the latest part segment."""
        # Pulse the event: set() wakes the coroutines currently waiting and
        # clear() re-arms it so later waiters block until the next part
        self._part_event.set()
        self._part_event.clear()

    async def recv(self) -> bool:
        """Wait for the latest segment.

        Returns False if woken while no segment is stored (cleanup() also
        sets the event).
        """
        await self._event.wait()
        return self.last_segment is not None

    def put(self, segment: Segment) -> None:
        """Store output."""
        # Hand the segment over to the event loop
        self._hass.loop.call_soon_threadsafe(self._async_put, segment)

    @callback
    def _async_put(self, segment: Segment) -> None:
        """Store output from event loop."""
        # Start idle timeout when we start receiving data
        self.idle_timer.start()
        self._segments.append(segment)
        self._event.set()
        self._event.clear()

    def cleanup(self) -> None:
        """Handle cleanup."""
        # The event is left set, so pending and future recv() calls return
        self._event.set()
        self.idle_timer.clear()
369 
370 
class StreamView(HomeAssistantView):
    """Base StreamView.

    For implementation of a new stream format, define `url` and `name`
    attributes, and implement `handle` method in a child class.
    """

    # Access is gated by the per-stream access token matched in get()
    requires_auth = False

    async def get(
        self, request: web.Request, token: str, sequence: str = "", part_num: str = ""
    ) -> web.StreamResponse:
        """Start a GET request.

        Looks up the stream whose access token equals `token`, starts it and
        delegates to `handle`. Raises web.HTTPNotFound if no stream matches.
        """
        hass = request.app[KEY_HASS]

        stream = next(
            (s for s in hass.data[DOMAIN][ATTR_STREAMS] if s.access_token == token),
            None,
        )

        if not stream:
            raise web.HTTPNotFound

        # Start worker if not already started
        await stream.start()

        return await self.handle(request, stream, sequence, part_num)

    async def handle(
        self, request: web.Request, stream: Stream, sequence: str, part_num: str
    ) -> web.StreamResponse:
        """Handle the stream request."""
        raise NotImplementedError
404 
405 
# Image transforms indexed by Orientation value (1-8). Index 0 is a
# placeholder so the orientation value can be used directly as the index.
TRANSFORM_IMAGE_FUNCTION = (
    lambda image: image,  # Unused
    lambda image: image,  # No transform
    lambda image: np.fliplr(image).copy(),  # Mirror
    lambda image: np.rot90(image, 2).copy(),  # Rotate 180
    lambda image: np.flipud(image).copy(),  # Flip
    lambda image: np.flipud(np.rot90(image)).copy(),  # Rotate left and flip
    lambda image: np.rot90(image).copy(),  # Rotate left
    lambda image: np.flipud(np.rot90(image, -1)).copy(),  # Rotate right and flip
    lambda image: np.rot90(image, -1).copy(),  # Rotate right
)
417 
418 
class KeyFrameConverter:
    """Enables generating and getting an image from the last keyframe seen in the stream.

    An overview of the thread and state interaction:
        the worker thread sets a packet
        get_image is called from the main asyncio loop
        get_image schedules _generate_image in an executor thread
        _generate_image will try to create an image from the packet
        _generate_image will clear the packet, so there will only be one attempt per packet
    If successful, self._image will be updated and returned by get_image
    If unsuccessful, get_image will return the previous image
    """

    def __init__(
        self,
        hass: HomeAssistant,
        stream_settings: StreamSettings,
        dynamic_stream_settings: DynamicStreamSettings,
    ) -> None:
        """Initialize."""

        # Keep import here so that we can import stream integration
        # without installing reqs
        # pylint: disable-next=import-outside-toplevel
        from homeassistant.components.camera.img_util import TurboJPEGSingleton

        # Latest keyframe packet stashed by the worker; consumed by _generate_image
        self._packet: Packet | None = None
        # Set (via the event loop) each time a keyframe packet is stashed
        self._event: asyncio.Event = asyncio.Event()
        self._hass = hass
        # Most recently encoded JPEG, or None if none was produced yet
        self._image: bytes | None = None
        self._turbojpeg = TurboJPEGSingleton.instance()
        self._lock = asyncio.Lock()
        self._codec_context: VideoCodecContext | None = None
        self._stream_settings = stream_settings
        self._dynamic_stream_settings = dynamic_stream_settings

    def stash_keyframe_packet(self, packet: Packet) -> None:
        """Store the keyframe and set the asyncio.Event from the event loop.

        This is called from the worker thread.
        """
        self._packet = packet
        self._hass.loop.call_soon_threadsafe(self._event.set)

    def create_codec_context(self, codec_context: VideoCodecContext) -> None:
        """Create a codec context to be used for decoding the keyframes.

        This is run by the worker thread and will only be called once per worker.
        Does nothing if a codec context already exists.
        """

        if self._codec_context:
            return

        # Keep import here so that we can import stream integration without
        # installing reqs
        # pylint: disable-next=import-outside-toplevel
        from av import CodecContext

        self._codec_context = cast(
            "VideoCodecContext", CodecContext.create(codec_context.name, "r")
        )
        self._codec_context.extradata = codec_context.extradata
        self._codec_context.skip_frame = "NONKEY"
        self._codec_context.thread_type = "NONE"

    @staticmethod
    def transform_image(image: np.ndarray, orientation: int) -> np.ndarray:
        """Transform image to a given orientation."""
        return TRANSFORM_IMAGE_FUNCTION[orientation](image)

    def _generate_image(self, width: int | None, height: int | None) -> None:
        """Generate the keyframe image.

        This is run in an executor thread, but since it is called within
        the asyncio lock from the main thread, there will only be one entry
        at a time per instance.

        On success self._image is replaced with the new JPEG bytes; otherwise
        it is left unchanged.
        """

        if not (self._turbojpeg and self._packet and self._codec_context):
            return
        # Take ownership of the packet so it is only attempted once
        packet = self._packet
        self._packet = None
        for _ in range(2):  # Retry once if codec context needs to be flushed
            try:
                # decode packet (flush afterwards)
                frames = self._codec_context.decode(packet)
                for _i in range(2):
                    if frames:
                        break
                    frames = self._codec_context.decode(None)
                break
            except EOFError:
                _LOGGER.debug("Codec context needs flushing")
                self._codec_context.flush_buffers()
        else:
            # Both attempts raised EOFError
            _LOGGER.debug("Unable to decode keyframe")
            return
        if frames:
            frame = frames[0]
            if width and height:
                # Orientations 5-8 are the left/right rotations, so swap the
                # requested dimensions before the transform is applied
                if self._dynamic_stream_settings.orientation >= 5:
                    frame = frame.reformat(width=height, height=width)
                else:
                    frame = frame.reformat(width=width, height=height)
            bgr_array = self.transform_image(
                frame.to_ndarray(format="bgr24"),
                self._dynamic_stream_settings.orientation,
            )
            self._image = bytes(self._turbojpeg.encode(bgr_array))

    async def async_get_image(
        self,
        width: int | None = None,
        height: int | None = None,
        wait_for_next_keyframe: bool = False,
    ) -> bytes | None:
        """Fetch an image from the Stream and return it as a jpeg in bytes.

        When wait_for_next_keyframe is True, block until the next keyframe
        packet is stashed before generating the image. Returns the last
        generated image, or None if none has been generated.
        """

        if wait_for_next_keyframe:
            self._event.clear()
            await self._event.wait()
        # Use a lock to ensure only one thread is working on the keyframe at a time
        async with self._lock:
            await self._hass.async_add_executor_job(self._generate_image, width, height)
        return self._image
None __init__(self, HomeAssistant hass, int timeout, Callable[[], Coroutine[Any, Any, None]] idle_callback)
Definition: core.py:237
None fire(self, datetime.datetime _now)
Definition: core.py:264
None create_codec_context(self, VideoCodecContext codec_context)
Definition: core.py:463
bytes|None async_get_image(self, int|None width=None, int|None height=None, bool wait_for_next_keyframe=False)
Definition: core.py:534
None __init__(self, HomeAssistant hass, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings)
Definition: core.py:437
np.ndarray transform_image(np.ndarray image, int orientation)
Definition: core.py:485
None _generate_image(self, int|None width, int|None height)
Definition: core.py:489
None stash_keyframe_packet(self, Packet packet)
Definition: core.py:455
None async_add_part(self, Part part, float duration)
Definition: core.py:131
str render_hls(self, int last_stream_id, bool render_parts, bool add_hint)
Definition: core.py:202
str _render_hls_template(self, int last_stream_id, bool render_parts)
Definition: core.py:145
None put(self, Segment segment)
Definition: core.py:352
Segment|None get_segment(self, int sequence)
Definition: core.py:320
None _async_put(self, Segment segment)
Definition: core.py:357
bool part_recv(self, float|None timeout=None)
Definition: core.py:332
None __init__(self, HomeAssistant hass, IdleTimer idle_timer, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings, int|None deque_maxlen=None)
Definition: core.py:281
web.StreamResponse handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: core.py:401
web.StreamResponse get(self, web.Request request, str token, str sequence="", str part_num="")
Definition: core.py:382
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597