Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Image Upload integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import logging
7 import pathlib
8 import secrets
9 import shutil
10 from typing import Any
11 
12 from aiohttp import hdrs, web
13 from aiohttp.web_request import FileField
14 from PIL import Image, ImageOps, UnidentifiedImageError
15 import voluptuous as vol
16 
17 from homeassistant.components import websocket_api
18 from homeassistant.components.http import KEY_HASS, HomeAssistantView
19 from homeassistant.components.http.static import CACHE_HEADERS
20 from homeassistant.const import CONF_ID
21 from homeassistant.core import HomeAssistant, callback
22 from homeassistant.helpers import collection, config_validation as cv
23 from homeassistant.helpers.storage import Store
24 from homeassistant.helpers.typing import ConfigType, VolDictType
25 import homeassistant.util.dt as dt_util
26 
27 from .const import DOMAIN
28 
_LOGGER = logging.getLogger(__name__)

# Key and schema version handed to the Store backing the image collection.
STORAGE_KEY = "image"
STORAGE_VERSION = 1

# Thumbnail edge lengths that may be requested through the serve view.
VALID_SIZES = {256, 512}

# Upload payload limit in bytes (10 MiB), applied per request by the upload view.
MAX_SIZE = 10 * 1024 * 1024

# Fields accepted when an image is created: the uploaded multipart file.
CREATE_FIELDS: VolDictType = {
    vol.Required("file"): FileField,
}

# Fields accepted when an image is updated: only a non-empty name.
UPDATE_FIELDS: VolDictType = {
    vol.Optional("name"): vol.All(str, vol.Length(min=1)),
}

CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
44 
45 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Image integration.

    Loads the stored image collection, wires up its websocket handlers and
    registers the upload and serve HTTP views. Always returns True.
    """
    image_dir = pathlib.Path(hass.config.path("image"))
    hass.data[DOMAIN] = storage_collection = ImageStorageCollection(hass, image_dir)
    await storage_collection.async_load()

    # NOTE(review): the line opening this call was missing from the listing;
    # the websocket class defined in this module is assumed - confirm upstream.
    ImageUploadStorageCollectionWebsocket(
        storage_collection,
        "image",
        "image",
        CREATE_FIELDS,
        UPDATE_FIELDS,
    ).async_setup(hass)

    hass.http.register_view(ImageUploadView)
    hass.http.register_view(ImageServeView(image_dir, storage_collection))
    return True
62 
63 
class ImageStorageCollection(collection.DictStorageCollection):
    """Image collection stored in storage."""

    CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
    UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)

    def __init__(self, hass: HomeAssistant, image_dir: pathlib.Path) -> None:
        """Initialize media storage collection.

        image_dir is the folder under which every image gets its own
        sub-folder named after the image id.
        """
        super().__init__(
            Store(hass, STORAGE_VERSION, STORAGE_KEY),
        )
        # Clean up the files on disk whenever an item is removed.
        self.async_add_listener(self._change_listener)
        self.image_dir = image_dir

    async def _process_create_data(self, data: dict[str, Any]) -> dict[str, Any]:
        """Validate an upload, store it on disk and return the item data.

        The returned dict no longer contains the "file" entry; it carries the
        generated id plus filesize, content_type, name and uploaded_at.

        Raises vol.Invalid when the content type is not gif/jpeg/png or the
        file cannot be identified as an image.
        """
        data = self.CREATE_SCHEMA(dict(data))
        uploaded_file: FileField = data["file"]

        if uploaded_file.content_type not in (
            "image/gif",
            "image/jpeg",
            "image/png",
        ):
            raise vol.Invalid("Only jpeg, png, and gif images are allowed")

        data[CONF_ID] = secrets.token_hex(16)
        # File handling is blocking, so it runs in the executor.
        data["filesize"] = await self.hass.async_add_executor_job(self._move_data, data)

        data["content_type"] = uploaded_file.content_type
        data["name"] = uploaded_file.filename
        data["uploaded_at"] = dt_util.utcnow().isoformat()

        return data

    def _move_data(self, data: dict[str, Any]) -> int:
        """Copy the uploaded file to <image_dir>/<id>/original.

        Removes the "file" entry from data and returns the size in bytes of
        the stored file. Raises vol.Invalid when the upload cannot be
        identified as an image.
        """
        uploaded_file: FileField = data.pop("file")

        # Verify we can read the image
        try:
            image = Image.open(uploaded_file.file)
        except UnidentifiedImageError as err:
            raise vol.Invalid("Unable to identify image file") from err

        # Reset content
        uploaded_file.file.seek(0)

        media_folder: pathlib.Path = self.image_dir / data[CONF_ID]
        media_folder.mkdir(parents=True)

        media_file = media_folder / "original"

        # Raises if path is no longer relative to the media dir
        media_file.relative_to(media_folder)

        _LOGGER.debug("Storing file %s", media_file)

        with media_file.open("wb") as target:
            shutil.copyfileobj(uploaded_file.file, target)

        image.close()

        return media_file.stat().st_size

    @callback
    def _get_suggested_id(self, info: dict[str, Any]) -> str:
        """Suggest an ID based on the config."""
        return str(info[CONF_ID])

    async def _update_data(
        self,
        item: dict[str, Any],
        update_data: dict[str, Any],
    ) -> dict[str, Any]:
        """Return a new item dict with the validated update applied on top."""
        return {**item, **self.UPDATE_SCHEMA(update_data)}

    async def _change_listener(
        self,
        change_type: str,
        item_id: str,
        data: dict[str, Any],
    ) -> None:
        """Delete the image folder from disk when an item is removed."""
        if change_type != collection.CHANGE_REMOVED:
            return

        await self.hass.async_add_executor_job(shutil.rmtree, self.image_dir / item_id)
153 
154 
class ImageUploadStorageCollectionWebsocket(collection.DictStorageCollectionWebsocket):
    """Websocket management interface for the image storage collection."""

    async def ws_create_item(
        self,
        hass: HomeAssistant,
        connection: websocket_api.ActiveConnection,
        msg: dict,
    ) -> None:
        """Refuse item creation over websocket.

        Not supported, images are uploaded via the ImageUploadView.
        """
        raise NotImplementedError
166 
167 
class ImageUploadView(HomeAssistantView):
    """HTTP view that accepts image uploads."""

    url = "/api/image/upload"
    name = "api:image:upload"

    async def post(self, request: web.Request) -> web.Response:
        """Create an image item from the posted form data and return it as JSON."""
        # Raise the payload limit for this request to MAX_SIZE
        request._client_max_size = MAX_SIZE  # noqa: SLF001

        form_data = await request.post()
        image_collection = request.app[KEY_HASS].data[DOMAIN]
        created = await image_collection.async_create_item(form_data)
        return self.json(created)
182 
183 
class ImageServeView(HomeAssistantView):
    """View to download images."""

    url = "/api/image/serve/{image_id}/{filename}"
    name = "api:image:serve"
    requires_auth = False

    def __init__(
        self,
        image_folder: pathlib.Path,
        image_collection: ImageStorageCollection,
    ) -> None:
        """Initialize image serve view.

        image_folder is the directory holding one sub-folder per image id;
        image_collection is consulted to check that a requested id exists.
        """
        # Serializes thumbnail generation across concurrent requests.
        self.transform_lock = asyncio.Lock()
        self.image_folder = image_folder
        self.image_collection = image_collection

    async def get(
        self,
        request: web.Request,
        image_id: str,
        filename: str,
    ) -> web.FileResponse:
        """Serve the original image or a thumbnail of it.

        filename is either "original" or starts with WIDTHxHEIGHT followed by
        "-"; a missing thumbnail is generated before it is served.

        Raises HTTPNotFound for an unknown image_id and HTTPBadRequest when
        the requested size cannot be parsed or is not allowed.
        """
        image_info = self.image_collection.data.get(image_id)
        if image_info is None:
            raise web.HTTPNotFound

        if filename == "original":
            target_file = self.image_folder / image_id / filename
        else:
            try:
                width, height = _validate_size_from_filename(filename)
            except (ValueError, IndexError) as err:
                raise web.HTTPBadRequest from err

            hass = request.app[KEY_HASS]
            target_file = self.image_folder / image_id / f"{width}x{height}"

            if not await hass.async_add_executor_job(target_file.is_file):
                async with self.transform_lock:
                    # Another check in case another request already
                    # finished it while waiting
                    await hass.async_add_executor_job(
                        _generate_thumbnail_if_file_does_not_exist,
                        target_file,
                        self.image_folder / image_id / "original",
                        image_info["content_type"],
                        target_file,
                        (width, height),
                    )

        return web.FileResponse(
            target_file,
            headers={**CACHE_HEADERS, hdrs.CONTENT_TYPE: image_info["content_type"]},
        )
240 
241 
def _generate_thumbnail_if_file_does_not_exist(
    target_file: pathlib.Path,
    original_path: pathlib.Path,
    content_type: str,
    target_path: pathlib.Path,
    target_size: tuple[int, int],
) -> None:
    """Generate a thumbnail of the original image unless it already exists.

    Does nothing when target_file is already a file. Otherwise the image at
    original_path is opened, transposed according to its EXIF data, shrunk
    with Image.thumbnail to target_size and saved to target_path. The save
    format is the part of content_type after the "/".
    """
    if target_file.is_file():
        return

    # The context manager releases the source file handle once the thumbnail
    # has been written, even when saving fails.
    with Image.open(original_path) as original:
        image = ImageOps.exif_transpose(original)
        image.thumbnail(target_size)
        image.save(target_path, format=content_type.partition("/")[-1])
254 
255 
def _validate_size_from_filename(filename: str) -> tuple[int, int]:
    """Parse image size from the given filename (of the form WIDTHxHEIGHT-filename).

    Only square sizes whose edge length is listed in VALID_SIZES are accepted.

    Raises ValueError when the size prefix is empty, is not two integers
    separated by "x", is not square, or is not an allowed size.

    >>> _validate_size_from_filename("256x256-image.png")
    (256, 256)
    >>> _validate_size_from_filename("jeff.png")
    Traceback (most recent call last):
    ...
    ValueError: invalid literal for int() with base 10: 'jeff.png'
    """
    # Everything before the first "-" is the size prefix.
    image_size = filename.partition("-")[0]
    if not image_size:
        raise ValueError("Invalid filename")
    width_s, _, height_s = image_size.partition("x")
    # int() raises ValueError for a missing or non-numeric component.
    width = int(width_s)
    height = int(height_s)
    if not width or width != height or width not in VALID_SIZES:
        raise ValueError(f"Invalid size {image_size}")
    return (width, height)
web.FileResponse get(self, web.Request request, str image_id, str filename)
Definition: __init__.py:206
None __init__(self, pathlib.Path image_folder, ImageStorageCollection image_collection)
Definition: __init__.py:195
dict[str, Any] _update_data(self, dict[str, Any] item, dict[str, Any] update_data)
Definition: __init__.py:138
None _change_listener(self, str change_type, str item_id, dict[str, Any] data)
Definition: __init__.py:147
dict[str, Any] _process_create_data(self, dict[str, Any] data)
Definition: __init__.py:78
None __init__(self, HomeAssistant hass, pathlib.Path image_dir)
Definition: __init__.py:70
None ws_create_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: __init__.py:160
web.Response post(self, web.Request request)
Definition: __init__.py:174
tuple[int, int] _validate_size_from_filename(str filename)
Definition: __init__.py:256
None _generate_thumbnail_if_file_does_not_exist(pathlib.Path target_file, pathlib.Path original_path, str content_type, pathlib.Path target_path, tuple[int, int] target_size)
Definition: __init__.py:248
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:46
_ItemT async_create_item(self, dict data)
Definition: collection.py:314