Home Assistant Unofficial Reference 2024.12.1
recorder.py
Go to the documentation of this file.
1 """Provide functionality to record stream."""
2 
3 from __future__ import annotations
4 
5 from collections import deque
6 from io import DEFAULT_BUFFER_SIZE, BytesIO
7 import logging
8 import os
9 from typing import TYPE_CHECKING
10 
11 import av
12 import av.container
13 
14 from homeassistant.core import HomeAssistant, callback
15 
16 from .const import (
17  RECORDER_CONTAINER_FORMAT,
18  RECORDER_PROVIDER,
19  SEGMENT_CONTAINER_FORMAT,
20 )
21 from .core import PROVIDERS, IdleTimer, Segment, StreamOutput, StreamSettings
22 from .fmp4utils import read_init, transform_init
23 
24 if TYPE_CHECKING:
25  from homeassistant.components.camera import DynamicStreamSettings
26 
27 _LOGGER = logging.getLogger(__name__)
28 
29 
30 @callback
31 def async_setup_recorder(hass: HomeAssistant) -> None:
32  """Only here so Provider Registry works."""
33 
34 
35 @PROVIDERS.register(RECORDER_PROVIDER)
37  """Represents the Recorder Output format."""
38 
39  def __init__(
40  self,
41  hass: HomeAssistant,
42  idle_timer: IdleTimer,
43  stream_settings: StreamSettings,
44  dynamic_stream_settings: DynamicStreamSettings,
45  ) -> None:
46  """Initialize recorder output."""
47  super().__init__(hass, idle_timer, stream_settings, dynamic_stream_settings)
48  self.video_path: str
49 
50  @property
51  def name(self) -> str:
52  """Return provider name."""
53  return RECORDER_PROVIDER
54 
55  def prepend(self, segments: list[Segment]) -> None:
56  """Prepend segments to existing list."""
57  self._segments.extendleft(reversed(segments))
58 
59  def cleanup(self) -> None:
60  """Handle cleanup."""
61  self.idle_timeridle_timer.idle = True
62  super().cleanup()
63 
64  async def async_record(self) -> None:
65  """Handle saving stream."""
66 
67  os.makedirs(os.path.dirname(self.video_path), exist_ok=True)
68 
69  pts_adjuster: dict[str, int | None] = {"video": None, "audio": None}
70  output: av.container.OutputContainer | None = None
71  output_v = None
72  output_a = None
73 
74  last_stream_id = -1
75  # The running duration of processed segments. Note that this is in av.time_base
76  # units which seem to be defined inversely to how stream time_bases are defined
77  running_duration = 0
78 
79  last_sequence = float("-inf")
80 
81  def write_segment(segment: Segment) -> None:
82  """Write a segment to output."""
83  # fmt: off
84  nonlocal output, output_v, output_a, last_stream_id, running_duration, last_sequence
85  # fmt: on
86  # Because the stream_worker is in a different thread from the record service,
87  # the lookback segments may still have some overlap with the recorder segments
88  if segment.sequence <= last_sequence:
89  return
90  last_sequence = segment.sequence
91 
92  # Open segment
93  source = av.open(
94  BytesIO(segment.init + segment.get_data()),
95  "r",
96  format=SEGMENT_CONTAINER_FORMAT,
97  )
98  # Skip this segment if it doesn't have data
99  if source.duration is None:
100  source.close()
101  return
102  source_v = source.streams.video[0]
103  source_a = (
104  source.streams.audio[0] if len(source.streams.audio) > 0 else None
105  )
106 
107  # Create output on first segment
108  if not output:
109  container_options: dict[str, str] = {
110  "video_track_timescale": str(int(1 / source_v.time_base)), # type: ignore[operator]
111  "movflags": "frag_keyframe+empty_moov",
112  "min_frag_duration": str(self.stream_settingsstream_settings.min_segment_duration),
113  }
114  output = av.open(
115  self.video_path + ".tmp",
116  "w",
117  format=RECORDER_CONTAINER_FORMAT,
118  container_options=container_options,
119  )
120 
121  # Add output streams if necessary
122  if not output_v:
123  output_v = output.add_stream(template=source_v)
124  context = output_v.codec_context
125  context.global_header = True
126  if source_a and not output_a:
127  output_a = output.add_stream(template=source_a)
128 
129  # Recalculate pts adjustments on first segment and on any discontinuity
130  # We are assuming time base is the same across all discontinuities
131  if last_stream_id != segment.stream_id:
132  last_stream_id = segment.stream_id
133  pts_adjuster["video"] = int(
134  (running_duration - source.start_time)
135  / (av.time_base * source_v.time_base) # type: ignore[operator]
136  )
137  if source_a:
138  pts_adjuster["audio"] = int(
139  (running_duration - source.start_time)
140  / (av.time_base * source_a.time_base) # type: ignore[operator]
141  )
142 
143  # Remux video
144  for packet in source.demux():
145  if packet.pts is None:
146  continue
147  packet.pts += pts_adjuster[packet.stream.type] # type: ignore[operator]
148  packet.dts += pts_adjuster[packet.stream.type] # type: ignore[operator]
149  stream = output_v if packet.stream.type == "video" else output_a
150  assert stream
151  packet.stream = stream
152  output.mux(packet)
153 
154  running_duration += source.duration - source.start_time
155 
156  source.close()
157 
158  def write_transform_matrix_and_rename(video_path: str) -> None:
159  """Update the transform matrix and write to the desired filename."""
160  with (
161  open(video_path + ".tmp", mode="rb") as in_file,
162  open(video_path, mode="wb") as out_file,
163  ):
164  init = transform_init(
165  read_init(in_file), self.dynamic_stream_settingsdynamic_stream_settings.orientation
166  )
167  out_file.write(init)
168  in_file.seek(len(init))
169  while chunk := in_file.read(DEFAULT_BUFFER_SIZE):
170  out_file.write(chunk)
171  os.remove(video_path + ".tmp")
172 
173  def finish_writing(
174  segments: deque[Segment],
175  output: av.container.OutputContainer | None,
176  video_path: str,
177  ) -> None:
178  """Finish writing output."""
179  # Should only have 0 or 1 segments, but loop through just in case
180  while segments:
181  write_segment(segments.popleft())
182  if output is None:
183  _LOGGER.error("Recording failed to capture anything")
184  return
185  output.close()
186  try:
187  write_transform_matrix_and_rename(video_path)
188  except FileNotFoundError:
189  _LOGGER.error(
190  (
191  "Error writing to '%s'. There are likely multiple recordings"
192  " writing to the same file"
193  ),
194  video_path,
195  )
196 
197  # Write lookback segments
198  while len(self._segments) > 1: # The last segment is in progress
199  await self._hass_hass.async_add_executor_job(
200  write_segment, self._segments.popleft()
201  )
202  # Make sure the first segment has been added
203  if not self._segments:
204  await self.recvrecv()
205  # Write segments as soon as they are completed
206  while not self.idleidle:
207  await self.recvrecv()
208  await self._hass_hass.async_add_executor_job(
209  write_segment, self._segments.popleft()
210  )
211  # Write remaining segments and close output
212  await self._hass_hass.async_add_executor_job(
213  finish_writing, self._segments, output, self.video_path
214  )
None prepend(self, list[Segment] segments)
Definition: recorder.py:55
None __init__(self, HomeAssistant hass, IdleTimer idle_timer, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings)
Definition: recorder.py:45
None open(self, **Any kwargs)
Definition: lock.py:86
bytes transform_init(bytes init, Orientation orientation)
Definition: fmp4utils.py:201
bytes read_init(BufferedIOBase bytes_io)
Definition: fmp4utils.py:163
None async_setup_recorder(HomeAssistant hass)
Definition: recorder.py:31