Home Assistant Unofficial Reference 2024.12.1
media_player.py
Go to the documentation of this file.
1 """Denon HEOS Media Player."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Awaitable, Callable, Coroutine
6 from functools import reduce, wraps
7 import logging
8 from operator import ior
9 from typing import Any
10 
11 from pyheos import HeosError, const as heos_const
12 
13 from homeassistant.components import media_source
15  ATTR_MEDIA_ENQUEUE,
16  DOMAIN as MEDIA_PLAYER_DOMAIN,
17  BrowseMedia,
18  MediaPlayerEnqueue,
19  MediaPlayerEntity,
20  MediaPlayerEntityFeature,
21  MediaPlayerState,
22  MediaType,
23  async_process_play_media_url,
24 )
25 from homeassistant.config_entries import ConfigEntry
26 from homeassistant.core import HomeAssistant
27 from homeassistant.helpers.device_registry import DeviceInfo
29  async_dispatcher_connect,
30  async_dispatcher_send,
31 )
32 from homeassistant.helpers.entity_platform import AddEntitiesCallback
33 from homeassistant.util.dt import utcnow
34 
35 from .const import (
36  DATA_ENTITY_ID_MAP,
37  DATA_GROUP_MANAGER,
38  DATA_SOURCE_MANAGER,
39  DOMAIN as HEOS_DOMAIN,
40  SIGNAL_HEOS_PLAYER_ADDED,
41  SIGNAL_HEOS_UPDATED,
42 )
43 
44 BASE_SUPPORTED_FEATURES = (
45  MediaPlayerEntityFeature.VOLUME_MUTE
46  | MediaPlayerEntityFeature.VOLUME_SET
47  | MediaPlayerEntityFeature.VOLUME_STEP
48  | MediaPlayerEntityFeature.CLEAR_PLAYLIST
49  | MediaPlayerEntityFeature.SHUFFLE_SET
50  | MediaPlayerEntityFeature.SELECT_SOURCE
51  | MediaPlayerEntityFeature.PLAY_MEDIA
52  | MediaPlayerEntityFeature.GROUPING
53  | MediaPlayerEntityFeature.BROWSE_MEDIA
54  | MediaPlayerEntityFeature.MEDIA_ENQUEUE
55 )
56 
57 PLAY_STATE_TO_STATE = {
58  heos_const.PLAY_STATE_PLAY: MediaPlayerState.PLAYING,
59  heos_const.PLAY_STATE_STOP: MediaPlayerState.IDLE,
60  heos_const.PLAY_STATE_PAUSE: MediaPlayerState.PAUSED,
61 }
62 
63 CONTROL_TO_SUPPORT = {
64  heos_const.CONTROL_PLAY: MediaPlayerEntityFeature.PLAY,
65  heos_const.CONTROL_PAUSE: MediaPlayerEntityFeature.PAUSE,
66  heos_const.CONTROL_STOP: MediaPlayerEntityFeature.STOP,
67  heos_const.CONTROL_PLAY_PREVIOUS: MediaPlayerEntityFeature.PREVIOUS_TRACK,
68  heos_const.CONTROL_PLAY_NEXT: MediaPlayerEntityFeature.NEXT_TRACK,
69 }
70 
71 HA_HEOS_ENQUEUE_MAP = {
72  None: heos_const.ADD_QUEUE_REPLACE_AND_PLAY,
73  MediaPlayerEnqueue.ADD: heos_const.ADD_QUEUE_ADD_TO_END,
74  MediaPlayerEnqueue.REPLACE: heos_const.ADD_QUEUE_REPLACE_AND_PLAY,
75  MediaPlayerEnqueue.NEXT: heos_const.ADD_QUEUE_PLAY_NEXT,
76  MediaPlayerEnqueue.PLAY: heos_const.ADD_QUEUE_PLAY_NOW,
77 }
78 
79 _LOGGER = logging.getLogger(__name__)
80 
81 
83  hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
84 ) -> None:
85  """Add media players for a config entry."""
86  players = hass.data[HEOS_DOMAIN][MEDIA_PLAYER_DOMAIN]
87  devices = [HeosMediaPlayer(player) for player in players.values()]
88  async_add_entities(devices, True)
89 
90 
91 type _FuncType[**_P] = Callable[_P, Awaitable[Any]]
92 type _ReturnFuncType[**_P] = Callable[_P, Coroutine[Any, Any, None]]
93 
94 
95 def log_command_error[**_P](
96  command: str,
97 ) -> Callable[[_FuncType[_P]], _ReturnFuncType[_P]]:
98  """Return decorator that logs command failure."""
99 
100  def decorator(func: _FuncType[_P]) -> _ReturnFuncType[_P]:
101  @wraps(func)
102  async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
103  try:
104  await func(*args, **kwargs)
105  except (HeosError, ValueError) as ex:
106  _LOGGER.error("Unable to %s: %s", command, ex)
107 
108  return wrapper
109 
110  return decorator
111 
112 
114  """The HEOS player."""
115 
116  _attr_media_content_type = MediaType.MUSIC
117  _attr_should_poll = False
118  _attr_supported_features = BASE_SUPPORTED_FEATURES
119  _attr_media_image_remotely_accessible = True
120  _attr_has_entity_name = True
121  _attr_name = None
122 
123  def __init__(self, player):
124  """Initialize."""
125  self._media_position_updated_at_media_position_updated_at = None
126  self._player_player = player
127  self._signals_signals = []
128  self._source_manager_source_manager = None
129  self._group_manager_group_manager = None
130  self._attr_unique_id_attr_unique_id = str(player.player_id)
131  self._attr_device_info_attr_device_info = DeviceInfo(
132  identifiers={(HEOS_DOMAIN, player.player_id)},
133  manufacturer="HEOS",
134  model=player.model,
135  name=player.name,
136  sw_version=player.version,
137  )
138 
139  async def _player_update(self, player_id, event):
140  """Handle player attribute updated."""
141  if self._player_player.player_id != player_id:
142  return
143  if event == heos_const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
144  self._media_position_updated_at_media_position_updated_at = utcnow()
145  await self.async_update_ha_stateasync_update_ha_state(True)
146 
147  async def _heos_updated(self) -> None:
148  """Handle sources changed."""
149  await self.async_update_ha_stateasync_update_ha_state(True)
150 
151  async def async_added_to_hass(self) -> None:
152  """Device added to hass."""
153  # Update state when attributes of the player change
154  self._signals_signals.append(
155  self._player_player.heos.dispatcher.connect(
156  heos_const.SIGNAL_PLAYER_EVENT, self._player_update_player_update
157  )
158  )
159  # Update state when heos changes
160  self._signals_signals.append(
161  async_dispatcher_connect(self.hasshass, SIGNAL_HEOS_UPDATED, self._heos_updated_heos_updated)
162  )
163  # Register this player's entity_id so it can be resolved by the group manager
164  self.hasshass.data[HEOS_DOMAIN][DATA_ENTITY_ID_MAP][self._player_player.player_id] = (
165  self.entity_identity_id
166  )
167  async_dispatcher_send(self.hasshass, SIGNAL_HEOS_PLAYER_ADDED)
168 
169  @log_command_error("clear playlist")
170  async def async_clear_playlist(self) -> None:
171  """Clear players playlist."""
172  await self._player_player.clear_queue()
173 
174  @log_command_error("join_players")
175  async def async_join_players(self, group_members: list[str]) -> None:
176  """Join `group_members` as a player group with the current player."""
177  await self._group_manager_group_manager.async_join_players(self.entity_identity_id, group_members)
178 
179  @log_command_error("pause")
180  async def async_media_pause(self) -> None:
181  """Send pause command."""
182  await self._player_player.pause()
183 
184  @log_command_error("play")
185  async def async_media_play(self) -> None:
186  """Send play command."""
187  await self._player_player.play()
188 
189  @log_command_error("move to previous track")
190  async def async_media_previous_track(self) -> None:
191  """Send previous track command."""
192  await self._player_player.play_previous()
193 
194  @log_command_error("move to next track")
195  async def async_media_next_track(self) -> None:
196  """Send next track command."""
197  await self._player_player.play_next()
198 
199  @log_command_error("stop")
200  async def async_media_stop(self) -> None:
201  """Send stop command."""
202  await self._player_player.stop()
203 
204  @log_command_error("set mute")
205  async def async_mute_volume(self, mute: bool) -> None:
206  """Mute the volume."""
207  await self._player_player.set_mute(mute)
208 
209  @log_command_error("play media")
210  async def async_play_media(
211  self, media_type: MediaType | str, media_id: str, **kwargs: Any
212  ) -> None:
213  """Play a piece of media."""
214  if media_source.is_media_source_id(media_id):
215  media_type = MediaType.URL
216  play_item = await media_source.async_resolve_media(
217  self.hasshass, media_id, self.entity_identity_id
218  )
219  media_id = play_item.url
220 
221  if media_type in {MediaType.URL, MediaType.MUSIC}:
222  media_id = async_process_play_media_url(self.hasshass, media_id)
223 
224  await self._player_player.play_url(media_id)
225  return
226 
227  if media_type == "quick_select":
228  # media_id may be an int or a str
229  selects = await self._player_player.get_quick_selects()
230  try:
231  index: int | None = int(media_id)
232  except ValueError:
233  # Try finding index by name
234  index = next(
235  (index for index, select in selects.items() if select == media_id),
236  None,
237  )
238  if index is None:
239  raise ValueError(f"Invalid quick select '{media_id}'")
240  await self._player_player.play_quick_select(index)
241  return
242 
243  if media_type == MediaType.PLAYLIST:
244  playlists = await self._player_player.heos.get_playlists()
245  playlist = next((p for p in playlists if p.name == media_id), None)
246  if not playlist:
247  raise ValueError(f"Invalid playlist '{media_id}'")
248  add_queue_option = HA_HEOS_ENQUEUE_MAP.get(kwargs.get(ATTR_MEDIA_ENQUEUE))
249 
250  await self._player_player.add_to_queue(playlist, add_queue_option)
251  return
252 
253  if media_type == "favorite":
254  # media_id may be an int or str
255  try:
256  index = int(media_id)
257  except ValueError:
258  # Try finding index by name
259  index = next(
260  (
261  index
262  for index, favorite in self._source_manager_source_manager.favorites.items()
263  if favorite.name == media_id
264  ),
265  None,
266  )
267  if index is None:
268  raise ValueError(f"Invalid favorite '{media_id}'")
269  await self._player_player.play_favorite(index)
270  return
271 
272  raise ValueError(f"Unsupported media type '{media_type}'")
273 
274  @log_command_error("select source")
275  async def async_select_source(self, source: str) -> None:
276  """Select input source."""
277  await self._source_manager_source_manager.play_source(source, self._player_player)
278 
279  @log_command_error("set shuffle")
280  async def async_set_shuffle(self, shuffle: bool) -> None:
281  """Enable/disable shuffle mode."""
282  await self._player_player.set_play_mode(self._player_player.repeat, shuffle)
283 
284  @log_command_error("set volume level")
285  async def async_set_volume_level(self, volume: float) -> None:
286  """Set volume level, range 0..1."""
287  await self._player_player.set_volume(int(volume * 100))
288 
289  async def async_update(self) -> None:
290  """Update supported features of the player."""
291  controls = self._player_player.now_playing_media.supported_controls
292  current_support = [CONTROL_TO_SUPPORT[control] for control in controls]
293  self._attr_supported_features_attr_supported_features = reduce(
294  ior, current_support, BASE_SUPPORTED_FEATURES
295  )
296 
297  if self._group_manager_group_manager is None:
298  self._group_manager_group_manager = self.hasshass.data[HEOS_DOMAIN][DATA_GROUP_MANAGER]
299 
300  if self._source_manager_source_manager is None:
301  self._source_manager_source_manager = self.hasshass.data[HEOS_DOMAIN][DATA_SOURCE_MANAGER]
302 
303  @log_command_error("unjoin_player")
304  async def async_unjoin_player(self) -> None:
305  """Remove this player from any group."""
306  await self._group_manager_group_manager.async_unjoin_player(self.entity_identity_id)
307 
308  async def async_will_remove_from_hass(self) -> None:
309  """Disconnect the device when removed."""
310  for signal_remove in self._signals_signals:
311  signal_remove()
312  self._signals_signals.clear()
313 
314  @property
315  def available(self) -> bool:
316  """Return True if the device is available."""
317  return self._player_player.available
318 
319  @property
320  def extra_state_attributes(self) -> dict[str, Any]:
321  """Get additional attribute about the state."""
322  return {
323  "media_album_id": self._player_player.now_playing_media.album_id,
324  "media_queue_id": self._player_player.now_playing_media.queue_id,
325  "media_source_id": self._player_player.now_playing_media.source_id,
326  "media_station": self._player_player.now_playing_media.station,
327  "media_type": self._player_player.now_playing_media.type,
328  }
329 
330  @property
331  def group_members(self) -> list[str]:
332  """List of players which are grouped together."""
333  return self._group_manager_group_manager.group_membership.get(self.entity_identity_id, [])
334 
335  @property
336  def is_volume_muted(self) -> bool:
337  """Boolean if volume is currently muted."""
338  return self._player_player.is_muted
339 
340  @property
341  def media_album_name(self) -> str:
342  """Album name of current playing media, music track only."""
343  return self._player_player.now_playing_media.album
344 
345  @property
346  def media_artist(self) -> str:
347  """Artist of current playing media, music track only."""
348  return self._player_player.now_playing_media.artist
349 
350  @property
351  def media_content_id(self) -> str:
352  """Content ID of current playing media."""
353  return self._player_player.now_playing_media.media_id
354 
355  @property
356  def media_duration(self):
357  """Duration of current playing media in seconds."""
358  duration = self._player_player.now_playing_media.duration
359  if isinstance(duration, int):
360  return duration / 1000
361  return None
362 
363  @property
364  def media_position(self):
365  """Position of current playing media in seconds."""
366  # Some media doesn't have duration but reports position, return None
367  if not self._player_player.now_playing_media.duration:
368  return None
369  return self._player_player.now_playing_media.current_position / 1000
370 
371  @property
373  """When was the position of the current playing media valid."""
374  # Some media doesn't have duration but reports position, return None
375  if not self._player_player.now_playing_media.duration:
376  return None
377  return self._media_position_updated_at_media_position_updated_at
378 
379  @property
380  def media_image_url(self) -> str | None:
381  """Image url of current playing media."""
382  # May be an empty string, if so, return None
383  image_url = self._player_player.now_playing_media.image_url
384  return image_url if image_url else None
385 
386  @property
387  def media_title(self) -> str:
388  """Title of current playing media."""
389  return self._player_player.now_playing_media.song
390 
391  @property
392  def shuffle(self) -> bool:
393  """Boolean if shuffle is enabled."""
394  return self._player_player.shuffle
395 
396  @property
397  def source(self) -> str:
398  """Name of the current input source."""
399  return self._source_manager_source_manager.get_current_source(self._player_player.now_playing_media)
400 
401  @property
402  def source_list(self) -> list[str]:
403  """List of available input sources."""
404  return self._source_manager_source_manager.source_list
405 
406  @property
407  def state(self) -> MediaPlayerState:
408  """State of the player."""
409  return PLAY_STATE_TO_STATE[self._player_player.state]
410 
411  @property
412  def volume_level(self) -> float:
413  """Volume level of the media player (0..1)."""
414  return self._player_player.volume / 100
415 
417  self,
418  media_content_type: MediaType | str | None = None,
419  media_content_id: str | None = None,
420  ) -> BrowseMedia:
421  """Implement the websocket media browsing helper."""
422  return await media_source.async_browse_media(
423  self.hasshass,
424  media_content_id,
425  content_filter=lambda item: item.media_content_type.startswith("audio/"),
426  )
None async_join_players(self, list[str] group_members)
None async_play_media(self, MediaType|str media_type, str media_id, **Any kwargs)
BrowseMedia async_browse_media(self, MediaType|str|None media_content_type=None, str|None media_content_id=None)
None async_update_ha_state(self, bool force_refresh=False)
Definition: entity.py:942
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
Definition: media_player.py:84
str async_process_play_media_url(HomeAssistant hass, str media_content_id, *bool allow_relative_url=False, bool for_supervisor_network=False)
Definition: browse_media.py:36
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193