Home Assistant Unofficial Reference 2024.12.1
local_source.py
Go to the documentation of this file.
1 """Local Media Source Implementation."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import mimetypes
7 from pathlib import Path
8 import shutil
9 from typing import Any
10 
11 from aiohttp import web
12 from aiohttp.web_request import FileField
13 import voluptuous as vol
14 
15 from homeassistant.components import http, websocket_api
16 from homeassistant.components.http import require_admin
17 from homeassistant.components.media_player import BrowseError, MediaClass
18 from homeassistant.core import HomeAssistant, callback
19 from homeassistant.util import raise_if_invalid_filename, raise_if_invalid_path
20 
21 from .const import DOMAIN, MEDIA_CLASS_MAP, MEDIA_MIME_TYPES
22 from .error import Unresolvable
23 from .models import BrowseMediaSource, MediaSource, MediaSourceItem, PlayMedia
24 
# Limit applied to the upload request body, in bytes (10 MiB).
MAX_UPLOAD_SIZE = 1024 * 1024 * 10
LOGGER = logging.getLogger(__name__)
27 
28 
@callback
def async_setup(hass: HomeAssistant) -> None:
    """Set up local media source.

    Creates the ``LocalSource``, stores it in ``hass.data[DOMAIN][DOMAIN]`` and
    registers the HTTP views and the websocket command that operate on it.
    """
    source = LocalSource(hass)
    hass.data[DOMAIN][DOMAIN] = source

    # Both views share the same source instance.
    for view in (LocalMediaView(hass, source), UploadMediaView(hass, source)):
        hass.http.register_view(view)

    websocket_api.async_register_command(hass, websocket_remove_media)
37 
38 
# NOTE(review): the ``class`` line and the ``def`` lines of ``_browse_media`` and
# ``_build_item_response`` were missing from the reviewed listing; they are
# reconstructed from the call sites and annotations - confirm against upstream.
class LocalSource(MediaSource):
    """Provide local directories as media sources."""

    name: str = "My media"

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize local source."""
        super().__init__(DOMAIN)
        self.hass = hass

    @callback
    def async_full_path(self, source_dir_id: str, location: str) -> Path:
        """Return the full path of ``location`` inside a media directory.

        ``source_dir_id`` is looked up in ``hass.config.media_dirs``; callers
        are expected to have checked that it is configured.

        Raises:
            ValueError: The combined path is not located below the media
                directory (for example when ``location`` is absolute).
        """
        base_path = self.hass.config.media_dirs[source_dir_id]
        full_path = Path(base_path, location)
        # Called only for its side effect: ``relative_to`` raises ValueError
        # when ``full_path`` does not start with ``base_path``. The check is
        # lexical; ".." segments are not resolved here.
        full_path.relative_to(base_path)
        return full_path

    @callback
    def async_parse_identifier(self, item: MediaSourceItem) -> tuple[str, str]:
        """Split an item identifier into ``(source_dir_id, location)``.

        Raises:
            Unresolvable: The item belongs to another domain, names an
                unconfigured media directory, or carries an invalid or
                absolute location.
        """
        if item.domain != DOMAIN:
            raise Unresolvable("Unknown domain.")

        source_dir_id, _, location = item.identifier.partition("/")
        if source_dir_id not in self.hass.config.media_dirs:
            raise Unresolvable("Unknown source directory.")

        try:
            raise_if_invalid_path(location)
        except ValueError as err:
            raise Unresolvable("Invalid path.") from err

        if Path(location).is_absolute():
            raise Unresolvable("Invalid path.")

        return source_dir_id, location

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve media to a url.

        Raises:
            Unresolvable: The identifier is invalid or no MIME type can be
                guessed from the file name.
        """
        source_dir_id, location = self.async_parse_identifier(item)
        path = self.async_full_path(source_dir_id, location)
        mime_type, _ = mimetypes.guess_type(str(path))
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O`` and would let ``None`` through as the MIME type.
        if not isinstance(mime_type, str):
            raise Unresolvable("Could not determine media type.")
        return PlayMedia(f"/media/{item.identifier}", mime_type)

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return media.

        An empty identifier requests the root. The directory listing itself
        is done in the executor because it touches the file system.

        Raises:
            BrowseError: The identifier cannot be parsed or the target cannot
                be browsed.
        """
        source_dir_id: str | None = None
        location = ""

        if item.identifier:
            try:
                source_dir_id, location = self.async_parse_identifier(item)
            except Unresolvable as err:
                raise BrowseError(str(err)) from err

        return await self.hass.async_add_executor_job(
            self._browse_media, source_dir_id, location
        )

    def _browse_media(
        self, source_dir_id: str | None, location: str
    ) -> BrowseMediaSource:
        """Browse media.

        Performs blocking file-system calls; ``async_browse_media`` runs it in
        the executor.

        Raises:
            BrowseError: The requested path does not exist, is not a
                directory, or no response could be built for it.
        """
        media_dirs = self.hass.config.media_dirs

        # If only one media dir is configured, use that as the local media root
        if source_dir_id is None and len(media_dirs) == 1:
            source_dir_id = next(iter(media_dirs))

        # Multiple folder, root is requested
        if source_dir_id is None:
            if location:
                raise BrowseError("Folder not found.")

            base = BrowseMediaSource(
                domain=DOMAIN,
                identifier="",
                media_class=MediaClass.DIRECTORY,
                media_content_type=None,
                title=self.name,
                can_play=False,
                can_expand=True,
                children_media_class=MediaClass.DIRECTORY,
            )

            # One child entry per configured media directory.
            base.children = [self._browse_media(dir_id, "") for dir_id in media_dirs]

            return base

        full_path = Path(media_dirs[source_dir_id], location)

        if not full_path.exists():
            if location == "":
                raise BrowseError("Media directory does not exist.")
            raise BrowseError("Path does not exist.")

        if not full_path.is_dir():
            raise BrowseError("Path is not a directory.")

        result = self._build_item_response(source_dir_id, full_path)
        if not result:
            raise BrowseError("Unknown source directory.")
        return result

    def _build_item_response(
        self, source_dir_id: str, path: Path, is_child: bool = False
    ) -> BrowseMediaSource | None:
        """Build the browse entry for ``path``.

        Returns ``None`` when ``path`` is neither a file nor a directory, or
        when it is a file whose guessed MIME category is not listed in
        ``MEDIA_MIME_TYPES``. For a directory that is not itself a child
        entry (``is_child`` false), the first level of children is attached.
        """
        mime_type, _ = mimetypes.guess_type(str(path))
        is_file = path.is_file()
        is_dir = path.is_dir()

        # Make sure it's a file or directory
        if not is_file and not is_dir:
            return None

        # Top-level MIME category ("audio", "video", ...), if one was guessed.
        mime_category = mime_type.split("/")[0] if mime_type else None

        # Check that it's a media file
        if is_file and (
            mime_category is None or mime_category not in MEDIA_MIME_TYPES
        ):
            return None

        media_class = MediaClass.DIRECTORY
        if mime_category is not None:
            media_class = MEDIA_CLASS_MAP.get(mime_category, MediaClass.DIRECTORY)

        media = BrowseMediaSource(
            domain=DOMAIN,
            identifier=f"{source_dir_id}/{path.relative_to(self.hass.config.media_dirs[source_dir_id])}",
            media_class=media_class,
            media_content_type=mime_type or "",
            title=path.name,
            can_play=is_file,
            can_expand=is_dir,
        )

        if is_file or is_child:
            return media

        # Append first level children
        media.children = []
        for child_path in path.iterdir():
            # Entries whose name starts with a dot are not listed.
            if child_path.name.startswith("."):
                continue
            child = self._build_item_response(source_dir_id, child_path, True)
            if child:
                media.children.append(child)

        # Sort children showing directories first, then by name
        media.children.sort(key=lambda child: (child.can_play, child.title))

        return media
197 
198 
class LocalMediaView(http.HomeAssistantView):
    """Local Media Finder View.

    Returns media files in config/media.
    """

    url = "/media/{source_dir_id}/{location:.*}"
    name = "media"

    def __init__(self, hass: HomeAssistant, source: LocalSource) -> None:
        """Initialize the media view."""
        self.hass = hass
        self.source = source

    async def get(
        self, request: web.Request, source_dir_id: str, location: str
    ) -> web.FileResponse:
        """Serve a media file from a configured media directory.

        Raises:
            web.HTTPBadRequest: ``location`` is not a valid path or does not
                stay inside the media directory.
            web.HTTPNotFound: The media directory is not configured, the file
                does not exist, or its guessed MIME category is not listed in
                ``MEDIA_MIME_TYPES``.
        """
        try:
            raise_if_invalid_path(location)
        except ValueError as err:
            raise web.HTTPBadRequest from err

        if source_dir_id not in self.hass.config.media_dirs:
            raise web.HTTPNotFound

        try:
            media_path = self.source.async_full_path(source_dir_id, location)
        except ValueError as err:
            # ``async_full_path`` raises ValueError when the combined path is
            # not below the media directory; report it as a client error
            # instead of letting it escape the handler.
            raise web.HTTPBadRequest from err

        # Check that the file exists. The executor job must be awaited:
        # without ``await`` the returned future is always truthy and the
        # check can never fail.
        if not await self.hass.async_add_executor_job(media_path.is_file):
            raise web.HTTPNotFound

        # Check that it's a media file
        mime_type, _ = mimetypes.guess_type(str(media_path))
        if not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES:
            raise web.HTTPNotFound

        return web.FileResponse(media_path)
237 
238 
class UploadMediaView(http.HomeAssistantView):
    """View to upload media files (image, video or audio content types)."""

    url = "/api/media_source/local_source/upload"
    name = "api:media_source:local_source:upload"

    def __init__(self, hass: HomeAssistant, source: LocalSource) -> None:
        """Initialize the media view."""
        self.hass = hass
        self.source = source
        # Expected form fields of the upload request.
        self.schema = vol.Schema(
            {
                "media_content_id": str,
                "file": FileField,
            }
        )

    @require_admin
    async def post(self, request: web.Request) -> web.Response:
        """Handle upload.

        Validates the form data, the target media directory, the content type
        and the file name, then writes the file in the executor.

        Returns:
            A JSON response holding the ``media_content_id`` of the new file.

        Raises:
            web.HTTPBadRequest: The form data, target identifier or file name
                is invalid, or writing the file failed with ``ValueError``.
            vol.Invalid: The content type is not image, video or audio.
        """
        # Increase max payload
        request._client_max_size = MAX_UPLOAD_SIZE  # noqa: SLF001

        try:
            data = self.schema(dict(await request.post()))
        except vol.Invalid as err:
            LOGGER.error("Received invalid upload data: %s", err)
            raise web.HTTPBadRequest from err

        try:
            item = MediaSourceItem.from_uri(self.hass, data["media_content_id"], None)
        except ValueError as err:
            LOGGER.error("Received invalid upload data: %s", err)
            raise web.HTTPBadRequest from err

        try:
            source_dir_id, location = self.source.async_parse_identifier(item)
        except Unresolvable as err:
            LOGGER.error("Invalid local source ID")
            raise web.HTTPBadRequest from err

        uploaded_file: FileField = data["file"]

        if not uploaded_file.content_type.startswith(("image/", "video/", "audio/")):
            LOGGER.error("Content type not allowed")
            # NOTE(review): this is the only rejection raised as vol.Invalid
            # rather than web.HTTPBadRequest; presumably the view layer maps
            # it to a 400 - confirm before changing the exception type.
            raise vol.Invalid("Only images, video and audio are allowed")

        try:
            raise_if_invalid_filename(uploaded_file.filename)
        except ValueError as err:
            LOGGER.error("Invalid filename")
            raise web.HTTPBadRequest from err

        try:
            # ``async_full_path`` is evaluated inside the ``try`` so that its
            # ValueError is reported the same way as a failed move.
            await self.hass.async_add_executor_job(
                self._move_file,
                self.source.async_full_path(source_dir_id, location),
                uploaded_file,
            )
        except ValueError as err:
            LOGGER.error("Moving upload failed: %s", err)
            raise web.HTTPBadRequest from err

        return self.json(
            {"media_content_id": f"{data['media_content_id']}/{uploaded_file.filename}"}
        )

    def _move_file(self, target_dir: Path, uploaded_file: FileField) -> None:
        """Copy the uploaded file into ``target_dir``.

        Blocking; ``post`` runs it in the executor.

        Raises:
            ValueError: ``target_dir`` is not an existing directory, or the
                resulting path is invalid or not below ``target_dir``.
        """
        if not target_dir.is_dir():
            raise ValueError("Target is not an existing directory")

        target_path = target_dir / uploaded_file.filename

        # Both calls are made only for the ValueError they raise on a bad path.
        target_path.relative_to(target_dir)
        raise_if_invalid_path(str(target_path))

        with target_path.open("wb") as target_fp:
            shutil.copyfileobj(uploaded_file.file, target_fp)
318 
319 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "media_source/local_source/remove",
        vol.Required("media_content_id"): str,
    }
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_remove_media(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Remove media.

    Resolves ``msg["media_content_id"]`` to a file below a configured media
    directory and deletes it. Every outcome is reported on ``connection``
    using ``msg["id"]``; nothing is raised to the caller for the handled
    error cases.
    """
    try:
        item = MediaSourceItem.from_uri(hass, msg["media_content_id"], None)
    except ValueError as err:
        connection.send_error(msg["id"], websocket_api.ERR_INVALID_FORMAT, str(err))
        return

    source: LocalSource = hass.data[DOMAIN][DOMAIN]

    try:
        source_dir_id, location = source.async_parse_identifier(item)
    except Unresolvable as err:
        connection.send_error(msg["id"], websocket_api.ERR_INVALID_FORMAT, str(err))
        return

    item_path = source.async_full_path(source_dir_id, location)

    def _do_delete() -> tuple[str, str] | None:
        """Delete ``item_path``; return ``(code, message)`` on failure, else None."""
        if not item_path.exists():
            return websocket_api.ERR_NOT_FOUND, "Path does not exist"

        # Only regular files may be removed through this command.
        if not item_path.is_file():
            return websocket_api.ERR_NOT_SUPPORTED, "Path is not a file"

        item_path.unlink()
        return None

    try:
        # File-system access is blocking, so it runs in the executor.
        error = await hass.async_add_executor_job(_do_delete)
    except OSError as err:
        error = (websocket_api.ERR_UNKNOWN_ERROR, str(err))

    if error:
        connection.send_error(msg["id"], *error)
    else:
        connection.send_result(msg["id"])
365 
None __init__(self, HomeAssistant hass, LocalSource source)
web.FileResponse get(self, web.Request request, str source_dir_id, str location)
Path async_full_path(self, str source_dir_id, str location)
Definition: local_source.py:50
BrowseMediaSource _browse_media(self, str|None source_dir_id, str location)
BrowseMediaSource async_browse_media(self, MediaSourceItem item)
Definition: local_source.py:85
BrowseMediaSource|None _build_item_response(self, str source_dir_id, Path path, bool is_child=False)
tuple[str, str] async_parse_identifier(self, MediaSourceItem item)
Definition: local_source.py:58
PlayMedia async_resolve_media(self, MediaSourceItem item)
Definition: local_source.py:77
None __init__(self, HomeAssistant hass, LocalSource source)
None _move_file(self, Path target_dir, FileField uploaded_file)
None websocket_remove_media(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
tuple[str, str, int|None] async_parse_identifier(MediaSourceItem item)
None raise_if_invalid_path(str path)
Definition: __init__.py:32
None raise_if_invalid_filename(str filename)
Definition: __init__.py:23