Home Assistant Unofficial Reference 2024.12.1
media_browser.py
Go to the documentation of this file.
1 """Support for media browsing."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from contextlib import suppress
7 from functools import partial
8 import logging
9 from typing import cast
10 import urllib.parse
11 
12 from soco.data_structures import DidlObject
13 from soco.ms_data_structures import MusicServiceItem
14 from soco.music_library import MusicLibrary
15 
16 from homeassistant.components import media_source, plex, spotify
18  BrowseError,
19  BrowseMedia,
20  MediaClass,
21  MediaType,
22 )
23 from homeassistant.core import HomeAssistant
24 from homeassistant.helpers.network import is_internal_request
25 
26 from .const import (
27  DOMAIN,
28  EXPANDABLE_MEDIA_TYPES,
29  LIBRARY_TITLES_MAPPING,
30  MEDIA_TYPES_TO_SONOS,
31  PLAYABLE_MEDIA_TYPES,
32  SONOS_ALBUM,
33  SONOS_ALBUM_ARTIST,
34  SONOS_GENRE,
35  SONOS_TO_MEDIA_CLASSES,
36  SONOS_TO_MEDIA_TYPES,
37  SONOS_TRACKS,
38  SONOS_TYPES_MAPPING,
39 )
40 from .exception import UnknownMediaType
41 from .favorites import SonosFavorites
42 from .speaker import SonosMedia, SonosSpeaker
43 
44 _LOGGER = logging.getLogger(__name__)
45 
46 type GetBrowseImageUrlType = Callable[[str, str, str | None], str]
47 
48 
def fix_image_url(url: str) -> str:
    """Update the image url to fully encode characters to allow image display in media_browser UI.

    Images whose file path contains characters such as ',()+ are not loaded without escaping them.

    The URL is unquoted, split into its components and rebuilt with every query
    value percent-encoded (only "/" and ":" are left untouched). Any fragment
    is dropped from the result.
    """
    # Before parsing encode the plus sign; otherwise it'll be interpreted as a space.
    original_url: str = urllib.parse.unquote(url).replace("+", "%2B")
    # urlsplit (not urlparse) is the counterpart of urlunsplit below: urlparse
    # peels a ";params" suffix off the path, and the rebuilt URL would lose it.
    parts = urllib.parse.urlsplit(original_url)
    encoded_query = urllib.parse.urlencode(
        urllib.parse.parse_qsl(parts.query),
        quote_via=urllib.parse.quote,
        safe="/:",
    )
    new_url = urllib.parse.urlunsplit(
        (parts.scheme, parts.netloc, parts.path, encoded_query, "")
    )
    if original_url != new_url:
        _LOGGER.debug("fix_sonos_image_url original: %s new: %s", original_url, new_url)
    return new_url
73 
74 
def get_thumbnail_url_full(
    media: SonosMedia,
    is_internal: bool,
    get_browse_image_url: GetBrowseImageUrlType,
    media_content_type: str,
    media_content_id: str,
    media_image_id: str | None = None,
    item: MusicServiceItem | None = None,
) -> str | None:
    """Get thumbnail URL.

    For external requests the URL comes from ``get_browse_image_url`` and is
    returned unquoted. For internal requests the item's own ``album_art_uri``
    is used (the item is looked up through ``get_media`` when not supplied)
    and passed through ``fix_image_url``.
    """
    if not is_internal:
        return urllib.parse.unquote(
            get_browse_image_url(
                media_content_type,
                media_content_id,
                media_image_id,
            )
        )

    if not item:
        item = get_media(
            media.library,
            media_content_id,
            media_content_type,
        )
    # A missing item, a missing attribute or an explicit None all collapse to
    # the empty string so fix_image_url always receives a str.
    art_uri = getattr(item, "album_art_uri", None) or ""
    return fix_image_url(art_uri)
101 
102 
def media_source_filter(item: BrowseMedia) -> bool:
    """Filter media sources.

    Returns True only for items whose content type starts with "audio/".
    """
    return item.media_content_type.startswith("audio/")
106 
107 
async def async_browse_media(
    hass: HomeAssistant,
    speaker: SonosSpeaker,
    media: SonosMedia,
    get_browse_image_url: GetBrowseImageUrlType,
    media_content_id: str | None,
    media_content_type: str | None,
) -> BrowseMedia:
    """Browse media.

    Dispatches on the content id/type: no id yields the root payload; media
    source, Plex and Spotify ids are delegated to those integrations;
    "library", "favorites" and "favorites_folder" are built in the executor;
    anything else is treated as a music-library query.

    Raises BrowseError when the music-library query produces no response.
    """

    def thumbnail_getter() -> Callable[..., str | None]:
        """Bind the request-specific arguments of get_thumbnail_url_full."""
        return partial(
            get_thumbnail_url_full,
            media,
            is_internal_request(hass),
            get_browse_image_url,
        )

    if media_content_id is None:
        return await root_payload(
            hass,
            speaker,
            media,
            get_browse_image_url,
        )
    # Narrowing only: a content id is expected to come with a content type.
    assert media_content_type is not None

    if media_source.is_media_source_id(media_content_id):
        return await media_source.async_browse_media(
            hass, media_content_id, content_filter=media_source_filter
        )

    if plex.is_plex_media_id(media_content_id):
        return await plex.async_browse_media(
            hass, media_content_type, media_content_id, platform=DOMAIN
        )

    if media_content_type == "plex":
        return await plex.async_browse_media(hass, None, None, platform=DOMAIN)

    if spotify.is_spotify_media_type(media_content_type):
        return await spotify.async_browse_media(
            hass, media_content_type, media_content_id, can_play_artist=False
        )

    if media_content_type == "library":
        return await hass.async_add_executor_job(
            library_payload,
            media.library,
            thumbnail_getter(),
        )

    if media_content_type == "favorites":
        return await hass.async_add_executor_job(
            favorites_payload,
            speaker.favorites,
        )

    if media_content_type == "favorites_folder":
        return await hass.async_add_executor_job(
            favorites_folder_payload,
            speaker.favorites,
            media_content_id,
        )

    payload = {
        "search_type": media_content_type,
        "idstring": media_content_id,
    }
    response = await hass.async_add_executor_job(
        build_item_response,
        media.library,
        payload,
        thumbnail_getter(),
    )
    if response is None:
        raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
    return response
188 
189 
def build_item_response(
    media_library: MusicLibrary, payload: dict[str, str], get_thumbnail_url=None
) -> BrowseMedia | None:
    """Create response payload for the provided media query.

    ``payload`` carries "search_type" and "idstring"; its "idstring" entry is
    rewritten in place (genre/composer album ids are re-rooted under
    A:ALBUMARTIST, then the id is unquoted).

    Returns None when the search type is unknown or the library browse yields
    nothing.
    """
    if payload["search_type"] == MediaType.ALBUM and payload["idstring"].startswith(
        ("A:GENRE", "A:COMPOSER")
    ):
        payload["idstring"] = "A:ALBUMARTIST/" + "/".join(
            payload["idstring"].split("/")[2:]
        )
    payload["idstring"] = urllib.parse.unquote(payload["idstring"])

    try:
        search_type = MEDIA_TYPES_TO_SONOS[payload["search_type"]]
    except KeyError:
        _LOGGER.debug(
            "Unknown media type received when building item response: %s",
            payload["search_type"],
        )
        return None

    media = media_library.browse_by_idstring(
        search_type,
        payload["idstring"],
        full_album_art_uri=True,
        max_items=0,
    )

    if media is None:
        return None

    thumbnail = None
    title = None

    # Fetch album info for titles and thumbnails
    # Can't be extracted from track info
    if (
        payload["search_type"] == MediaType.ALBUM
        # An empty result has no first entry to inspect.
        and len(media) > 0
        and media[0].item_class == "object.item.audioItem.musicTrack"
    ):
        idstring = payload["idstring"]
        if idstring.startswith("A:ALBUMARTIST/"):
            search_type = SONOS_ALBUM_ARTIST
        elif idstring.startswith("A:ALBUM/"):
            search_type = SONOS_ALBUM
        album = get_media(media_library, idstring, search_type)

        title = getattr(album, "title", None)
        # The thumbnail callback is optional (defaults to None).
        if get_thumbnail_url is not None:
            thumbnail = get_thumbnail_url(search_type, payload["idstring"])

    if not title:
        try:
            title = urllib.parse.unquote(payload["idstring"].split("/")[1])
        except IndexError:
            # No second path component: fall back to the library title table.
            title = LIBRARY_TITLES_MAPPING[payload["idstring"]]

    try:
        media_class = SONOS_TO_MEDIA_CLASSES[
            MEDIA_TYPES_TO_SONOS[payload["search_type"]]
        ]
    except KeyError:
        _LOGGER.debug("Unknown media type received %s", payload["search_type"])
        return None

    children = []
    for entry in media:
        # Entries of an unknown media type are skipped, not fatal.
        with suppress(UnknownMediaType):
            children.append(item_payload(entry, get_thumbnail_url))

    return BrowseMedia(
        title=title,
        thumbnail=thumbnail,
        media_class=media_class,
        media_content_id=payload["idstring"],
        media_content_type=payload["search_type"],
        children=children,
        can_play=can_play(payload["search_type"]),
        can_expand=can_expand(payload["search_type"]),
    )
269 
270 
def item_payload(item: DidlObject, get_thumbnail_url=None) -> BrowseMedia:
    """Create response payload for a single media item.

    Used by async_browse_media.

    Raises UnknownMediaType when the item's media type has no media class.
    """
    media_type = get_media_type(item)
    try:
        media_class = SONOS_TO_MEDIA_CLASSES[media_type]
    except KeyError as err:
        _LOGGER.debug("Unknown media type received %s", media_type)
        raise UnknownMediaType from err

    content_id = get_content_id(item)
    thumbnail = None
    # The thumbnail callback is optional (defaults to None); only resolve a
    # thumbnail when one was supplied and the item advertises album art.
    if get_thumbnail_url is not None and getattr(item, "album_art_uri", None):
        thumbnail = get_thumbnail_url(media_class, content_id, item=item)

    return BrowseMedia(
        title=item.title,
        thumbnail=thumbnail,
        media_class=media_class,
        media_content_id=content_id,
        media_content_type=SONOS_TO_MEDIA_TYPES[media_type],
        can_play=can_play(item.item_class),
        can_expand=can_expand(item),
    )
297 
298 
async def root_payload(
    hass: HomeAssistant,
    speaker: SonosSpeaker,
    media: SonosMedia,
    get_browse_image_url: GetBrowseImageUrlType,
) -> BrowseMedia:
    """Return root payload for Sonos.

    Collects one entry per available source (favorites, music library, Plex,
    Spotify children, media sources). When exactly one entry results, that
    entry is browsed directly instead of returning a one-item root.
    """

    def _root_entry(
        title: str, media_class: str, content_type: str, brand: str
    ) -> BrowseMedia:
        """Build a non-playable, expandable top-level entry with a brand logo."""
        return BrowseMedia(
            title=title,
            media_class=media_class,
            media_content_id="",
            media_content_type=content_type,
            thumbnail=f"https://brands.home-assistant.io/_/{brand}/logo.png",
            can_play=False,
            can_expand=True,
        )

    children: list[BrowseMedia] = []

    if speaker.favorites:
        children.append(
            _root_entry("Favorites", MediaClass.DIRECTORY, "favorites", "sonos")
        )

    # Offer the library only if browsing for a single track returns something.
    has_library = await hass.async_add_executor_job(
        partial(media.library.browse_by_idstring, "tracks", "", max_items=1)
    )
    if has_library:
        children.append(
            _root_entry("Music Library", MediaClass.DIRECTORY, "library", "sonos")
        )

    if "plex" in hass.config.components:
        children.append(_root_entry("Plex", MediaClass.APP, "plex", "plex"))

    if "spotify" in hass.config.components:
        result = await spotify.async_browse_media(hass, None, None)
        if result.children:
            children.extend(result.children)

    # NOTE(review): the exception clause of this best-effort block is missing
    # from the listing; BrowseError is assumed here - confirm against upstream.
    with suppress(BrowseError):
        item = await media_source.async_browse_media(
            hass, None, content_filter=media_source_filter
        )
        # If domain is None, it's overview of available sources
        if item.domain is None and item.children is not None:
            children.extend(item.children)
        else:
            children.append(item)

    if len(children) == 1:
        only_child = children[0]
        return await async_browse_media(
            hass,
            speaker,
            media,
            get_browse_image_url,
            only_child.media_content_id,
            only_child.media_content_type,
        )

    return BrowseMedia(
        title="Sonos",
        media_class=MediaClass.DIRECTORY,
        media_content_id="",
        media_content_type="root",
        can_play=False,
        can_expand=True,
        children=children,
    )
385 
386 
def library_payload(media_library: MusicLibrary, get_thumbnail_url=None) -> BrowseMedia:
    """Create response payload to describe contents of a specific library.

    Used by async_browse_media.

    Every item returned by ``media_library.browse()`` becomes a child; items of
    an unknown media type are skipped.
    """
    children = []
    for entry in media_library.browse():
        with suppress(UnknownMediaType):
            children.append(item_payload(entry, get_thumbnail_url))

    return BrowseMedia(
        title="Music Library",
        media_class=MediaClass.DIRECTORY,
        media_content_id="library",
        media_content_type="library",
        can_play=False,
        can_expand=True,
        children=children,
    )
406 
407 
def favorites_payload(favorites: SonosFavorites) -> BrowseMedia:
    """Create response payload listing the favorites grouped by item class.

    Used by async_browse_media.

    One expandable "favorites_folder" child is emitted per distinct
    ``reference.item_class`` found among the favorites, in sorted order.
    Item classes without a known media type or media class are skipped.
    """
    children: list[BrowseMedia] = []

    group_types: set[str] = {fav.reference.item_class for fav in favorites}
    for group_type in sorted(group_types):
        try:
            media_content_type = SONOS_TYPES_MAPPING[group_type]
            media_class = SONOS_TO_MEDIA_CLASSES[group_type]
        except KeyError:
            _LOGGER.debug("Unknown media type or class received %s", group_type)
            continue
        children.append(
            BrowseMedia(
                title=media_content_type.title(),
                media_class=media_class,
                media_content_id=group_type,
                media_content_type="favorites_folder",
                can_play=False,
                can_expand=True,
            )
        )

    return BrowseMedia(
        title="Favorites",
        media_class=MediaClass.DIRECTORY,
        media_content_id="",
        media_content_type="favorites",
        can_play=False,
        can_expand=True,
        children=children,
    )
443 
444 
def favorites_folder_payload(
    favorites: SonosFavorites, media_content_id: str
) -> BrowseMedia:
    """Create response payload to describe all items of a type of favorite.

    Used by async_browse_media.

    ``media_content_id`` is the item class of the folder; only favorites whose
    ``reference.item_class`` equals it are listed. An item class missing from
    SONOS_TYPES_MAPPING raises KeyError.
    """
    content_type = SONOS_TYPES_MAPPING[media_content_id]

    children: list[BrowseMedia] = [
        BrowseMedia(
            title=favorite.title,
            media_class=SONOS_TO_MEDIA_CLASSES[favorite.reference.item_class],
            media_content_id=favorite.item_id,
            media_content_type="favorite_item_id",
            can_play=True,
            can_expand=False,
            thumbnail=getattr(favorite, "album_art_uri", None),
        )
        for favorite in favorites
        if favorite.reference.item_class == media_content_id
    ]

    return BrowseMedia(
        title=content_type.title(),
        media_class=MediaClass.DIRECTORY,
        media_content_id="",
        media_content_type="favorites",
        can_play=False,
        can_expand=True,
        children=children,
    )
479 
480 
def get_media_type(item: DidlObject) -> str:
    """Extract media type of item.

    Tracks map to SONOS_TRACKS. A music album whose id is rooted under an
    album-artist or genre prefix is typed by its item class. Everything else
    is typed by the id prefix, falling back to the raw item class.
    """
    if item.item_class == "object.item.audioItem.musicTrack":
        return SONOS_TRACKS

    # The part of the id before the first "/" names the hierarchy root.
    id_prefix = item.item_id.split("/")[0]

    if item.item_class == "object.container.album.musicAlbum" and (
        SONOS_TYPES_MAPPING.get(id_prefix) in (SONOS_ALBUM_ARTIST, SONOS_GENRE)
    ):
        return SONOS_TYPES_MAPPING[item.item_class]

    return SONOS_TYPES_MAPPING.get(id_prefix, item.item_class)
497 
498 
def can_play(item: DidlObject | str) -> bool:
    """Test if playable.

    Used by async_browse_media.

    ``item`` is used directly as a key into SONOS_TO_MEDIA_TYPES; the callers
    in this module pass an item class or search type string.
    """
    return SONOS_TO_MEDIA_TYPES.get(item) in PLAYABLE_MEDIA_TYPES
505 
506 
def can_expand(item: DidlObject | str) -> bool:
    """Test if expandable.

    Used by async_browse_media.

    A string is looked up in SONOS_TYPES_MAPPING. For an item object, the item
    class is checked first and the item id second.
    """
    if isinstance(item, str):
        return SONOS_TYPES_MAPPING.get(item) in EXPANDABLE_MEDIA_TYPES

    if SONOS_TO_MEDIA_TYPES.get(item.item_class) in EXPANDABLE_MEDIA_TYPES:
        return True

    return SONOS_TYPES_MAPPING.get(item.item_id) in EXPANDABLE_MEDIA_TYPES
519 
520 
def get_content_id(item: DidlObject) -> str:
    """Extract content id or uri.

    Music tracks are identified by their URI; every other item by its item id.
    """
    if item.item_class == "object.item.audioItem.musicTrack":
        return cast(str, item.get_uri())
    return cast(str, item.item_id)
526 
527 
def get_media(
    media_library: MusicLibrary, item_id: str, search_type: str
) -> MusicServiceItem | None:
    """Fetch a single media/album.

    ``search_type`` is translated through MEDIA_TYPES_TO_SONOS when it has an
    entry there. Returns the first matching item, or None when nothing matches.
    """
    _LOGGER.debug("get_media item_id [%s], search_type [%s]", item_id, search_type)
    search_type = MEDIA_TYPES_TO_SONOS.get(search_type, search_type)

    if search_type == "playlists":
        # Format is S:TITLE or S:ITEM_ID
        splits = item_id.split(":")
        title = splits[1] if len(splits) > 1 else None
        return next(
            (
                playlist
                for playlist in media_library.get_playlists()
                if item_id == playlist.item_id or title == playlist.title
            ),
            None,
        )

    # Album lookups that are not rooted at A:ALBUM... are re-rooted under the
    # album-artist hierarchy, dropping the first two path components.
    if not item_id.startswith("A:ALBUM") and search_type == SONOS_ALBUM:
        item_id = "A:ALBUMARTIST/" + "/".join(item_id.split("/")[2:])

    if item_id.startswith("A:ALBUM/") or search_type == "tracks":
        search_term = urllib.parse.unquote(item_id.split("/")[-1])
        matches = media_library.get_music_library_information(
            search_type, search_term=search_term, full_album_art_uri=True
        )
    else:
        # When requesting media by album_artist, composer, genre use the browse interface
        # to navigate the hierarchy. This occurs when invoked from media browser or service
        # calls
        # Example: A:ALBUMARTIST/Neil Young/Greatest Hits - get specific album
        # Example: A:ALBUMARTIST/Neil Young - get all albums
        # Others: composer, genre
        # A:<topic>/<name>/<optional title>
        splits = item_id.split("/")
        title = urllib.parse.unquote(splits[2]) if len(splits) > 2 else None
        browse_id_string = splits[0] + "/" + splits[1]
        matches = media_library.browse_by_idstring(
            search_type, browse_id_string, full_album_art_uri=True
        )
        if title:
            wanted = next(
                (entry for entry in matches if title == entry.title),
                None,
            )
            # Keep the list empty when no title matched so the count logged
            # below is accurate; the return value is None either way.
            matches = [] if wanted is None else [wanted]

    _LOGGER.debug(
        "get_media search_type [%s] item_id [%s] matches [%d]",
        search_type,
        item_id,
        len(matches),
    )
    if len(matches) > 0:
        return matches[0]
    return None
BrowseMedia|None build_item_response(MusicLibrary media_library, dict[str, str] payload, get_thumbnail_url=None)
BrowseMedia favorites_payload(SonosFavorites favorites)
BrowseMedia async_browse_media(HomeAssistant hass, SonosSpeaker speaker, SonosMedia media, GetBrowseImageUrlType get_browse_image_url, str|None media_content_id, str|None media_content_type)
MusicServiceItem|None get_media(MusicLibrary media_library, str item_id, str search_type)
BrowseMedia favorites_folder_payload(SonosFavorites favorites, str media_content_id)
BrowseMedia item_payload(DidlObject item, get_thumbnail_url=None)
str|None get_thumbnail_url_full(SonosMedia media, bool is_internal, GetBrowseImageUrlType get_browse_image_url, str media_content_type, str media_content_id, str|None media_image_id=None, MusicServiceItem|None item=None)
BrowseMedia root_payload(HomeAssistant hass, SonosSpeaker speaker, SonosMedia media, GetBrowseImageUrlType get_browse_image_url)
BrowseMedia library_payload(MusicLibrary media_library, get_thumbnail_url=None)
bool is_internal_request(HomeAssistant hass)
Definition: network.py:31