1 """The File Upload integration."""
3 from __future__
import annotations
6 from collections.abc
import Iterator
7 from contextlib
import contextmanager
8 from dataclasses
import dataclass
9 from pathlib
import Path
10 from queue
import SimpleQueue
14 from aiohttp
import BodyPartReader, web
15 import voluptuous
as vol
26 DOMAIN =
"file_upload"
28 ONE_MEGABYTE = 1024 * 1024
29 MAX_SIZE = 100 * ONE_MEGABYTE
30 TEMP_DIR_NAME = f
"home-assistant-{DOMAIN}"
32 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
37 """Get an uploaded file.
39 File is removed at the end of the context.
41 if DOMAIN
not in hass.data:
42 raise ValueError(
"File does not exist")
44 file_upload_data: FileUploadData = hass.data[DOMAIN]
46 if not file_upload_data.has_file(file_id):
47 raise ValueError(
"File does not exist")
50 yield file_upload_data.file_path(file_id)
52 file_upload_data.files.pop(file_id)
53 shutil.rmtree(file_upload_data.file_dir(file_id))
56 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
57 """Set up File Upload."""
58 hass.http.register_view(FileUploadView)
62 @dataclass(frozen=True)
64 """File upload data."""
70 async
def create(cls, hass: HomeAssistant) -> FileUploadData:
71 """Initialize the file upload data."""
73 def _create_temp_dir() -> Path:
74 """Create temporary directory."""
75 temp_dir = Path(tempfile.gettempdir()) / TEMP_DIR_NAME
79 shutil.rmtree(temp_dir)
84 temp_dir = await hass.async_add_executor_job(_create_temp_dir)
86 def cleanup_unused_files(ev: Event) ->
None:
87 """Clean up unused files."""
88 shutil.rmtree(temp_dir)
90 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_unused_files)
92 return cls(temp_dir, {})
94 def has_file(self, file_id: str) -> bool:
95 """Return if file exists."""
96 return file_id
in self.files
98 def file_dir(self, file_id: str) -> Path:
99 """Return the file directory."""
100 return self.temp_dir / file_id
102 def file_path(self, file_id: str) -> Path:
103 """Return the file path."""
104 return self.file_dir(file_id) / self.files[file_id]
108 """HTTP View to upload files."""
110 url =
"/api/file_upload"
111 name =
"api:file_upload"
113 _upload_lock: asyncio.Lock |
None =
None
117 """Get upload lock."""
123 async
def post(self, request: web.Request) -> web.Response:
129 """Handle uploaded file."""
131 request._client_max_size = MAX_SIZE
133 reader = await request.multipart()
134 file_field_reader = await reader.next()
138 not isinstance(file_field_reader, BodyPartReader)
139 or file_field_reader.name !=
"file"
140 or (filename := file_field_reader.filename)
is None
142 raise vol.Invalid(
"Expected a file")
146 except ValueError
as err:
147 raise web.HTTPBadRequest
from err
149 hass = request.app[KEY_HASS]
152 if DOMAIN
not in hass.data:
153 hass.data[DOMAIN] = await FileUploadData.create(hass)
155 file_upload_data: FileUploadData = hass.data[DOMAIN]
156 file_dir = file_upload_data.file_dir(file_id)
157 queue: SimpleQueue[tuple[bytes, asyncio.Future[
None] |
None] |
None] = (
161 def _sync_queue_consumer() -> None:
163 with (file_dir / filename).
open(
"wb")
as file_handle:
165 if (_chunk_future := queue.get())
is None:
167 _chunk, _future = _chunk_future
168 if _future
is not None:
169 hass.loop.call_soon_threadsafe(_future.set_result,
None)
170 file_handle.write(_chunk)
172 fut: asyncio.Future[
None] |
None =
None
174 fut = hass.async_add_executor_job(_sync_queue_consumer)
175 megabytes_sending = 0
176 while chunk := await file_field_reader.read_chunk(ONE_MEGABYTE):
177 megabytes_sending += 1
178 if megabytes_sending % 5 != 0:
179 queue.put_nowait((chunk,
None))
182 chunk_future = hass.loop.create_future()
183 queue.put_nowait((chunk, chunk_future))
185 (fut, chunk_future), return_when=asyncio.FIRST_COMPLETED
191 queue.put_nowait(
None)
196 file_upload_data.files[file_id] = filename
198 return self.json({
"file_id": file_id})
200 @RequestDataValidator({vol.Required("file_id"): str})
201 async
def delete(self, request: web.Request, data: dict[str, str]) -> web.Response:
203 hass = request.app[KEY_HASS]
205 if DOMAIN
not in hass.data:
206 raise web.HTTPNotFound
208 file_id = data[
"file_id"]
209 file_upload_data: FileUploadData = hass.data[DOMAIN]
211 if file_upload_data.files.pop(file_id,
None)
is None:
212 raise web.HTTPNotFound
214 await hass.async_add_executor_job(
215 lambda: shutil.rmtree(file_upload_data.file_dir(file_id))
218 return self.json_message(
"File deleted")
FileUploadData create(cls, HomeAssistant hass)
web.Response post(self, web.Request request)
web.Response _upload_file(self, web.Request request)
web.Response delete(self, web.Request request, dict[str, str] data)
asyncio.Lock _get_upload_lock(self)
Iterator[Path] process_uploaded_file(HomeAssistant hass, str file_id)
bool async_setup(HomeAssistant hass, ConfigType config)
None open(self, **Any kwargs)
None raise_if_invalid_filename(str filename)