1 """motionEye Media Source Implementation."""
3 from __future__
import annotations
6 from pathlib
import PurePath
7 from typing
import cast
9 from motioneye_client.const
import KEY_MEDIA_LIST, KEY_MIME_TYPE, KEY_PATH
24 from .
import get_media_url, split_motioneye_device_identifier
25 from .const
import CONF_CLIENT, DOMAIN
28 "movies":
"video/mp4",
29 "images":
"image/jpeg",
33 "movies": MediaClass.VIDEO,
34 "images": MediaClass.IMAGE,
37 _LOGGER = logging.getLogger(__name__)
49 """Set up motionEye media source."""
54 """Provide motionEye stills and videos as media sources."""
56 name: str =
"motionEye Media"
58 def __init__(self, hass: HomeAssistant) ->
None:
59 """Initialize MotionEyeMediaSource."""
64 """Resolve media to a url."""
65 config_id, device_id, kind, path = self.
_parse_identifier_parse_identifier(item.identifier)
67 if not config_id
or not device_id
or not kind
or not path:
69 f
"Incomplete media identifier specified: {item.identifier}"
77 self.
hasshass.data[DOMAIN][config.entry_id][CONF_CLIENT],
83 raise Unresolvable(f
"Could not resolve media item: {item.identifier}")
85 return PlayMedia(url, MIME_TYPE_MAP[kind])
91 ) -> tuple[str |
None, str |
None, str |
None, str |
None]:
93 data = identifier.split(
"#", 3)
95 tuple[str |
None, str |
None, str |
None, str |
None],
96 tuple(data + base)[:4],
101 item: MediaSourceItem,
102 ) -> BrowseMediaSource:
105 config_id, device_id, kind, path = self.
_parse_identifier_parse_identifier(item.identifier)
106 config = device =
None
115 if config
and device
and kind:
117 if config
and device:
124 """Get a config entry from a URL."""
125 entry = self.
hasshass.config_entries.async_get_entry(config_id)
131 """Get a config entry from a URL."""
132 device_registry = dr.async_get(self.
hasshass)
133 if not (device := device_registry.async_get(device_id)):
139 """Verify kind is an expected value."""
140 if kind
in MEDIA_CLASS_MAP:
146 """Verify path is a valid motionEye path."""
149 if PurePath(path).root ==
"/":
152 f
"motionEye media path must start with '/', received: {path}"
157 cls, config: ConfigEntry, device: dr.DeviceEntry
159 """Get a config entry from a URL."""
160 for identifier
in device.identifiers:
164 raise MediaSourceError(f
"Could not find camera id for device id: {device.id}")
170 identifier=config.entry_id,
171 media_class=MediaClass.DIRECTORY,
172 media_content_type=
"",
176 children_media_class=MediaClass.DIRECTORY,
180 """Build the media sources for config entries."""
184 media_class=MediaClass.DIRECTORY,
185 media_content_type=
"",
186 title=
"motionEye Media",
191 for entry
in self.
hasshass.config_entries.async_entries(DOMAIN)
193 children_media_class=MediaClass.DIRECTORY,
200 device: dr.DeviceEntry,
201 full_title: bool =
True,
202 ) -> BrowseMediaSource:
205 identifier=f
"{config.entry_id}#{device.id}",
206 media_class=MediaClass.DIRECTORY,
207 media_content_type=
"",
208 title=f
"{config.title} {device.name}" if full_title
else device.name,
211 children_media_class=MediaClass.DIRECTORY,
215 """Build the media sources for device entries."""
216 device_registry = dr.async_get(self.
hasshass)
217 devices = dr.async_entries_for_config_entry(device_registry, config.entry_id)
222 for device
in devices
230 device: dr.DeviceEntry,
232 full_title: bool =
True,
233 ) -> BrowseMediaSource:
236 identifier=f
"{config.entry_id}#{device.id}#{kind}",
237 media_class=MediaClass.DIRECTORY,
239 MediaType.VIDEO
if kind ==
"movies" else MediaType.IMAGE
242 f
"{config.title} {device.name} {kind.title()}"
248 children_media_class=(
249 MediaClass.VIDEO
if kind ==
"movies" else MediaClass.IMAGE
254 self, config: ConfigEntry, device: dr.DeviceEntry
255 ) -> BrowseMediaSource:
259 for kind
in MEDIA_CLASS_MAP
266 device: dr.DeviceEntry,
269 ) -> BrowseMediaSource:
270 """Build the media sources for media kinds."""
273 parsed_path = PurePath(path)
275 base.title += f
" {PurePath(*parsed_path.parts[1:])}"
279 client = self.
hasshass.data[DOMAIN][config.entry_id][CONF_CLIENT]
283 resp = await client.async_get_movies(camera_id)
285 resp = await client.async_get_images(camera_id)
287 sub_dirs: set[str] = set()
288 parts = parsed_path.parts
289 media_list = resp.get(KEY_MEDIA_LIST, [])
291 def get_media_sort_key(media: dict) -> str:
292 """Get media sort key."""
293 return media.get(KEY_PATH,
"")
295 for media
in sorted(media_list, key=get_media_sort_key):
297 KEY_PATH
not in media
298 or KEY_MIME_TYPE
not in media
299 or media[KEY_MIME_TYPE]
not in MIME_TYPE_MAP.values()
304 parts_media = PurePath(media[KEY_PATH]).parts
306 if parts_media[: len(parts)] == parts
and len(parts_media) > len(parts):
307 full_child_path =
str(PurePath(*parts_media[: len(parts) + 1]))
308 display_child_path = parts_media[len(parts)]
311 if len(parts) + 1 == len(parts_media):
313 thumbnail_url = client.get_movie_url(
314 camera_id, full_child_path, preview=
True
317 thumbnail_url = client.get_image_url(
318 camera_id, full_child_path, preview=
True
321 base.children.append(
324 identifier=f
"{config.entry_id}#{device.id}#{kind}#{full_child_path}",
325 media_class=MEDIA_CLASS_MAP[kind],
326 media_content_type=media[KEY_MIME_TYPE],
327 title=display_child_path,
328 can_play=(kind ==
"movies"),
330 thumbnail=thumbnail_url,
335 elif len(parts) + 1 < len(parts_media):
336 if full_child_path
not in sub_dirs:
337 sub_dirs.add(full_child_path)
338 base.children.append(
342 f
"{config.entry_id}#{device.id}"
343 f
"#{kind}#{full_child_path}"
345 media_class=MediaClass.DIRECTORY,
351 title=display_child_path,
354 children_media_class=MediaClass.DIRECTORY,
tuple[str, str, int]|None split_motioneye_device_identifier(tuple[str, str] identifier)
str|None get_media_url(MotionEyeClient client, int camera_id, str path, bool image)