Home Assistant Unofficial Reference 2024.12.1
ffmpeg_proxy.py
Go to the documentation of this file.
1 """HTTP view that converts audio from a URL to a preferred format."""
2 
3 import asyncio
4 from collections import defaultdict
5 from dataclasses import dataclass, field
6 from http import HTTPStatus
7 import logging
8 import secrets
9 from typing import Final
10 
11 from aiohttp import web
12 from aiohttp.abc import AbstractStreamWriter, BaseRequest
13 
14 from homeassistant.components.ffmpeg import FFmpegManager
15 from homeassistant.components.http import HomeAssistantView
16 from homeassistant.core import HomeAssistant
17 
18 from .const import DATA_FFMPEG_PROXY
19 
_LOGGER = logging.getLogger(__name__)

# Maximum number of unfinished conversions kept per device. Creating a new
# proxy URL once this many exist evicts the oldest one (killing its ffmpeg
# process if it is still running).
_MAX_CONVERSIONS_PER_DEVICE: Final[int] = 2
23 
24 
def async_create_proxy_url(
    hass: HomeAssistant,
    device_id: str,
    media_url: str,
    media_format: str,
    rate: int | None = None,
    channels: int | None = None,
    width: int | None = None,
) -> str:
    """Create a proxy URL that converts the media for a device.

    Fetches the shared ``FFmpegProxyData`` stored in ``hass.data`` under
    ``DATA_FFMPEG_PROXY`` and delegates to its ``async_create_proxy_url``.

    Args:
        hass: Home Assistant instance holding the proxy data.
        device_id: Device the URL is created for.
        media_url: Source URL of the media to convert.
        media_format: Target ffmpeg output format (mp3, flac, ...).
        rate: Target sample rate, or None to keep the source rate.
        channels: Target channel count, or None to keep the source channels.
        width: Target sample width in bytes, or None to keep the source width.

    Returns:
        The relative proxy URL path for the new conversion.

    """
    proxy_data: FFmpegProxyData = hass.data[DATA_FFMPEG_PROXY]
    proxy_url = proxy_data.async_create_proxy_url(
        device_id=device_id,
        media_url=media_url,
        media_format=media_format,
        rate=rate,
        channels=channels,
        width=width,
    )
    return proxy_url
39 
40 
41 @dataclass
43  """Information for ffmpeg conversion."""
44 
45  convert_id: str
46  """Unique id for media conversion."""
47 
48  media_url: str
49  """Source URL of media to convert."""
50 
51  media_format: str
52  """Target format for media (mp3, flac, etc.)"""
53 
54  rate: int | None
55  """Target sample rate (None to keep source rate)."""
56 
57  channels: int | None
58  """Target number of channels (None to keep source channels)."""
59 
60  width: int | None
61  """Target sample width in bytes (None to keep source width)."""
62 
63  proc: asyncio.subprocess.Process | None = None
64  """Subprocess doing ffmpeg conversion."""
65 
66  is_finished: bool = False
67  """True if conversion has finished."""
68 
69 
@dataclass
class FFmpegProxyData:
    """Data for ffmpeg proxy conversion.

    Tracks, per device, the conversions that have been handed out as proxy
    URLs.
    """

    # device_id -> conversions created for that device, oldest first.
    conversions: dict[str, list[FFmpegConversionInfo]] = field(
        default_factory=lambda: defaultdict(list)
    )

    def async_create_proxy_url(
        self,
        device_id: str,
        media_url: str,
        media_format: str,
        rate: int | None,
        channels: int | None,
        width: int | None,
    ) -> str:
        """Register a new conversion for a device and return its proxy URL.

        Conversions already marked finished are dropped first. While the
        device still has ``_MAX_CONVERSIONS_PER_DEVICE`` or more conversions,
        the oldest one is evicted; its ffmpeg process is killed if it is still
        running. The new conversion is then appended with a random id.

        Returns:
            Relative URL of the form
            ``/api/esphome/ffmpeg_proxy/{device_id}/{convert_id}.{media_format}``.

        """
        # Build a fresh list containing only unfinished conversions.
        active = [conv for conv in self.conversions[device_id] if not conv.is_finished]

        # Evict from the front (oldest) until there is room for one more.
        while len(active) >= _MAX_CONVERSIONS_PER_DEVICE:
            oldest = active.pop(0)
            running_proc = oldest.proc
            if running_proc is not None and running_proc.returncode is None:
                _LOGGER.debug(
                    "Stopping existing ffmpeg process for device: %s", device_id
                )
                running_proc.kill()

        new_id = secrets.token_urlsafe(16)
        new_conversion = FFmpegConversionInfo(
            new_id, media_url, media_format, rate, channels, width
        )
        active.append(new_conversion)
        _LOGGER.debug("Media URL allowed by proxy: %s", media_url)

        self.conversions[device_id] = active

        return f"/api/esphome/ffmpeg_proxy/{device_id}/{new_id}.{media_format}"
119 
120 
class FFmpegConvertResponse(web.StreamResponse):
    """HTTP streaming response that uses ffmpeg to convert audio from a URL."""

    def __init__(
        self,
        manager: FFmpegManager,
        convert_info: FFmpegConversionInfo,
        device_id: str,
        proxy_data: FFmpegProxyData,
        chunk_size: int = 2048,
    ) -> None:
        """Initialize response.

        Parameters
        ----------
        manager: FFmpegManager
            ffmpeg manager
        convert_info: FFmpegConversionInfo
            Information necessary to do the conversion
        device_id: str
            ESPHome device id
        proxy_data: FFmpegProxyData
            Data object to store ffmpeg process
        chunk_size: int
            Number of bytes to read from ffmpeg process at a time

        """
        super().__init__(status=200)
        self.hass = manager.hass
        self.manager = manager
        self.convert_info = convert_info
        self.device_id = device_id
        self.proxy_data = proxy_data
        self.chunk_size = chunk_size

    async def transcode(
        self, request: BaseRequest, writer: AbstractStreamWriter
    ) -> None:
        """Stream url through ffmpeg conversion and out to HTTP client.

        Spawns ffmpeg reading ``convert_info.media_url`` and writing the
        converted stream to stdout, then pumps that output to the client
        until ffmpeg ends its output, the client transport closes, or Home
        Assistant stops running.
        """
        command_args = [
            "-i",
            self.convert_info.media_url,
            "-f",
            self.convert_info.media_format,
        ]

        if self.convert_info.rate is not None:
            # Sample rate
            command_args.extend(["-ar", str(self.convert_info.rate)])

        if self.convert_info.channels is not None:
            # Number of channels
            command_args.extend(["-ac", str(self.convert_info.channels)])

        if self.convert_info.width == 2:
            # 16-bit samples (other widths are left to ffmpeg's default)
            command_args.extend(["-sample_fmt", "s16"])

        # Remove metadata and cover art
        command_args.extend(["-map_metadata", "-1", "-vn"])

        # disable progress stats on stderr
        command_args.append("-nostats")

        # Output to stdout
        command_args.append("pipe:")

        _LOGGER.debug("%s %s", self.manager.binary, " ".join(command_args))
        proc = await asyncio.create_subprocess_exec(
            self.manager.binary,
            *command_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            close_fds=False,  # use posix_spawn in CPython < 3.13
        )

        # Remember the process on the conversion so it can be killed if the
        # proxy URL is reused or the conversion is evicted for a newer one.
        self.convert_info.proc = proc

        # Create background task which will be cancelled when home assistant shuts down
        write_task = self.hass.async_create_background_task(
            self._write_ffmpeg_data(request, writer, proc), "ESPHome media proxy"
        )
        await write_task

    async def _write_ffmpeg_data(
        self,
        request: BaseRequest,
        writer: AbstractStreamWriter,
        proc: asyncio.subprocess.Process,
    ) -> None:
        """Copy ffmpeg's stdout to the HTTP client and clean up afterwards.

        Always marks the conversion finished, stops the stderr dump task and
        kills ffmpeg if it is still running, whatever way the copy ends.
        """
        assert proc.stdout is not None
        assert proc.stderr is not None

        stderr_task = self.hass.async_create_background_task(
            self._dump_ffmpeg_stderr(proc), "ESPHome media proxy dump stderr"
        )

        try:
            # Pull audio chunks from ffmpeg and pass them to the HTTP client
            while (
                self.hass.is_running
                and (request.transport is not None)
                and (not request.transport.is_closing())
                and (chunk := await proc.stdout.read(self.chunk_size))
            ):
                await self.write(chunk)
        except asyncio.CancelledError:
            _LOGGER.debug("ffmpeg transcoding cancelled")
            # Abort the transport, we don't wait for ESPHome to drain the write buffer;
            # it may need a very long time or never finish if the player is paused.
            if request.transport:
                request.transport.abort()
            raise  # don't log error
        except Exception:
            _LOGGER.exception("Unexpected error during ffmpeg conversion")
            raise
        finally:
            # Allow conversion info to be removed
            self.convert_info.is_finished = True

            # stop dumping ffmpeg stderr task
            stderr_task.cancel()

            # Terminate hangs, so kill is used
            if proc.returncode is None:
                proc.kill()

            # Close connection by writing EOF unless already closing
            if request.transport and not request.transport.is_closing():
                await writer.write_eof()

    async def _dump_ffmpeg_stderr(
        self,
        proc: asyncio.subprocess.Process,
    ) -> None:
        """Log ffmpeg's stderr line by line at debug level.

        Reading continuously also keeps the stderr pipe drained while the
        conversion runs.
        """
        assert proc.stderr is not None

        while self.hass.is_running and (chunk := await proc.stderr.readline()):
            # ffmpeg's stderr is not guaranteed to be valid UTF-8; a strict
            # decode would raise and end this drain loop early.
            _LOGGER.debug(
                "ffmpeg[%s] output: %s",
                proc.pid,
                chunk.decode(errors="replace").rstrip(),
            )
262 
263 
class FFmpegProxyView(HomeAssistantView):
    """FFmpeg web view to convert audio and stream back to client."""

    requires_auth = False
    url = "/api/esphome/ffmpeg_proxy/{device_id}/{filename}"
    name = "api:esphome:ffmpeg_proxy"

    def __init__(self, manager: FFmpegManager, proxy_data: FFmpegProxyData) -> None:
        """Initialize an ffmpeg view."""
        self.manager = manager
        self.proxy_data = proxy_data

    async def get(
        self, request: web.Request, device_id: str, filename: str
    ) -> web.StreamResponse:
        """Start a get request.

        ``filename`` is expected to be ``{convert_id}.{media_format}`` as
        produced by ``FFmpegProxyData.async_create_proxy_url``.

        Returns 404 if the device has no registered conversions, 400 if the
        filename is malformed or matches no conversion, and otherwise streams
        the converted audio back to the client.
        """
        # Use .get() instead of indexing: ``conversions`` is a defaultdict and
        # this view does not require auth, so indexing would insert an empty
        # entry for every arbitrary device_id a client asks for.
        device_conversions = self.proxy_data.conversions.get(device_id)
        if not device_conversions:
            return web.Response(
                body="No proxy URL for device", status=HTTPStatus.NOT_FOUND
            )

        # {id}.mp3 -> id, mp3 (split on the last dot only; a filename without
        # a dot is rejected instead of raising ValueError -> HTTP 500)
        convert_id, separator, media_format = filename.rpartition(".")
        if not separator:
            return web.Response(body="Invalid proxy URL", status=HTTPStatus.BAD_REQUEST)

        # Look up conversion info
        convert_info: FFmpegConversionInfo | None = None
        for maybe_convert_info in device_conversions:
            if (maybe_convert_info.convert_id == convert_id) and (
                maybe_convert_info.media_format == media_format
            ):
                convert_info = maybe_convert_info
                break

        if convert_info is None:
            return web.Response(body="Invalid proxy URL", status=HTTPStatus.BAD_REQUEST)

        # Stop previous process if the URL is being reused.
        # We could continue from where the previous connection left off, but
        # there would be no media header.
        if (convert_info.proc is not None) and (convert_info.proc.returncode is None):
            convert_info.proc.kill()
            convert_info.proc = None

        # Stream converted audio back to client
        resp = FFmpegConvertResponse(
            self.manager, convert_info, device_id, self.proxy_data
        )
        writer = await resp.prepare(request)
        assert writer is not None
        await resp.transcode(request, writer)
        return resp
None __init__(self, FFmpegManager manager, FFmpegConversionInfo convert_info, str device_id, FFmpegProxyData proxy_data, int chunk_size=2048)
None transcode(self, BaseRequest request, AbstractStreamWriter writer)
None _dump_ffmpeg_stderr(self, asyncio.subprocess.Process proc)
None _write_ffmpeg_data(self, BaseRequest request, AbstractStreamWriter writer, asyncio.subprocess.Process proc)
str async_create_proxy_url(self, str device_id, str media_url, str media_format, int|None rate, int|None channels, int|None width)
Definition: ffmpeg_proxy.py:87
web.StreamResponse get(self, web.Request request, str device_id, str filename)
None __init__(self, FFmpegManager manager, FFmpegProxyData proxy_data)
str async_create_proxy_url(HomeAssistant hass, str device_id, str media_url, str media_format, int|None rate=None, int|None channels=None, int|None width=None)
Definition: ffmpeg_proxy.py:33