Home Assistant Unofficial Reference 2024.12.1
media_player.py
Go to the documentation of this file.
1 """Support for interacting with Spotify Connect."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Awaitable, Callable, Coroutine
7 import datetime as dt
8 import logging
9 from typing import TYPE_CHECKING, Any, Concatenate
10 
11 from spotifyaio import (
12  Device,
13  Episode,
14  Item,
15  ItemType,
16  PlaybackState,
17  ProductType,
18  RepeatMode as SpotifyRepeatMode,
19  Track,
20 )
21 from yarl import URL
22 
24  ATTR_MEDIA_ENQUEUE,
25  BrowseMedia,
26  MediaPlayerEnqueue,
27  MediaPlayerEntity,
28  MediaPlayerEntityFeature,
29  MediaPlayerState,
30  MediaType,
31  RepeatMode,
32 )
33 from homeassistant.core import HomeAssistant, callback
34 from homeassistant.helpers.entity_platform import AddEntitiesCallback
35 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
36 
37 from .browse_media import async_browse_media_internal
38 from .const import MEDIA_PLAYER_PREFIX, PLAYABLE_MEDIA_TYPES
39 from .coordinator import SpotifyConfigEntry, SpotifyCoordinator
40 from .entity import SpotifyEntity
41 
_LOGGER = logging.getLogger(__name__)

# Full feature set; `supported_features` returns it only for premium
# accounts with an active playback on an unrestricted device.
SUPPORT_SPOTIFY = (
    MediaPlayerEntityFeature.BROWSE_MEDIA
    | MediaPlayerEntityFeature.NEXT_TRACK
    | MediaPlayerEntityFeature.PAUSE
    | MediaPlayerEntityFeature.PLAY
    | MediaPlayerEntityFeature.PLAY_MEDIA
    | MediaPlayerEntityFeature.PREVIOUS_TRACK
    | MediaPlayerEntityFeature.REPEAT_SET
    | MediaPlayerEntityFeature.SEEK
    | MediaPlayerEntityFeature.SELECT_SOURCE
    | MediaPlayerEntityFeature.SHUFFLE_SET
    | MediaPlayerEntityFeature.VOLUME_SET
)

# Spotify repeat mode -> Home Assistant repeat mode.
REPEAT_MODE_MAPPING_TO_HA = {
    SpotifyRepeatMode.CONTEXT: RepeatMode.ALL,
    SpotifyRepeatMode.OFF: RepeatMode.OFF,
    SpotifyRepeatMode.TRACK: RepeatMode.ONE,
}

# Home Assistant repeat mode -> Spotify repeat mode (inverse of the map above).
REPEAT_MODE_MAPPING_TO_SPOTIFY = {
    value: key for key, value in REPEAT_MODE_MAPPING_TO_HA.items()
}
# Seconds to sleep after a command before the coordinator is refreshed.
AFTER_REQUEST_SLEEP = 1
68 
69 
async def async_setup_entry(
    hass: HomeAssistant,
    entry: SpotifyConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up Spotify based on a config entry.

    Builds a single ``SpotifyMediaPlayer`` from the coordinator and device
    coordinator stored in ``entry.runtime_data`` and registers it through
    ``async_add_entities``.
    """
    data = entry.runtime_data
    assert entry.unique_id is not None
    spotify = SpotifyMediaPlayer(
        data.coordinator,
        data.devices,
    )
    async_add_entities([spotify])
83 
84 
85 def ensure_item[_R](
86  func: Callable[[SpotifyMediaPlayer, Item], _R],
87 ) -> Callable[[SpotifyMediaPlayer], _R | None]:
88  """Ensure that the currently playing item is available."""
89 
90  def wrapper(self: SpotifyMediaPlayer) -> _R | None:
91  if not self.currently_playing or not self.currently_playing.item:
92  return None
93  return func(self, self.currently_playing.item)
94 
95  return wrapper
96 
97 
98 def async_refresh_after[_T: SpotifyEntity, **_P](
99  func: Callable[Concatenate[_T, _P], Awaitable[None]],
100 ) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
101  """Define a wrapper to yield and refresh after."""
102 
103  async def _async_wrap(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
104  await func(self, *args, **kwargs)
105  await asyncio.sleep(AFTER_REQUEST_SLEEP)
106  await self.coordinator.async_refresh()
107 
108  return _async_wrap
109 
110 
class SpotifyMediaPlayer(SpotifyEntity, MediaPlayerEntity):
    """Representation of a Spotify controller."""

    _attr_media_image_remotely_accessible = False
    _attr_name = None
    _attr_translation_key = "spotify"
117 
118  def __init__(
119  self,
120  coordinator: SpotifyCoordinator,
121  device_coordinator: DataUpdateCoordinator[list[Device]],
122  ) -> None:
123  """Initialize."""
124  super().__init__(coordinator)
125  self.devicesdevices = device_coordinator
126  self._attr_unique_id_attr_unique_id = coordinator.current_user.user_id
127 
128  @property
129  def currently_playing(self) -> PlaybackState | None:
130  """Return the current playback."""
131  return self.coordinator.data.current_playback
132 
133  @property
134  def supported_features(self) -> MediaPlayerEntityFeature:
135  """Return the supported features."""
136  if self.coordinator.current_user.product != ProductType.PREMIUM:
137  return MediaPlayerEntityFeature(0)
138  if not self.currently_playingcurrently_playing or self.currently_playingcurrently_playing.device.is_restricted:
139  return MediaPlayerEntityFeature.SELECT_SOURCE
140  return SUPPORT_SPOTIFY
141 
142  @property
143  def state(self) -> MediaPlayerState:
144  """Return the playback state."""
145  if not self.currently_playingcurrently_playing:
146  return MediaPlayerState.IDLE
147  if self.currently_playingcurrently_playing.is_playing:
148  return MediaPlayerState.PLAYING
149  return MediaPlayerState.PAUSED
150 
151  @property
152  def volume_level(self) -> float | None:
153  """Return the device volume."""
154  if not self.currently_playingcurrently_playing:
155  return None
156  return self.currently_playingcurrently_playing.device.volume_percent / 100
157 
158  @property
159  @ensure_item
160  def media_content_id(self, item: Item) -> str: # noqa: PLR0206
161  """Return the media URL."""
162  return item.uri
163 
164  @property
165  @ensure_item
166  def media_content_type(self, item: Item) -> str: # noqa: PLR0206
167  """Return the media type."""
168  return MediaType.PODCAST if item.type == ItemType.EPISODE else MediaType.MUSIC
169 
170  @property
171  @ensure_item
172  def media_duration(self, item: Item) -> int: # noqa: PLR0206
173  """Duration of current playing media in seconds."""
174  return round(item.duration_ms / 1000)
175 
176  @property
177  def media_position(self) -> int | None:
178  """Position of current playing media in seconds."""
179  if not self.currently_playingcurrently_playing or self.currently_playingcurrently_playing.progress_ms is None:
180  return None
181  return round(self.currently_playingcurrently_playing.progress_ms / 1000)
182 
183  @property
184  def media_position_updated_at(self) -> dt.datetime | None:
185  """When was the position of the current playing media valid."""
186  if not self.currently_playingcurrently_playing:
187  return None
188  return self.coordinator.data.position_updated_at
189 
190  @property
191  @ensure_item
192  def media_image_url(self, item: Item) -> str | None: # noqa: PLR0206
193  """Return the media image URL."""
194  if item.type == ItemType.EPISODE:
195  if TYPE_CHECKING:
196  assert isinstance(item, Episode)
197  if item.images:
198  return item.images[0].url
199  if item.show and item.show.images:
200  return item.show.images[0].url
201  return None
202  if TYPE_CHECKING:
203  assert isinstance(item, Track)
204  if not item.album.images:
205  return None
206  return item.album.images[0].url
207 
208  @property
209  @ensure_item
210  def media_title(self, item: Item) -> str: # noqa: PLR0206
211  """Return the media title."""
212  return item.name
213 
214  @property
215  @ensure_item
216  def media_artist(self, item: Item) -> str: # noqa: PLR0206
217  """Return the media artist."""
218  if item.type == ItemType.EPISODE:
219  if TYPE_CHECKING:
220  assert isinstance(item, Episode)
221  return item.show.publisher
222 
223  if TYPE_CHECKING:
224  assert isinstance(item, Track)
225  return ", ".join(artist.name for artist in item.artists)
226 
227  @property
228  @ensure_item
229  def media_album_name(self, item: Item) -> str: # noqa: PLR0206
230  """Return the media album."""
231  if item.type == ItemType.EPISODE:
232  if TYPE_CHECKING:
233  assert isinstance(item, Episode)
234  return item.show.name
235 
236  if TYPE_CHECKING:
237  assert isinstance(item, Track)
238  return item.album.name
239 
240  @property
241  @ensure_item
242  def media_track(self, item: Item) -> int | None: # noqa: PLR0206
243  """Track number of current playing media, music track only."""
244  if item.type == ItemType.EPISODE:
245  return None
246  if TYPE_CHECKING:
247  assert isinstance(item, Track)
248  return item.track_number
249 
250  @property
251  def media_playlist(self) -> str | None:
252  """Title of Playlist currently playing."""
253  if self.coordinator.data.dj_playlist:
254  return "DJ"
255  if self.coordinator.data.playlist is None:
256  return None
257  return self.coordinator.data.playlist.name
258 
259  @property
260  def source(self) -> str | None:
261  """Return the current playback device."""
262  if not self.currently_playingcurrently_playing:
263  return None
264  return self.currently_playingcurrently_playing.device.name
265 
266  @property
267  def source_list(self) -> list[str] | None:
268  """Return a list of source devices."""
269  return [device.name for device in self.devicesdevices.data]
270 
271  @property
272  def shuffle(self) -> bool | None:
273  """Shuffling state."""
274  if not self.currently_playingcurrently_playing:
275  return None
276  return self.currently_playingcurrently_playing.shuffle
277 
278  @property
279  def repeat(self) -> RepeatMode | None:
280  """Return current repeat mode."""
281  if not self.currently_playingcurrently_playing:
282  return None
283  return REPEAT_MODE_MAPPING_TO_HA.get(self.currently_playingcurrently_playing.repeat_mode)
284 
285  @async_refresh_after
286  async def async_set_volume_level(self, volume: float) -> None:
287  """Set the volume level."""
288  await self.coordinator.client.set_volume(int(volume * 100))
289 
290  @async_refresh_after
291  async def async_media_play(self) -> None:
292  """Start or resume playback."""
293  await self.coordinator.client.start_playback()
294 
295  @async_refresh_after
296  async def async_media_pause(self) -> None:
297  """Pause playback."""
298  await self.coordinator.client.pause_playback()
299 
300  @async_refresh_after
301  async def async_media_previous_track(self) -> None:
302  """Skip to previous track."""
303  await self.coordinator.client.previous_track()
304 
305  @async_refresh_after
306  async def async_media_next_track(self) -> None:
307  """Skip to next track."""
308  await self.coordinator.client.next_track()
309 
310  @async_refresh_after
311  async def async_media_seek(self, position: float) -> None:
312  """Send seek command."""
313  await self.coordinator.client.seek_track(int(position * 1000))
314 
315  @async_refresh_after
316  async def async_play_media(
317  self, media_type: MediaType | str, media_id: str, **kwargs: Any
318  ) -> None:
319  """Play media."""
320  media_type = media_type.removeprefix(MEDIA_PLAYER_PREFIX)
321 
322  enqueue: MediaPlayerEnqueue = kwargs.get(
323  ATTR_MEDIA_ENQUEUE, MediaPlayerEnqueue.REPLACE
324  )
325 
326  kwargs = {}
327 
328  # Spotify can't handle URI's with query strings or anchors
329  # Yet, they do generate those types of URI in their official clients.
330  media_id = str(URL(media_id).with_query(None).with_fragment(None))
331 
332  if media_type in {MediaType.TRACK, MediaType.EPISODE, MediaType.MUSIC}:
333  kwargs["uris"] = [media_id]
334  elif media_type in PLAYABLE_MEDIA_TYPES:
335  kwargs["context_uri"] = media_id
336  else:
337  _LOGGER.error("Media type %s is not supported", media_type)
338  return
339 
340  if not self.currently_playingcurrently_playing and self.devicesdevices.data:
341  kwargs["device_id"] = self.devicesdevices.data[0].device_id
342 
343  if enqueue == MediaPlayerEnqueue.ADD:
344  if media_type not in {
345  MediaType.TRACK,
346  MediaType.EPISODE,
347  MediaType.MUSIC,
348  }:
349  raise ValueError(
350  f"Media type {media_type} is not supported when enqueue is ADD"
351  )
352  await self.coordinator.client.add_to_queue(
353  media_id, kwargs.get("device_id")
354  )
355  return
356 
357  await self.coordinator.client.start_playback(**kwargs)
358 
359  @async_refresh_after
360  async def async_select_source(self, source: str) -> None:
361  """Select playback device."""
362  for device in self.devicesdevices.data:
363  if device.name == source:
364  if TYPE_CHECKING:
365  assert device.device_id is not None
366  await self.coordinator.client.transfer_playback(device.device_id)
367  return
368 
369  @async_refresh_after
370  async def async_set_shuffle(self, shuffle: bool) -> None:
371  """Enable/Disable shuffle mode."""
372  await self.coordinator.client.set_shuffle(state=shuffle)
373 
374  @async_refresh_after
375  async def async_set_repeat(self, repeat: RepeatMode) -> None:
376  """Set repeat mode."""
377  if repeat not in REPEAT_MODE_MAPPING_TO_SPOTIFY:
378  raise ValueError(f"Unsupported repeat mode: {repeat}")
379  await self.coordinator.client.set_repeat(REPEAT_MODE_MAPPING_TO_SPOTIFY[repeat])
380 
382  self,
383  media_content_type: MediaType | str | None = None,
384  media_content_id: str | None = None,
385  ) -> BrowseMedia:
386  """Implement the websocket media browsing helper."""
387 
388  return await async_browse_media_internal(
389  self.hasshasshass,
390  self.coordinator.client,
391  media_content_type,
392  media_content_id,
393  )
394 
395  @callback
396  def _handle_devices_update(self) -> None:
397  """Handle updated data from the coordinator."""
398  if not self.enabledenabled:
399  return
400  self.async_write_ha_stateasync_write_ha_state()
401 
402  async def async_added_to_hass(self) -> None:
403  """When entity is added to hass."""
404  await super().async_added_to_hass()
405  self.async_on_removeasync_on_remove(
406  self.devicesdevices.async_add_listener(self._handle_devices_update_handle_devices_update)
407  )
BrowseMedia async_browse_media(self, MediaType|str|None media_content_type=None, str|None media_content_id=None)
None __init__(self, SpotifyCoordinator coordinator, DataUpdateCoordinator[list[Device]] device_coordinator)
None async_play_media(self, MediaType|str media_type, str media_id, **Any kwargs)
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
BrowseMedia async_browse_media_internal(HomeAssistant hass, SpotifyClient spotify, str|None media_content_type, str|None media_content_id, *bool can_play_artist=True)
None async_setup_entry(HomeAssistant hass, SpotifyConfigEntry entry, AddEntitiesCallback async_add_entities)
Definition: media_player.py:74