Home Assistant Unofficial Reference 2024.12.1
media_source.py
Go to the documentation of this file.
1 """The Media Source implementation for the Jellyfin integration."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import mimetypes
7 import os
8 from typing import Any
9 
10 from jellyfin_apiclient_python.api import jellyfin_url
11 from jellyfin_apiclient_python.client import JellyfinClient
12 
13 from homeassistant.components.media_player import BrowseError, MediaClass
15  BrowseMediaSource,
16  MediaSource,
17  MediaSourceItem,
18  PlayMedia,
19 )
20 from homeassistant.core import HomeAssistant
21 
22 from . import JellyfinConfigEntry
23 from .const import (
24  COLLECTION_TYPE_MOVIES,
25  COLLECTION_TYPE_MUSIC,
26  CONF_AUDIO_CODEC,
27  DOMAIN,
28  ITEM_KEY_COLLECTION_TYPE,
29  ITEM_KEY_ID,
30  ITEM_KEY_IMAGE_TAGS,
31  ITEM_KEY_INDEX_NUMBER,
32  ITEM_KEY_MEDIA_SOURCES,
33  ITEM_KEY_MEDIA_TYPE,
34  ITEM_KEY_NAME,
35  ITEM_TYPE_ALBUM,
36  ITEM_TYPE_ARTIST,
37  ITEM_TYPE_AUDIO,
38  ITEM_TYPE_EPISODE,
39  ITEM_TYPE_LIBRARY,
40  ITEM_TYPE_MOVIE,
41  ITEM_TYPE_SEASON,
42  ITEM_TYPE_SERIES,
43  MAX_IMAGE_WIDTH,
44  MEDIA_SOURCE_KEY_PATH,
45  MEDIA_TYPE_AUDIO,
46  MEDIA_TYPE_NONE,
47  MEDIA_TYPE_VIDEO,
48  PLAYABLE_ITEM_TYPES,
49  SUPPORTED_COLLECTION_TYPES,
50 )
51 
52 _LOGGER = logging.getLogger(__name__)
53 
54 
async def async_get_media_source(hass: HomeAssistant) -> MediaSource:
    """Create the media source for the configured Jellyfin server.

    The API client is taken from the runtime data of the first Jellyfin
    config entry.
    """
    # Only the first entry is used: a single Jellyfin server is supported.
    entries = hass.config_entries.async_entries(DOMAIN)
    first_entry: JellyfinConfigEntry = entries[0]
    api_client = first_entry.runtime_data.api_client

    return JellyfinSource(hass, api_client, first_entry)
62 
63 
class JellyfinSource(MediaSource):
    """Represents a Jellyfin server."""

    name: str = "Jellyfin"

    def __init__(
        self, hass: HomeAssistant, client: JellyfinClient, entry: JellyfinConfigEntry
    ) -> None:
        """Initialize the Jellyfin media source."""
        super().__init__(DOMAIN)

        self.hass = hass
        self.entry = entry

        self.client = client
        self.api = client.jellyfin
        self.url = jellyfin_url(client, "")

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Return a streamable URL and associated mime type.

        Raises:
            BrowseError: If the item's media type is unsupported or no mime
                type can be determined for it.
        """
        media_item = await self.hass.async_add_executor_job(
            self.api.get_item, item.identifier
        )

        stream_url = self._get_stream_url(media_item)
        mime_type = _media_mime_type(media_item)

        # Browsing filters out items without a mime type, but an identifier can
        # still be resolved directly. Fail explicitly rather than with an
        # assert, which is stripped when Python runs with -O.
        if mime_type is None:
            raise BrowseError(
                f"Unable to determine mime type for item {item.identifier}"
            )

        return PlayMedia(stream_url, mime_type)

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return a browsable Jellyfin media source.

        Without an identifier the list of libraries is returned; otherwise the
        item is fetched and expanded together with its children.

        Raises:
            BrowseError: If the item's type cannot be browsed.
        """
        if not item.identifier:
            return await self._build_libraries()

        media_item = await self.hass.async_add_executor_job(
            self.api.get_item, item.identifier
        )

        item_type = media_item["Type"]
        builders = {
            ITEM_TYPE_LIBRARY: self._build_library,
            ITEM_TYPE_ARTIST: self._build_artist,
            ITEM_TYPE_ALBUM: self._build_album,
            ITEM_TYPE_SERIES: self._build_series,
            ITEM_TYPE_SEASON: self._build_season,
        }
        builder = builders.get(item_type)
        if builder is None:
            raise BrowseError(f"Unsupported item type {item_type}")

        return await builder(media_item, True)

    @staticmethod
    def _sorted_by(items: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
        """Return items sorted by their value for key, valueless items last."""

        def sort_key(item: dict[str, Any]) -> tuple[bool, Any]:
            value = item.get(key)
            # The leading flag orders items without a value after all others.
            # Two missing values compare equal, so None is never ordered
            # against a real value.
            return (value is None, value)

        return sorted(items, key=sort_key)

    async def _build_libraries(self) -> BrowseMediaSource:
        """Return all supported libraries the user has access to as media sources."""
        base = BrowseMediaSource(
            domain=DOMAIN,
            identifier=None,
            media_class=MediaClass.DIRECTORY,
            media_content_type=MEDIA_TYPE_NONE,
            title=self.name,
            can_play=False,
            can_expand=True,
            children_media_class=MediaClass.DIRECTORY,
        )

        libraries = await self._get_libraries()
        base.children = [
            await self._build_library(library, False) for library in libraries
        ]

        return base

    async def _get_libraries(self) -> list[dict[str, Any]]:
        """Return all supported libraries a user has access to."""
        response = await self.hass.async_add_executor_job(self.api.get_media_folders)
        # Folders without a collection type, or with an unsupported one, are
        # skipped.
        return [
            library
            for library in response["Items"]
            if ITEM_KEY_COLLECTION_TYPE in library
            and library[ITEM_KEY_COLLECTION_TYPE] in SUPPORTED_COLLECTION_TYPES
        ]

    async def _build_library(
        self, library: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single library as a browsable media source.

        Music and movie libraries have dedicated builders; every other
        collection type is built as a tv library.
        """
        collection_type = library[ITEM_KEY_COLLECTION_TYPE]

        if collection_type == COLLECTION_TYPE_MUSIC:
            return await self._build_music_library(library, include_children)
        if collection_type == COLLECTION_TYPE_MOVIES:
            return await self._build_movie_library(library, include_children)
        return await self._build_tv_library(library, include_children)

    async def _build_music_library(
        self, library: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single music library as a browsable media source."""
        library_id = library[ITEM_KEY_ID]
        library_name = library[ITEM_KEY_NAME]

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=library_id,
            media_class=MediaClass.DIRECTORY,
            media_content_type=MEDIA_TYPE_NONE,
            title=library_name,
            can_play=False,
            can_expand=True,
        )

        if include_children:
            result.children_media_class = MediaClass.ARTIST
            result.children = await self._build_artists(library_id)
            # Fall back to listing albums when the library yields no artists.
            if not result.children:
                result.children_media_class = MediaClass.ALBUM
                result.children = await self._build_albums(library_id)

        return result

    async def _build_artists(self, library_id: str) -> list[BrowseMediaSource]:
        """Return all artists in the music library, sorted by name."""
        artists = await self._get_children(library_id, ITEM_TYPE_ARTIST)
        artists = self._sorted_by(artists, ITEM_KEY_NAME)
        return [await self._build_artist(artist, False) for artist in artists]

    async def _build_artist(
        self, artist: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single artist as a browsable media source."""
        artist_id = artist[ITEM_KEY_ID]
        artist_name = artist[ITEM_KEY_NAME]
        thumbnail_url = self._get_thumbnail_url(artist)

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=artist_id,
            media_class=MediaClass.ARTIST,
            media_content_type=MEDIA_TYPE_NONE,
            title=artist_name,
            can_play=False,
            can_expand=True,
            thumbnail=thumbnail_url,
        )

        if include_children:
            result.children_media_class = MediaClass.ALBUM
            result.children = await self._build_albums(artist_id)

        return result

    async def _build_albums(self, parent_id: str) -> list[BrowseMediaSource]:
        """Return all albums below the parent as browsable media sources.

        The parent is an artist, or a music library when it has no artists.
        """
        albums = await self._get_children(parent_id, ITEM_TYPE_ALBUM)
        albums = self._sorted_by(albums, ITEM_KEY_NAME)
        return [await self._build_album(album, False) for album in albums]

    async def _build_album(
        self, album: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single album as a browsable media source."""
        album_id = album[ITEM_KEY_ID]
        album_title = album[ITEM_KEY_NAME]
        thumbnail_url = self._get_thumbnail_url(album)

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=album_id,
            media_class=MediaClass.ALBUM,
            media_content_type=MEDIA_TYPE_NONE,
            title=album_title,
            can_play=False,
            can_expand=True,
            thumbnail=thumbnail_url,
        )

        if include_children:
            result.children_media_class = MediaClass.TRACK
            result.children = await self._build_tracks(album_id)

        return result

    async def _build_tracks(self, album_id: str) -> list[BrowseMediaSource]:
        """Return all tracks of a single album as browsable media sources.

        Tracks are sorted by index number; tracks without a determinable mime
        type are omitted.
        """
        tracks = await self._get_children(album_id, ITEM_TYPE_AUDIO)
        tracks = self._sorted_by(tracks, ITEM_KEY_INDEX_NUMBER)
        return [
            self._build_track(track)
            for track in tracks
            if _media_mime_type(track) is not None
        ]

    def _build_track(self, track: dict[str, Any]) -> BrowseMediaSource:
        """Return a single track as a browsable media source."""
        track_id = track[ITEM_KEY_ID]
        track_title = track[ITEM_KEY_NAME]
        mime_type = _media_mime_type(track)
        thumbnail_url = self._get_thumbnail_url(track)

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=track_id,
            media_class=MediaClass.TRACK,
            media_content_type=mime_type,
            title=track_title,
            can_play=True,
            can_expand=False,
            thumbnail=thumbnail_url,
        )

    async def _build_movie_library(
        self, library: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single movie library as a browsable media source."""
        library_id = library[ITEM_KEY_ID]
        library_name = library[ITEM_KEY_NAME]

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=library_id,
            media_class=MediaClass.DIRECTORY,
            media_content_type=MEDIA_TYPE_NONE,
            title=library_name,
            can_play=False,
            can_expand=True,
        )

        if include_children:
            result.children_media_class = MediaClass.MOVIE
            result.children = await self._build_movies(library_id)

        return result

    async def _build_movies(self, library_id: str) -> list[BrowseMediaSource]:
        """Return all movies in the movie library.

        Movies are sorted by name; movies without a determinable mime type are
        omitted.
        """
        movies = await self._get_children(library_id, ITEM_TYPE_MOVIE)
        movies = self._sorted_by(movies, ITEM_KEY_NAME)
        return [
            self._build_movie(movie)
            for movie in movies
            if _media_mime_type(movie) is not None
        ]

    def _build_movie(self, movie: dict[str, Any]) -> BrowseMediaSource:
        """Return a single movie as a browsable media source."""
        movie_id = movie[ITEM_KEY_ID]
        movie_title = movie[ITEM_KEY_NAME]
        mime_type = _media_mime_type(movie)
        thumbnail_url = self._get_thumbnail_url(movie)

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=movie_id,
            media_class=MediaClass.MOVIE,
            media_content_type=mime_type,
            title=movie_title,
            can_play=True,
            can_expand=False,
            thumbnail=thumbnail_url,
        )

    async def _build_tv_library(
        self, library: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single tv show library as a browsable media source."""
        library_id = library[ITEM_KEY_ID]
        library_name = library[ITEM_KEY_NAME]

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=library_id,
            media_class=MediaClass.DIRECTORY,
            media_content_type=MEDIA_TYPE_NONE,
            title=library_name,
            can_play=False,
            can_expand=True,
        )

        if include_children:
            result.children_media_class = MediaClass.TV_SHOW
            result.children = await self._build_tvshow(library_id)

        return result

    async def _build_tvshow(self, library_id: str) -> list[BrowseMediaSource]:
        """Return all series in the tv library, sorted by name."""
        series = await self._get_children(library_id, ITEM_TYPE_SERIES)
        series = self._sorted_by(series, ITEM_KEY_NAME)
        return [await self._build_series(s, False) for s in series]

    async def _build_series(
        self, series: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single series as a browsable media source."""
        series_id = series[ITEM_KEY_ID]
        series_title = series[ITEM_KEY_NAME]
        thumbnail_url = self._get_thumbnail_url(series)

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=series_id,
            media_class=MediaClass.TV_SHOW,
            media_content_type=MEDIA_TYPE_NONE,
            title=series_title,
            can_play=False,
            can_expand=True,
            thumbnail=thumbnail_url,
        )

        if include_children:
            result.children_media_class = MediaClass.SEASON
            result.children = await self._build_seasons(series_id)

        return result

    async def _build_seasons(self, series_id: str) -> list[BrowseMediaSource]:
        """Return all seasons in the series, sorted by index number."""
        seasons = await self._get_children(series_id, ITEM_TYPE_SEASON)
        seasons = self._sorted_by(seasons, ITEM_KEY_INDEX_NUMBER)
        return [await self._build_season(season, False) for season in seasons]

    async def _build_season(
        self, season: dict[str, Any], include_children: bool
    ) -> BrowseMediaSource:
        """Return a single season as a browsable media source."""
        season_id = season[ITEM_KEY_ID]
        season_title = season[ITEM_KEY_NAME]
        thumbnail_url = self._get_thumbnail_url(season)

        result = BrowseMediaSource(
            domain=DOMAIN,
            identifier=season_id,
            # Matches the children_media_class that _build_series declares for
            # its season children.
            media_class=MediaClass.SEASON,
            media_content_type=MEDIA_TYPE_NONE,
            title=season_title,
            can_play=False,
            can_expand=True,
            thumbnail=thumbnail_url,
        )

        if include_children:
            result.children_media_class = MediaClass.EPISODE
            result.children = await self._build_episodes(season_id)

        return result

    async def _build_episodes(self, season_id: str) -> list[BrowseMediaSource]:
        """Return all episodes in the season.

        Episodes are sorted by index number; episodes without a determinable
        mime type are omitted.
        """
        episodes = await self._get_children(season_id, ITEM_TYPE_EPISODE)
        episodes = self._sorted_by(episodes, ITEM_KEY_INDEX_NUMBER)
        return [
            self._build_episode(episode)
            for episode in episodes
            if _media_mime_type(episode) is not None
        ]

    def _build_episode(self, episode: dict[str, Any]) -> BrowseMediaSource:
        """Return a single episode as a browsable media source."""
        episode_id = episode[ITEM_KEY_ID]
        episode_title = episode[ITEM_KEY_NAME]
        mime_type = _media_mime_type(episode)
        thumbnail_url = self._get_thumbnail_url(episode)

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=episode_id,
            media_class=MediaClass.EPISODE,
            media_content_type=mime_type,
            title=episode_title,
            can_play=True,
            can_expand=False,
            thumbnail=thumbnail_url,
        )

    async def _get_children(
        self, parent_id: str, item_type: str
    ) -> list[dict[str, Any]]:
        """Return all children for the parent_id whose item type is item_type."""
        params = {
            "Recursive": "true",
            "ParentId": parent_id,
            "IncludeItemTypes": item_type,
        }
        # Playable items additionally request the media sources field, which
        # _media_mime_type reads.
        if item_type in PLAYABLE_ITEM_TYPES:
            params["Fields"] = ITEM_KEY_MEDIA_SOURCES

        result = await self.hass.async_add_executor_job(
            self.api.user_items, "", params
        )
        return result["Items"]  # type: ignore[no-any-return]

    def _get_thumbnail_url(self, media_item: dict[str, Any]) -> str | None:
        """Return the URL for the primary image of a media item if available."""
        # Treat an item without image tags like one without a primary image.
        image_tags = media_item.get(ITEM_KEY_IMAGE_TAGS) or {}

        if "Primary" not in image_tags:
            return None

        item_id = media_item[ITEM_KEY_ID]
        return str(self.api.artwork(item_id, "Primary", MAX_IMAGE_WIDTH))

    def _get_stream_url(self, media_item: dict[str, Any]) -> str:
        """Return the stream URL for a media item.

        Raises:
            BrowseError: If the media type is neither audio nor video.
        """
        media_type = media_item[ITEM_KEY_MEDIA_TYPE]
        item_id = media_item[ITEM_KEY_ID]

        if media_type == MEDIA_TYPE_AUDIO:
            # Pass the audio codec only when one is set in the entry options.
            if audio_codec := self.entry.options.get(CONF_AUDIO_CODEC):
                return self.api.audio_url(item_id, audio_codec=audio_codec)  # type: ignore[no-any-return]
            return self.api.audio_url(item_id)  # type: ignore[no-any-return]
        if media_type == MEDIA_TYPE_VIDEO:
            return self.api.video_url(item_id)  # type: ignore[no-any-return]

        raise BrowseError(f"Unsupported media type {media_type}")
539 
def _media_mime_type(media_item: dict[str, Any]) -> str | None:
    """Guess the mime type of a media item from its first media source path.

    Returns None, after a debug log entry, when the item has no media
    sources, when the first source has no path, or when no type can be
    guessed from the path.
    """
    sources = media_item.get(ITEM_KEY_MEDIA_SOURCES)
    if not sources:
        _LOGGER.debug("Unable to determine mime type for item without media source")
        return None

    first_source = sources[0]
    if MEDIA_SOURCE_KEY_PATH not in first_source:
        _LOGGER.debug("Unable to determine mime type for media source without path")
        return None

    source_path = first_source[MEDIA_SOURCE_KEY_PATH]
    guessed_type = mimetypes.guess_type(source_path)[0]

    if guessed_type is None:
        # Only the file name is logged, not the full path.
        _LOGGER.debug(
            "Unable to determine mime type for path %s",
            os.path.basename(source_path),
        )

    return guessed_type
BrowseMediaSource _build_album(self, dict[str, Any] album, bool include_children)
BrowseMediaSource _build_library(self, dict[str, Any] library, bool include_children)
list[BrowseMediaSource] _build_tracks(self, str album_id)
list[BrowseMediaSource] _build_movies(self, str library_id)
BrowseMediaSource _build_track(self, dict[str, Any] track)
list[BrowseMediaSource] _build_episodes(self, str season_id)
PlayMedia async_resolve_media(self, MediaSourceItem item)
Definition: media_source.py:82
None __init__(self, HomeAssistant hass, JellyfinClient client, JellyfinConfigEntry entry)
Definition: media_source.py:71
BrowseMediaSource _build_artist(self, dict[str, Any] artist, bool include_children)
list[BrowseMediaSource] _build_albums(self, str parent_id)
list[BrowseMediaSource] _build_artists(self, str library_id)
list[dict[str, Any]] _get_children(self, str parent_id, str item_type)
BrowseMediaSource _build_tv_library(self, dict[str, Any] library, bool include_children)
list[BrowseMediaSource] _build_seasons(self, str series_id)
BrowseMediaSource _build_movie(self, dict[str, Any] movie)
str|None _get_thumbnail_url(self, dict[str, Any] media_item)
str _get_stream_url(self, dict[str, Any] media_item)
BrowseMediaSource async_browse_media(self, MediaSourceItem item)
Definition: media_source.py:96
BrowseMediaSource _build_music_library(self, dict[str, Any] library, bool include_children)
BrowseMediaSource _build_season(self, dict[str, Any] season, bool include_children)
BrowseMediaSource _build_series(self, dict[str, Any] series, bool include_children)
BrowseMediaSource _build_movie_library(self, dict[str, Any] library, bool include_children)
BrowseMediaSource _build_episode(self, dict[str, Any] episode)
list[BrowseMediaSource] _build_tvshow(self, str library_id)
MediaSource async_get_media_source(HomeAssistant hass)
Definition: media_source.py:55
str|None _media_mime_type(dict[str, Any] media_item)