1 """Local Media Source Implementation."""
3 from __future__
import annotations
7 from pathlib
import Path
11 from aiohttp
import web
12 from aiohttp.web_request
import FileField
13 import voluptuous
as vol
21 from .const
import DOMAIN, MEDIA_CLASS_MAP, MEDIA_MIME_TYPES
22 from .error
import Unresolvable
23 from .models
import BrowseMediaSource, MediaSource, MediaSourceItem, PlayMedia
25 MAX_UPLOAD_SIZE = 1024 * 1024 * 10
26 LOGGER = logging.getLogger(__name__)
31 """Set up local media source."""
33 hass.data[DOMAIN][DOMAIN] = source
36 websocket_api.async_register_command(hass, websocket_remove_media)
40 """Provide local directories as media sources."""
42 name: str =
"My media"
44 def __init__(self, hass: HomeAssistant) ->
None:
45 """Initialize local source."""
51 """Return full path."""
52 base_path = self.
hasshass.config.media_dirs[source_dir_id]
53 full_path = Path(base_path, location)
54 full_path.relative_to(base_path)
59 """Parse identifier."""
60 if item.domain != DOMAIN:
63 source_dir_id, _, location = item.identifier.partition(
"/")
64 if source_dir_id
not in self.
hasshass.config.media_dirs:
69 except ValueError
as err:
72 if Path(location).is_absolute():
75 return source_dir_id, location
78 """Resolve media to a url."""
81 mime_type, _ = mimetypes.guess_type(
str(path))
82 assert isinstance(mime_type, str)
83 return PlayMedia(f
"/media/{item.identifier}", mime_type)
90 except Unresolvable
as err:
94 source_dir_id, location =
None,
""
96 return await self.
hasshass.async_add_executor_job(
101 self, source_dir_id: str |
None, location: str
102 ) -> BrowseMediaSource:
106 if source_dir_id
is None and len(self.
hasshass.config.media_dirs) == 1:
107 source_dir_id =
list(self.
hasshass.config.media_dirs)[0]
110 if source_dir_id
is None:
117 media_class=MediaClass.DIRECTORY,
118 media_content_type=
None,
122 children_media_class=MediaClass.DIRECTORY,
127 for source_dir_id
in self.
hasshass.config.media_dirs
132 full_path = Path(self.
hasshass.config.media_dirs[source_dir_id], location)
134 if not full_path.exists():
136 raise BrowseError(
"Media directory does not exist.")
139 if not full_path.is_dir():
148 self, source_dir_id: str, path: Path, is_child: bool =
False
149 ) -> BrowseMediaSource |
None:
150 mime_type, _ = mimetypes.guess_type(
str(path))
151 is_file = path.is_file()
152 is_dir = path.is_dir()
155 if not is_file
and not is_dir:
160 not mime_type
or mime_type.split(
"/")[0]
not in MEDIA_MIME_TYPES
166 media_class = MediaClass.DIRECTORY
168 media_class = MEDIA_CLASS_MAP.get(
169 mime_type.split(
"/")[0], MediaClass.DIRECTORY
174 identifier=f
"{source_dir_id}/{path.relative_to(self.hass.config.media_dirs[source_dir_id])}",
175 media_class=media_class,
176 media_content_type=mime_type
or "",
182 if is_file
or is_child:
187 for child_path
in path.iterdir():
188 if child_path.name[0] !=
".":
191 media.children.append(child)
194 media.children.sort(key=
lambda child: (child.can_play, child.title))
200 """Local Media Finder View.
202 Returns media files in config/media.
205 url =
"/media/{source_dir_id}/{location:.*}"
208 def __init__(self, hass: HomeAssistant, source: LocalSource) ->
None:
209 """Initialize the media view."""
214 self, request: web.Request, source_dir_id: str, location: str
215 ) -> web.FileResponse:
216 """Start a GET request."""
219 except ValueError
as err:
220 raise web.HTTPBadRequest
from err
222 if source_dir_id
not in self.
hasshass.config.media_dirs:
223 raise web.HTTPNotFound
225 media_path = self.
sourcesource.async_full_path(source_dir_id, location)
228 if not self.
hasshass.async_add_executor_job(media_path.is_file):
229 raise web.HTTPNotFound
232 mime_type, _ = mimetypes.guess_type(
str(media_path))
233 if not mime_type
or mime_type.split(
"/")[0]
not in MEDIA_MIME_TYPES:
234 raise web.HTTPNotFound
236 return web.FileResponse(media_path)
240 """View to upload images."""
242 url =
"/api/media_source/local_source/upload"
243 name =
"api:media_source:local_source:upload"
245 def __init__(self, hass: HomeAssistant, source: LocalSource) ->
None:
246 """Initialize the media view."""
251 "media_content_id": str,
257 async
def post(self, request: web.Request) -> web.Response:
260 request._client_max_size = MAX_UPLOAD_SIZE
263 data = self.
schemaschema(
dict(await request.post()))
264 except vol.Invalid
as err:
265 LOGGER.error(
"Received invalid upload data: %s", err)
266 raise web.HTTPBadRequest
from err
269 item = MediaSourceItem.from_uri(self.
hasshass, data[
"media_content_id"],
None)
270 except ValueError
as err:
271 LOGGER.error(
"Received invalid upload data: %s", err)
272 raise web.HTTPBadRequest
from err
276 except Unresolvable
as err:
277 LOGGER.error(
"Invalid local source ID")
278 raise web.HTTPBadRequest
from err
280 uploaded_file: FileField = data[
"file"]
282 if not uploaded_file.content_type.startswith((
"image/",
"video/",
"audio/")):
283 LOGGER.error(
"Content type not allowed")
284 raise vol.Invalid(
"Only images and video are allowed")
288 except ValueError
as err:
289 LOGGER.error(
"Invalid filename")
290 raise web.HTTPBadRequest
from err
293 await self.
hasshass.async_add_executor_job(
295 self.
sourcesource.async_full_path(source_dir_id, location),
298 except ValueError
as err:
299 LOGGER.error(
"Moving upload failed: %s", err)
300 raise web.HTTPBadRequest
from err
303 {
"media_content_id": f
"{data['media_content_id']}/{uploaded_file.filename}"}
306 def _move_file(self, target_dir: Path, uploaded_file: FileField) ->
None:
307 """Move file to target."""
308 if not target_dir.is_dir():
309 raise ValueError(
"Target is not an existing directory")
311 target_path = target_dir / uploaded_file.filename
313 target_path.relative_to(target_dir)
316 with target_path.open(
"wb")
as target_fp:
317 shutil.copyfileobj(uploaded_file.file, target_fp)
320 @websocket_api.websocket_command(
{
vol.Required("type"):
"media_source/local_source/remove",
321 vol.Required(
"media_content_id"): str,
324 @websocket_api.require_admin
325 @websocket_api.async_response
331 item = MediaSourceItem.from_uri(hass, msg[
"media_content_id"],
None)
332 except ValueError
as err:
333 connection.send_error(msg[
"id"], websocket_api.ERR_INVALID_FORMAT,
str(err))
336 source: LocalSource = hass.data[DOMAIN][DOMAIN]
339 source_dir_id, location = source.async_parse_identifier(item)
340 except Unresolvable
as err:
341 connection.send_error(msg[
"id"], websocket_api.ERR_INVALID_FORMAT,
str(err))
344 item_path = source.async_full_path(source_dir_id, location)
346 def _do_delete() -> tuple[str, str] | None:
347 if not item_path.exists():
348 return websocket_api.ERR_NOT_FOUND,
"Path does not exist"
350 if not item_path.is_file():
351 return websocket_api.ERR_NOT_SUPPORTED,
"Path is not a file"
357 error = await hass.async_add_executor_job(_do_delete)
358 except OSError
as err:
359 error = (websocket_api.ERR_UNKNOWN_ERROR,
str(err))
362 connection.send_error(msg[
"id"], *error)
364 connection.send_result(msg[
"id"])
365
None raise_if_invalid_path(str path)
None raise_if_invalid_filename(str filename)