Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The File Upload integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Iterator
7 from contextlib import contextmanager
8 from dataclasses import dataclass
9 from pathlib import Path
10 from queue import SimpleQueue
11 import shutil
12 import tempfile
13 
14 from aiohttp import BodyPartReader, web
15 import voluptuous as vol
16 
17 from homeassistant.components.http import KEY_HASS, HomeAssistantView
18 from homeassistant.components.http.data_validator import RequestDataValidator
19 from homeassistant.const import EVENT_HOMEASSISTANT_STOP
20 from homeassistant.core import Event, HomeAssistant, callback
21 from homeassistant.helpers import config_validation as cv
22 from homeassistant.helpers.typing import ConfigType
23 from homeassistant.util import raise_if_invalid_filename
24 from homeassistant.util.ulid import ulid_hex
25 
26 DOMAIN = "file_upload"
27 
28 ONE_MEGABYTE = 1024 * 1024
29 MAX_SIZE = 100 * ONE_MEGABYTE
30 TEMP_DIR_NAME = f"home-assistant-{DOMAIN}"
31 
32 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
33 
34 
35 @contextmanager
36 def process_uploaded_file(hass: HomeAssistant, file_id: str) -> Iterator[Path]:
37  """Get an uploaded file.
38 
39  File is removed at the end of the context.
40  """
41  if DOMAIN not in hass.data:
42  raise ValueError("File does not exist")
43 
44  file_upload_data: FileUploadData = hass.data[DOMAIN]
45 
46  if not file_upload_data.has_file(file_id):
47  raise ValueError("File does not exist")
48 
49  try:
50  yield file_upload_data.file_path(file_id)
51  finally:
52  file_upload_data.files.pop(file_id)
53  shutil.rmtree(file_upload_data.file_dir(file_id))
54 
55 
56 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
57  """Set up File Upload."""
58  hass.http.register_view(FileUploadView)
59  return True
60 
61 
62 @dataclass(frozen=True)
64  """File upload data."""
65 
66  temp_dir: Path
67  files: dict[str, str]
68 
69  @classmethod
70  async def create(cls, hass: HomeAssistant) -> FileUploadData:
71  """Initialize the file upload data."""
72 
73  def _create_temp_dir() -> Path:
74  """Create temporary directory."""
75  temp_dir = Path(tempfile.gettempdir()) / TEMP_DIR_NAME
76 
77  # If it exists, it's an old one and Home Assistant didn't shut down correctly.
78  if temp_dir.exists():
79  shutil.rmtree(temp_dir)
80 
81  temp_dir.mkdir(0o700)
82  return temp_dir
83 
84  temp_dir = await hass.async_add_executor_job(_create_temp_dir)
85 
86  def cleanup_unused_files(ev: Event) -> None:
87  """Clean up unused files."""
88  shutil.rmtree(temp_dir)
89 
90  hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_unused_files)
91 
92  return cls(temp_dir, {})
93 
94  def has_file(self, file_id: str) -> bool:
95  """Return if file exists."""
96  return file_id in self.files
97 
98  def file_dir(self, file_id: str) -> Path:
99  """Return the file directory."""
100  return self.temp_dir / file_id
101 
102  def file_path(self, file_id: str) -> Path:
103  """Return the file path."""
104  return self.file_dir(file_id) / self.files[file_id]
105 
106 
107 class FileUploadView(HomeAssistantView):
108  """HTTP View to upload files."""
109 
110  url = "/api/file_upload"
111  name = "api:file_upload"
112 
113  _upload_lock: asyncio.Lock | None = None
114 
115  @callback
116  def _get_upload_lock(self) -> asyncio.Lock:
117  """Get upload lock."""
118  if self._upload_lock_upload_lock is None:
119  self._upload_lock_upload_lock = asyncio.Lock()
120 
121  return self._upload_lock_upload_lock
122 
123  async def post(self, request: web.Request) -> web.Response:
124  """Upload a file."""
125  async with self._get_upload_lock_get_upload_lock():
126  return await self._upload_file_upload_file(request)
127 
128  async def _upload_file(self, request: web.Request) -> web.Response:
129  """Handle uploaded file."""
130  # Increase max payload
131  request._client_max_size = MAX_SIZE # noqa: SLF001
132 
133  reader = await request.multipart()
134  file_field_reader = await reader.next()
135  filename: str | None
136 
137  if (
138  not isinstance(file_field_reader, BodyPartReader)
139  or file_field_reader.name != "file"
140  or (filename := file_field_reader.filename) is None
141  ):
142  raise vol.Invalid("Expected a file")
143 
144  try:
145  raise_if_invalid_filename(filename)
146  except ValueError as err:
147  raise web.HTTPBadRequest from err
148 
149  hass = request.app[KEY_HASS]
150  file_id = ulid_hex()
151 
152  if DOMAIN not in hass.data:
153  hass.data[DOMAIN] = await FileUploadData.create(hass)
154 
155  file_upload_data: FileUploadData = hass.data[DOMAIN]
156  file_dir = file_upload_data.file_dir(file_id)
157  queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
158  SimpleQueue()
159  )
160 
161  def _sync_queue_consumer() -> None:
162  file_dir.mkdir()
163  with (file_dir / filename).open("wb") as file_handle:
164  while True:
165  if (_chunk_future := queue.get()) is None:
166  break
167  _chunk, _future = _chunk_future
168  if _future is not None:
169  hass.loop.call_soon_threadsafe(_future.set_result, None)
170  file_handle.write(_chunk)
171 
172  fut: asyncio.Future[None] | None = None
173  try:
174  fut = hass.async_add_executor_job(_sync_queue_consumer)
175  megabytes_sending = 0
176  while chunk := await file_field_reader.read_chunk(ONE_MEGABYTE):
177  megabytes_sending += 1
178  if megabytes_sending % 5 != 0:
179  queue.put_nowait((chunk, None))
180  continue
181 
182  chunk_future = hass.loop.create_future()
183  queue.put_nowait((chunk, chunk_future))
184  await asyncio.wait(
185  (fut, chunk_future), return_when=asyncio.FIRST_COMPLETED
186  )
187  if fut.done():
188  # The executor job failed
189  break
190 
191  queue.put_nowait(None) # terminate queue consumer
192  finally:
193  if fut is not None:
194  await fut
195 
196  file_upload_data.files[file_id] = filename
197 
198  return self.json({"file_id": file_id})
199 
200  @RequestDataValidator({vol.Required("file_id"): str})
201  async def delete(self, request: web.Request, data: dict[str, str]) -> web.Response:
202  """Delete a file."""
203  hass = request.app[KEY_HASS]
204 
205  if DOMAIN not in hass.data:
206  raise web.HTTPNotFound
207 
208  file_id = data["file_id"]
209  file_upload_data: FileUploadData = hass.data[DOMAIN]
210 
211  if file_upload_data.files.pop(file_id, None) is None:
212  raise web.HTTPNotFound
213 
214  await hass.async_add_executor_job(
215  lambda: shutil.rmtree(file_upload_data.file_dir(file_id))
216  )
217 
218  return self.json_message("File deleted")
FileUploadData create(cls, HomeAssistant hass)
Definition: __init__.py:70
web.Response post(self, web.Request request)
Definition: __init__.py:123
web.Response _upload_file(self, web.Request request)
Definition: __init__.py:128
web.Response delete(self, web.Request request, dict[str, str] data)
Definition: __init__.py:201
Iterator[Path] process_uploaded_file(HomeAssistant hass, str file_id)
Definition: __init__.py:36
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:56
None open(self, **Any kwargs)
Definition: lock.py:86
None raise_if_invalid_filename(str filename)
Definition: __init__.py:23