Home Assistant Unofficial Reference 2024.12.1
hls.py
Go to the documentation of this file.
1 """Provide functionality to stream HLS."""
2 
3 from __future__ import annotations
4 
5 from http import HTTPStatus
6 from typing import TYPE_CHECKING, cast
7 
8 from aiohttp import web
9 
10 from homeassistant.core import HomeAssistant, callback
11 
12 from .const import (
13  EXT_X_START_LL_HLS,
14  EXT_X_START_NON_LL_HLS,
15  FORMAT_CONTENT_TYPE,
16  HLS_PROVIDER,
17  MAX_SEGMENTS,
18  NUM_PLAYLIST_SEGMENTS,
19 )
20 from .core import (
21  PROVIDERS,
22  IdleTimer,
23  Segment,
24  StreamOutput,
25  StreamSettings,
26  StreamView,
27 )
28 from .fmp4utils import get_codec_string, transform_init
29 
30 if TYPE_CHECKING:
31  from homeassistant.components.camera import DynamicStreamSettings
32 
33  from . import Stream
34 
35 
36 @callback
37 def async_setup_hls(hass: HomeAssistant) -> str:
38  """Set up api endpoints."""
39  hass.http.register_view(HlsPlaylistView())
40  hass.http.register_view(HlsSegmentView())
41  hass.http.register_view(HlsInitView())
42  hass.http.register_view(HlsMasterPlaylistView())
43  hass.http.register_view(HlsPartView())
44  return "/api/hls/{}/master_playlist.m3u8"
45 
46 
47 @PROVIDERS.register(HLS_PROVIDER)
49  """Represents HLS Output formats."""
50 
51  def __init__(
52  self,
53  hass: HomeAssistant,
54  idle_timer: IdleTimer,
55  stream_settings: StreamSettings,
56  dynamic_stream_settings: DynamicStreamSettings,
57  ) -> None:
58  """Initialize HLS output."""
59  super().__init__(
60  hass,
61  idle_timer,
62  stream_settings,
63  dynamic_stream_settings,
64  deque_maxlen=MAX_SEGMENTS,
65  )
66  self._target_duration_target_duration = stream_settings.min_segment_duration
67 
68  @property
69  def name(self) -> str:
70  """Return provider name."""
71  return HLS_PROVIDER
72 
73  def cleanup(self) -> None:
74  """Handle cleanup."""
75  super().cleanup()
76  self._segments.clear()
77 
78  @property
79  def target_duration(self) -> float:
80  """Return the target duration."""
81  return self._target_duration_target_duration
82 
83  @callback
84  def _async_put(self, segment: Segment) -> None:
85  """Async put and also update the target duration.
86 
87  The target duration is calculated as the max duration of any given segment.
88  Technically it should not change per the hls spec, but some cameras adjust
89  their GOPs periodically so we need to account for this change.
90  """
91  super()._async_put(segment)
92  self._target_duration_target_duration = (
93  max((s.duration for s in self._segments), default=segment.duration)
94  or self.stream_settingsstream_settings.min_segment_duration
95  )
96 
97  def discontinuity(self) -> None:
98  """Fix incomplete segment at end of deque."""
99  self._hass_hass.loop.call_soon_threadsafe(self._async_discontinuity_async_discontinuity)
100 
101  @callback
102  def _async_discontinuity(self) -> None:
103  """Fix incomplete segment at end of deque in event loop."""
104  # Fill in the segment duration or delete the segment if empty
105  if self._segments:
106  if (last_segment := self._segments[-1]).parts:
107  last_segment.duration = sum(
108  part.duration for part in last_segment.parts
109  )
110  else:
111  self._segments.pop()
112 
113 
115  """Stream view used only for Chromecast compatibility."""
116 
117  url = r"/api/hls/{token:[a-f0-9]+}/master_playlist.m3u8"
118  name = "api:stream:hls:master_playlist"
119  cors_allowed = True
120 
121  @staticmethod
122  def render(track: StreamOutput) -> str:
123  """Render M3U8 file."""
124  # Need to calculate max bandwidth as input_container.bit_rate doesn't seem to work
125  # Calculate file size / duration and use a small multiplier to account for variation
126  # hls spec already allows for 25% variation
127  if not (segment := track.get_segment(track.sequences[-2])):
128  return ""
129  bandwidth = round(segment.data_size_with_init * 8 / segment.duration * 1.2)
130  codecs = get_codec_string(segment.init)
131  lines = [
132  "#EXTM3U",
133  f'#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},CODECS="{codecs}"',
134  "playlist.m3u8",
135  ]
136  return "\n".join(lines) + "\n"
137 
138  async def handle(
139  self, request: web.Request, stream: Stream, sequence: str, part_num: str
140  ) -> web.Response:
141  """Return m3u8 playlist."""
142  track = stream.add_provider(HLS_PROVIDER)
143  await stream.start()
144  # Make sure at least two segments are ready (last one may not be complete)
145  if not track.sequences and not await track.recv():
146  return web.HTTPNotFound()
147  if len(track.sequences) == 1 and not await track.recv():
148  return web.HTTPNotFound()
149  response = web.Response(
150  body=self.renderrender(track).encode("utf-8"),
151  headers={
152  "Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER],
153  },
154  )
155  response.enable_compression(web.ContentCoding.gzip)
156  return response
157 
158 
160  """Stream view to serve a M3U8 stream."""
161 
162  url = r"/api/hls/{token:[a-f0-9]+}/playlist.m3u8"
163  name = "api:stream:hls:playlist"
164  cors_allowed = True
165 
166  @classmethod
167  def render(cls, track: HlsStreamOutput) -> str:
168  """Render HLS playlist file."""
169  # NUM_PLAYLIST_SEGMENTS+1 because most recent is probably not yet complete
170  segments = list(track.get_segments())[-(NUM_PLAYLIST_SEGMENTS + 1) :]
171 
172  # To cap the number of complete segments at NUM_PLAYLIST_SEGMENTS,
173  # remove the first segment if the last segment is actually complete
174  if segments[-1].complete:
175  segments = segments[-NUM_PLAYLIST_SEGMENTS:]
176 
177  first_segment = segments[0]
178  playlist = [
179  "#EXTM3U",
180  "#EXT-X-VERSION:6",
181  "#EXT-X-INDEPENDENT-SEGMENTS",
182  '#EXT-X-MAP:URI="init.mp4"',
183  f"#EXT-X-TARGETDURATION:{track.target_duration:.0f}",
184  f"#EXT-X-MEDIA-SEQUENCE:{first_segment.sequence}",
185  f"#EXT-X-DISCONTINUITY-SEQUENCE:{first_segment.stream_id}",
186  ]
187 
188  if track.stream_settings.ll_hls:
189  playlist.extend(
190  [
191  f"#EXT-X-PART-INF:PART-TARGET={track.stream_settings.part_target_duration:.3f}",
192  f"#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK={2*track.stream_settings.part_target_duration:.3f}",
193  f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_LL_HLS*track.stream_settings.part_target_duration:.3f},PRECISE=YES",
194  ]
195  )
196  else:
197  # Since our window doesn't have many segments, we don't want to start
198  # at the beginning or we risk a behind live window exception in Exoplayer.
199  # EXT-X-START is not supposed to be within 3 target durations of the end,
200  # but a value as low as 1.5 doesn't seem to hurt.
201  # A value below 3 may not be as useful for hls.js as many hls.js clients
202  # don't autoplay. Also, hls.js uses the player parameter liveSyncDuration
203  # which seems to take precedence for setting target delay. Yet it also
204  # doesn't seem to hurt, so we can stick with it for now.
205  playlist.append(
206  f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_NON_LL_HLS*track.target_duration:.3f},PRECISE=YES"
207  )
208 
209  last_stream_id = first_segment.stream_id
210 
211  # Add playlist sections for completed segments
212  # Enumeration used to only include EXT-X-PART data for last 3 segments.
213  # The RFC seems to suggest removing parts after 3 full segments, but Apple's
214  # own example shows removing after 2 full segments and 1 part one.
215  for i, segment in enumerate(segments[:-1], 3 - len(segments)):
216  playlist.append(
217  segment.render_hls(
218  last_stream_id=last_stream_id,
219  render_parts=i >= 0 and track.stream_settings.ll_hls,
220  add_hint=False,
221  )
222  )
223  last_stream_id = segment.stream_id
224 
225  playlist.append(
226  segments[-1].render_hls(
227  last_stream_id=last_stream_id,
228  render_parts=track.stream_settings.ll_hls,
229  add_hint=track.stream_settings.ll_hls,
230  )
231  )
232 
233  return "\n".join(playlist) + "\n"
234 
235  @staticmethod
236  def bad_request(blocking: bool, target_duration: float) -> web.Response:
237  """Return a HTTP Bad Request response."""
238  return web.Response(
239  body=None,
240  status=HTTPStatus.BAD_REQUEST,
241  )
242 
243  @staticmethod
244  def not_found(blocking: bool, target_duration: float) -> web.Response:
245  """Return a HTTP Not Found response."""
246  return web.Response(
247  body=None,
248  status=HTTPStatus.NOT_FOUND,
249  )
250 
251  async def handle(
252  self, request: web.Request, stream: Stream, sequence: str, part_num: str
253  ) -> web.Response:
254  """Return m3u8 playlist."""
255  track: HlsStreamOutput = cast(
256  HlsStreamOutput, stream.add_provider(HLS_PROVIDER)
257  )
258  await stream.start()
259 
260  hls_msn: str | int | None = request.query.get("_HLS_msn")
261  hls_part: str | int | None = request.query.get("_HLS_part")
262  blocking_request = bool(hls_msn or hls_part)
263 
264  # If the Playlist URI contains an _HLS_part directive but no _HLS_msn
265  # directive, the Server MUST return Bad Request, such as HTTP 400.
266  if hls_msn is None and hls_part:
267  return web.HTTPBadRequest()
268 
269  hls_msn = int(hls_msn or 0)
270 
271  # If the _HLS_msn is greater than the Media Sequence Number of the last
272  # Media Segment in the current Playlist plus two, or if the _HLS_part
273  # exceeds the last Part Segment in the current Playlist by the
274  # Advance Part Limit, then the server SHOULD immediately return Bad
275  # Request, such as HTTP 400.
276  if hls_msn > track.last_sequence + 2:
277  return self.bad_requestbad_request(blocking_request, track.target_duration)
278 
279  if hls_part is None:
280  # We need to wait for the whole segment, so effectively the next msn
281  hls_part = -1
282  hls_msn += 1
283  else:
284  hls_part = int(hls_part)
285 
286  while hls_msn > track.last_sequence:
287  if not await track.recv():
288  return self.not_foundnot_found(blocking_request, track.target_duration)
289  if track.last_segment is None:
290  return self.not_foundnot_found(blocking_request, 0)
291  if (
292  (last_segment := track.last_segment)
293  and hls_msn == last_segment.sequence
294  and hls_part
295  >= len(last_segment.parts)
296  - 1
297  + track.stream_settings.hls_advance_part_limit
298  ):
299  return self.bad_requestbad_request(blocking_request, track.target_duration)
300 
301  # Receive parts until msn and part are met
302  while (
303  (last_segment := track.last_segment)
304  and hls_msn == last_segment.sequence
305  and hls_part >= len(last_segment.parts)
306  ):
307  if not await track.part_recv(
308  timeout=track.stream_settings.hls_part_timeout
309  ):
310  return self.not_foundnot_found(blocking_request, track.target_duration)
311  # Now we should have msn.part >= hls_msn.hls_part. However, in the case
312  # that we have a rollover part request from the previous segment, we need
313  # to make sure that the new segment has a part. From 6.2.5.2 of the RFC:
314  # If the Client requests a Part Index greater than that of the final
315  # Partial Segment of the Parent Segment, the Server MUST treat the
316  # request as one for Part Index 0 of the following Parent Segment.
317  if hls_msn + 1 == last_segment.sequence:
318  if not (previous_segment := track.get_segment(hls_msn)) or (
319  hls_part >= len(previous_segment.parts)
320  and not last_segment.parts
321  and not await track.part_recv(
322  timeout=track.stream_settings.hls_part_timeout
323  )
324  ):
325  return self.not_foundnot_found(blocking_request, track.target_duration)
326 
327  response = web.Response(
328  body=self.renderrender(track).encode("utf-8"),
329  headers={
330  "Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER],
331  },
332  )
333  response.enable_compression(web.ContentCoding.gzip)
334  return response
335 
336 
338  """Stream view to serve HLS init.mp4."""
339 
340  url = r"/api/hls/{token:[a-f0-9]+}/init.mp4"
341  name = "api:stream:hls:init"
342  cors_allowed = True
343 
344  async def handle(
345  self, request: web.Request, stream: Stream, sequence: str, part_num: str
346  ) -> web.Response:
347  """Return init.mp4."""
348  track = stream.add_provider(HLS_PROVIDER)
349  if not (segments := track.get_segments()) or not (body := segments[0].init):
350  return web.HTTPNotFound()
351  return web.Response(
352  body=transform_init(body, stream.dynamic_stream_settings.orientation),
353  headers={"Content-Type": "video/mp4"},
354  )
355 
356 
358  """Stream view to serve a HLS fmp4 segment."""
359 
360  url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.{part_num:\d+}.m4s"
361  name = "api:stream:hls:part"
362  cors_allowed = True
363 
364  async def handle(
365  self, request: web.Request, stream: Stream, sequence: str, part_num: str
366  ) -> web.Response:
367  """Handle part."""
368  track: HlsStreamOutput = cast(
369  HlsStreamOutput, stream.add_provider(HLS_PROVIDER)
370  )
371  track.idle_timer.awake()
372  # Ensure that we have a segment. If the request is from a hint for part 0
373  # of a segment, there is a small chance it may have arrived before the
374  # segment has been put. If this happens, wait for one part and retry.
375  if not (
376  (segment := track.get_segment(int(sequence)))
377  or (
378  await track.part_recv(timeout=track.stream_settings.hls_part_timeout)
379  and (segment := track.get_segment(int(sequence)))
380  )
381  ):
382  return web.Response(
383  body=None,
384  status=HTTPStatus.NOT_FOUND,
385  )
386  # If the part is ready or has been hinted,
387  if int(part_num) == len(segment.parts):
388  await track.part_recv(timeout=track.stream_settings.hls_part_timeout)
389  if int(part_num) >= len(segment.parts):
390  return web.HTTPRequestRangeNotSatisfiable()
391  return web.Response(
392  body=segment.parts[int(part_num)].data,
393  headers={
394  "Content-Type": "video/iso.segment",
395  },
396  )
397 
398 
400  """Stream view to serve a HLS fmp4 segment."""
401 
402  url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s"
403  name = "api:stream:hls:segment"
404  cors_allowed = True
405 
406  async def handle(
407  self, request: web.Request, stream: Stream, sequence: str, part_num: str
408  ) -> web.StreamResponse:
409  """Handle segments."""
410  track: HlsStreamOutput = cast(
411  HlsStreamOutput, stream.add_provider(HLS_PROVIDER)
412  )
413  track.idle_timer.awake()
414  # Ensure that we have a segment. If the request is from a hint for part 0
415  # of a segment, there is a small chance it may have arrived before the
416  # segment has been put. If this happens, wait for one part and retry.
417  if not (
418  (segment := track.get_segment(int(sequence)))
419  or (
420  await track.part_recv(timeout=track.stream_settings.hls_part_timeout)
421  and (segment := track.get_segment(int(sequence)))
422  )
423  ):
424  return web.Response(
425  body=None,
426  status=HTTPStatus.NOT_FOUND,
427  )
428  return web.Response(
429  body=segment.get_data(),
430  headers={
431  "Content-Type": "video/iso.segment",
432  },
433  )
web.Response handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: hls.py:346
web.Response handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: hls.py:140
web.Response handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: hls.py:366
str render(cls, HlsStreamOutput track)
Definition: hls.py:167
web.Response not_found(bool blocking, float target_duration)
Definition: hls.py:244
web.Response handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: hls.py:253
web.Response bad_request(bool blocking, float target_duration)
Definition: hls.py:236
web.StreamResponse handle(self, web.Request request, Stream stream, str sequence, str part_num)
Definition: hls.py:408
None __init__(self, HomeAssistant hass, IdleTimer idle_timer, StreamSettings stream_settings, DynamicStreamSettings dynamic_stream_settings)
Definition: hls.py:57
None _async_put(self, Segment segment)
Definition: hls.py:84
str get_codec_string(bytes mp4_bytes)
Definition: fmp4utils.py:37
bytes transform_init(bytes init, Orientation orientation)
Definition: fmp4utils.py:201
str async_setup_hls(HomeAssistant hass)
Definition: hls.py:37