1 """The Image Upload integration."""
3 from __future__
import annotations
10 from typing
import Any
12 from aiohttp
import hdrs, web
13 from aiohttp.web_request
import FileField
14 from PIL
import Image, ImageOps, UnidentifiedImageError
15 import voluptuous
as vol
27 from .const
import DOMAIN
29 _LOGGER = logging.getLogger(__name__)
# Edge lengths (in pixels) that the thumbnail size validator accepts; a
# requested size must be square and use one of these values.
VALID_SIZES = {256, 512}

# Upper bound, in bytes, applied to an upload request body: 10 MiB.
MAX_SIZE = 10 * 1024**2
35 CREATE_FIELDS: VolDictType = {
36 vol.Required(
"file"): FileField,
39 UPDATE_FIELDS: VolDictType = {
40 vol.Optional(
"name"): vol.All(str, vol.Length(min=1)),
43 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
46 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
47 """Set up the Image integration."""
48 image_dir = pathlib.Path(hass.config.path(
"image"))
50 await storage_collection.async_load()
59 hass.http.register_view(ImageUploadView)
60 hass.http.register_view(
ImageServeView(image_dir, storage_collection))
65 """Image collection stored in storage."""
67 CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
68 UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)
70 def __init__(self, hass: HomeAssistant, image_dir: pathlib.Path) ->
None:
71 """Initialize media storage collection."""
73 Store(hass, STORAGE_VERSION, STORAGE_KEY),
79 """Validate the config is valid."""
81 uploaded_file: FileField = data[
"file"]
83 if uploaded_file.content_type
not in (
88 raise vol.Invalid(
"Only jpeg, png, and gif images are allowed")
90 data[CONF_ID] = secrets.token_hex(16)
91 data[
"filesize"] = await self.hass.async_add_executor_job(self.
_move_data_move_data, data)
93 data[
"content_type"] = uploaded_file.content_type
94 data[
"name"] = uploaded_file.filename
95 data[
"uploaded_at"] = dt_util.utcnow().isoformat()
101 uploaded_file: FileField = data.pop(
"file")
105 image = Image.open(uploaded_file.file)
106 except UnidentifiedImageError
as err:
107 raise vol.Invalid(
"Unable to identify image file")
from err
110 uploaded_file.file.seek(0)
112 media_folder: pathlib.Path = self.
image_dirimage_dir / data[CONF_ID]
113 media_folder.mkdir(parents=
True)
115 media_file = media_folder /
"original"
118 media_file.relative_to(media_folder)
120 _LOGGER.debug(
"Storing file %s", media_file)
122 with media_file.open(
"wb")
as target:
123 shutil.copyfileobj(uploaded_file.file, target)
127 return media_file.stat().st_size
131 """Suggest an ID based on the config."""
132 return str(info[CONF_ID])
136 item: dict[str, Any],
137 update_data: dict[str, Any],
139 """Return a new updated data object."""
140 return {**item, **self.
UPDATE_SCHEMAUPDATE_SCHEMA(update_data)}
146 data: dict[str, Any],
149 if change_type != collection.CHANGE_REMOVED:
152 await self.hass.async_add_executor_job(shutil.rmtree, self.
image_dirimage_dir / item_id)
156 """Class to expose storage collection management over websocket."""
163 Not supported, images are uploaded via the ImageUploadView.
165 raise NotImplementedError
169 """View to upload images."""
171 url =
"/api/image/upload"
172 name =
"api:image:upload"
174 async
def post(self, request: web.Request) -> web.Response:
177 request._client_max_size = MAX_SIZE
179 data = await request.post()
181 return self.json(item)
185 """View to download images."""
187 url =
"/api/image/serve/{image_id}/{filename}"
188 name =
"api:image:serve"
189 requires_auth =
False
193 image_folder: pathlib.Path,
194 image_collection: ImageStorageCollection,
196 """Initialize image serve view."""
203 request: web.Request,
206 ) -> web.FileResponse:
209 if image_info
is None:
210 raise web.HTTPNotFound
212 if filename ==
"original":
213 target_file = self.
image_folderimage_folder / image_id / filename
217 except (ValueError, IndexError)
as err:
218 raise web.HTTPBadRequest
from err
220 hass = request.app[KEY_HASS]
221 target_file = self.
image_folderimage_folder / image_id / f
"{width}x{height}"
223 if not await hass.async_add_executor_job(target_file.is_file):
227 await hass.async_add_executor_job(
228 _generate_thumbnail_if_file_does_not_exist,
231 image_info[
"content_type"],
236 return web.FileResponse(
238 headers={**CACHE_HEADERS, hdrs.CONTENT_TYPE: image_info[
"content_type"]},
def _generate_thumbnail_if_file_does_not_exist(
    target_file: pathlib.Path,
    original_path: pathlib.Path,
    content_type: str,
    target_path: pathlib.Path,
    target_size: tuple[int, int],
) -> None:
    """Create a thumbnail of the original image unless one already exists.

    Nothing is done when ``target_file`` is already a regular file. Otherwise
    the image at ``original_path`` is opened, transposed according to its EXIF
    orientation, shrunk to fit within ``target_size`` and written to
    ``target_path``.

    The output format passed to Pillow is the part of ``content_type`` after
    the slash, e.g. ``"image/png"`` is saved as ``"png"``.
    """
    if target_file.is_file():
        return

    # Image.open() keeps the source file open until the image is closed.
    # The context manager releases the handle even when transposing,
    # resizing or saving raises.
    with Image.open(original_path) as original:
        thumbnail = ImageOps.exif_transpose(original)
        thumbnail.thumbnail(target_size)
        thumbnail.save(target_path, format=content_type.partition("/")[-1])
def _validate_size_from_filename(filename: str) -> tuple[int, int]:
    """Extract the thumbnail dimensions encoded at the start of a filename.

    The text before the first "-" must have the form WIDTHxHEIGHT. Only
    square sizes whose edge length is listed in VALID_SIZES are accepted;
    anything else raises ValueError.

    >>> _validate_size_from_filename("256x256-image.png")
    (256, 256)
    >>> _validate_size_from_filename("jeff.png")
    Traceback (most recent call last):
    ...
    ValueError: invalid literal for int() with base 10: 'jeff.png'
    """
    size_token, _, _ = filename.partition("-")
    if size_token == "":
        raise ValueError("Invalid filename")

    width_text, _, height_text = size_token.partition("x")
    # int() raises ValueError itself when either part is not a number.
    width, height = int(width_text), int(height_text)

    is_supported_square = width != 0 and width == height and width in VALID_SIZES
    if not is_supported_square:
        raise ValueError(f"Invalid size {size_token}")
    return (width, height)
web.FileResponse get(self, web.Request request, str image_id, str filename)
None __init__(self, pathlib.Path image_folder, ImageStorageCollection image_collection)
str _get_suggested_id(self, dict[str, Any] info)
dict[str, Any] _update_data(self, dict[str, Any] item, dict[str, Any] update_data)
None _change_listener(self, str change_type, str item_id, dict[str, Any] data)
dict[str, Any] _process_create_data(self, dict[str, Any] data)
None __init__(self, HomeAssistant hass, pathlib.Path image_dir)
int _move_data(self, dict[str, Any] data)
None ws_create_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
web.Response post(self, web.Request request)
tuple[int, int] _validate_size_from_filename(str filename)
None _generate_thumbnail_if_file_does_not_exist(pathlib.Path target_file, pathlib.Path original_path, str content_type, pathlib.Path target_path, tuple[int, int] target_size)
bool async_setup(HomeAssistant hass, ConfigType config)
_ItemT async_create_item(self, dict data)