Home Assistant Unofficial Reference 2024.12.1
media_player.py
Go to the documentation of this file.
1 """Platform for Control4 Rooms Media Players."""
2 
3 from __future__ import annotations
4 
5 from dataclasses import dataclass
6 from datetime import timedelta
7 import enum
8 import logging
9 from typing import Any
10 
11 from pyControl4.error_handling import C4Exception
12 from pyControl4.room import C4Room
13 
15  MediaPlayerDeviceClass,
16  MediaPlayerEntity,
17  MediaPlayerEntityFeature,
18  MediaPlayerState,
19  MediaType,
20 )
21 from homeassistant.config_entries import ConfigEntry
22 from homeassistant.const import CONF_SCAN_INTERVAL
23 from homeassistant.core import HomeAssistant
24 from homeassistant.helpers.entity_platform import AddEntitiesCallback
25 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
26 
27 from .const import CONF_DIRECTOR, CONF_DIRECTOR_ALL_ITEMS, CONF_UI_CONFIGURATION, DOMAIN
28 from .director_utils import update_variables_for_config_entry
29 from .entity import Control4Entity
30 
31 _LOGGER = logging.getLogger(__name__)
32 
33 CONTROL4_POWER_STATE = "POWER_STATE"
34 CONTROL4_VOLUME_STATE = "CURRENT_VOLUME"
35 CONTROL4_MUTED_STATE = "IS_MUTED"
36 CONTROL4_CURRENT_VIDEO_DEVICE = "CURRENT_VIDEO_DEVICE"
37 CONTROL4_PLAYING = "PLAYING"
38 CONTROL4_PAUSED = "PAUSED"
39 CONTROL4_STOPPED = "STOPPED"
40 CONTROL4_MEDIA_INFO = "CURRENT MEDIA INFO"
41 
42 CONTROL4_PARENT_ID = "parentId"
43 
44 VARIABLES_OF_INTEREST = {
45  CONTROL4_POWER_STATE,
46  CONTROL4_VOLUME_STATE,
47  CONTROL4_MUTED_STATE,
48  CONTROL4_CURRENT_VIDEO_DEVICE,
49  CONTROL4_MEDIA_INFO,
50  CONTROL4_PLAYING,
51  CONTROL4_PAUSED,
52  CONTROL4_STOPPED,
53 }
54 
55 
56 class _SourceType(enum.Enum):
57  AUDIO = 1
58  VIDEO = 2
59 
60 
61 @dataclass
63  """Class for Room Source."""
64 
65  source_type: set[_SourceType]
66  idx: int
67  name: str
68 
69 
70 async def get_rooms(hass: HomeAssistant, entry: ConfigEntry):
71  """Return a list of all Control4 rooms."""
72  director_all_items = hass.data[DOMAIN][entry.entry_id][CONF_DIRECTOR_ALL_ITEMS]
73  return [
74  item
75  for item in director_all_items
76  if "typeName" in item and item["typeName"] == "room"
77  ]
78 
79 
81  hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
82 ) -> None:
83  """Set up Control4 rooms from a config entry."""
84  entry_data = hass.data[DOMAIN][entry.entry_id]
85  ui_config = entry_data[CONF_UI_CONFIGURATION]
86 
87  # OS 2 will not have a ui_configuration
88  if not ui_config:
89  _LOGGER.debug("No UI Configuration found for Control4")
90  return
91 
92  all_rooms = await get_rooms(hass, entry)
93  if not all_rooms:
94  return
95 
96  scan_interval = entry_data[CONF_SCAN_INTERVAL]
97  _LOGGER.debug("Scan interval = %s", scan_interval)
98 
99  async def async_update_data() -> dict[int, dict[str, Any]]:
100  """Fetch data from Control4 director."""
101  try:
103  hass, entry, VARIABLES_OF_INTEREST
104  )
105  except C4Exception as err:
106  raise UpdateFailed(f"Error communicating with API: {err}") from err
107 
108  coordinator = DataUpdateCoordinator[dict[int, dict[str, Any]]](
109  hass,
110  _LOGGER,
111  name="room",
112  update_method=async_update_data,
113  update_interval=timedelta(seconds=scan_interval),
114  )
115 
116  # Fetch initial data so we have data when entities subscribe
117  await coordinator.async_refresh()
118 
119  items_by_id = {
120  item["id"]: item
121  for item in hass.data[DOMAIN][entry.entry_id][CONF_DIRECTOR_ALL_ITEMS]
122  }
123  item_to_parent_map = {
124  k: item["parentId"]
125  for k, item in items_by_id.items()
126  if "parentId" in item and k > 1
127  }
128 
129  entity_list = []
130  for room in all_rooms:
131  room_id = room["id"]
132 
133  sources: dict[int, _RoomSource] = {}
134  for exp in ui_config["experiences"]:
135  if room_id == exp["room_id"]:
136  exp_type = exp["type"]
137  if exp_type not in ("listen", "watch"):
138  continue
139 
140  dev_type = (
141  _SourceType.AUDIO if exp_type == "listen" else _SourceType.VIDEO
142  )
143  for source in exp["sources"]["source"]:
144  dev_id = source["id"]
145  name = items_by_id.get(dev_id, {}).get(
146  "name", f"Unknown Device - {dev_id}"
147  )
148  if dev_id in sources:
149  sources[dev_id].source_type.add(dev_type)
150  else:
151  sources[dev_id] = _RoomSource(
152  source_type={dev_type}, idx=dev_id, name=name
153  )
154 
155  try:
156  hidden = room["roomHidden"]
157  entity_list.append(
158  Control4Room(
159  entry_data,
160  coordinator,
161  room["name"],
162  room_id,
163  item_to_parent_map,
164  sources,
165  hidden,
166  )
167  )
168  except KeyError:
169  _LOGGER.exception(
170  "Unknown device properties received from Control4: %s",
171  room,
172  )
173  continue
174 
175  async_add_entities(entity_list, True)
176 
177 
179  """Control4 Room entity."""
180 
181  _attr_has_entity_name = True
182 
183  def __init__(
184  self,
185  entry_data: dict,
186  coordinator: DataUpdateCoordinator[dict[int, dict[str, Any]]],
187  name: str,
188  room_id: int,
189  id_to_parent: dict[int, int],
190  sources: dict[int, _RoomSource],
191  room_hidden: bool,
192  ) -> None:
193  """Initialize Control4 room entity."""
194  super().__init__(
195  entry_data,
196  coordinator,
197  None,
198  room_id,
199  device_name=name,
200  device_manufacturer=None,
201  device_model=None,
202  device_id=room_id,
203  )
204  self._attr_entity_registry_enabled_default_attr_entity_registry_enabled_default = not room_hidden
205  self._id_to_parent_id_to_parent = id_to_parent
206  self._sources_sources = sources
207  self._attr_supported_features_attr_supported_features = (
208  MediaPlayerEntityFeature.PLAY
209  | MediaPlayerEntityFeature.PAUSE
210  | MediaPlayerEntityFeature.STOP
211  | MediaPlayerEntityFeature.VOLUME_MUTE
212  | MediaPlayerEntityFeature.VOLUME_SET
213  | MediaPlayerEntityFeature.VOLUME_STEP
214  | MediaPlayerEntityFeature.TURN_OFF
215  | MediaPlayerEntityFeature.SELECT_SOURCE
216  )
217 
219  """Create a pyControl4 device object.
220 
221  This exists so the director token used is always the latest one, without needing to re-init the entire entity.
222  """
223  return C4Room(self.entry_dataentry_data[CONF_DIRECTOR], self._idx_idx)
224 
225  def _get_device_from_variable(self, var: str) -> int | None:
226  current_device = self.coordinator.data[self._idx_idx][var]
227  if current_device == 0:
228  return None
229 
230  return current_device
231 
232  def _get_current_video_device_id(self) -> int | None:
233  return self._get_device_from_variable_get_device_from_variable(CONTROL4_CURRENT_VIDEO_DEVICE)
234 
235  def _get_current_playing_device_id(self) -> int | None:
236  media_info = self._get_media_info_get_media_info()
237  if media_info:
238  if "medSrcDev" in media_info:
239  return media_info["medSrcDev"]
240  if "deviceid" in media_info:
241  return media_info["deviceid"]
242  return 0
243 
244  def _get_media_info(self) -> dict | None:
245  """Get the Media Info Dictionary if populated."""
246  media_info = self.coordinator.data[self._idx_idx][CONTROL4_MEDIA_INFO]
247  if "mediainfo" in media_info:
248  return media_info["mediainfo"]
249  return None
250 
251  def _get_current_source_state(self) -> str | None:
252  current_source = self._get_current_playing_device_id_get_current_playing_device_id()
253  while current_source:
254  current_data = self.coordinator.data.get(current_source, None)
255  if current_data:
256  if current_data.get(CONTROL4_PLAYING, None):
257  return MediaPlayerState.PLAYING
258  if current_data.get(CONTROL4_PAUSED, None):
259  return MediaPlayerState.PAUSED
260  if current_data.get(CONTROL4_STOPPED, None):
261  return MediaPlayerState.ON
262  current_source = self._id_to_parent_id_to_parent.get(current_source, None)
263  return None
264 
265  @property
266  def device_class(self) -> MediaPlayerDeviceClass | None:
267  """Return the class of this entity."""
268  for avail_source in self._sources_sources.values():
269  if _SourceType.VIDEO in avail_source.source_type:
270  return MediaPlayerDeviceClass.TV
271  return MediaPlayerDeviceClass.SPEAKER
272 
273  @property
274  def state(self):
275  """Return whether this room is on or idle."""
276 
277  if source_state := self._get_current_source_state_get_current_source_state():
278  return source_state
279 
280  if self.coordinator.data[self._idx_idx][CONTROL4_POWER_STATE]:
281  return MediaPlayerState.ON
282 
283  return MediaPlayerState.IDLE
284 
285  @property
286  def source(self):
287  """Get the current source."""
288  current_source = self._get_current_playing_device_id_get_current_playing_device_id()
289  if not current_source or current_source not in self._sources_sources:
290  return None
291  return self._sources_sources[current_source].name
292 
293  @property
294  def media_title(self) -> str | None:
295  """Get the Media Title."""
296  media_info = self._get_media_info_get_media_info()
297  if not media_info:
298  return None
299  if "title" in media_info:
300  return media_info["title"]
301  current_source = self._get_current_playing_device_id_get_current_playing_device_id()
302  if not current_source or current_source not in self._sources_sources:
303  return None
304  return self._sources_sources[current_source].name
305 
306  @property
308  """Get current content type if available."""
309  current_source = self._get_current_playing_device_id_get_current_playing_device_id()
310  if not current_source:
311  return None
312  if current_source == self._get_current_video_device_id_get_current_video_device_id():
313  return MediaType.VIDEO
314  return MediaType.MUSIC
315 
316  async def async_media_play_pause(self):
317  """If possible, toggle the current play/pause state.
318 
319  Not every source supports play/pause.
320  Unfortunately MediaPlayer capabilities are not dynamic,
321  so we must determine if play/pause is supported here
322  """
323  if self._get_current_source_state_get_current_source_state():
324  await super().async_media_play_pause()
325 
326  @property
327  def source_list(self) -> list[str]:
328  """Get the available source."""
329  return [x.name for x in self._sources_sources.values()]
330 
331  @property
332  def volume_level(self):
333  """Get the volume level."""
334  return self.coordinator.data[self._idx_idx][CONTROL4_VOLUME_STATE] / 100
335 
336  @property
337  def is_volume_muted(self):
338  """Check if the volume is muted."""
339  return bool(self.coordinator.data[self._idx_idx][CONTROL4_MUTED_STATE])
340 
341  async def async_select_source(self, source):
342  """Select a new source."""
343  for avail_source in self._sources_sources.values():
344  if avail_source.name == source:
345  audio_only = _SourceType.VIDEO not in avail_source.source_type
346  if audio_only:
347  await self._create_api_object_create_api_object().setAudioSource(avail_source.idx)
348  else:
349  await self._create_api_object_create_api_object().setVideoAndAudioSource(
350  avail_source.idx
351  )
352  break
353 
354  await self.coordinator.async_request_refresh()
355 
356  async def async_turn_off(self):
357  """Turn off the room."""
358  await self._create_api_object_create_api_object().setRoomOff()
359  await self.coordinator.async_request_refresh()
360 
361  async def async_mute_volume(self, mute):
362  """Mute the room."""
363  if mute:
364  await self._create_api_object_create_api_object().setMuteOn()
365  else:
366  await self._create_api_object_create_api_object().setMuteOff()
367  await self.coordinator.async_request_refresh()
368 
369  async def async_set_volume_level(self, volume):
370  """Set room volume, 0-1 scale."""
371  await self._create_api_object_create_api_object().setVolume(int(volume * 100))
372  await self.coordinator.async_request_refresh()
373 
374  async def async_volume_up(self):
375  """Increase the volume by 1."""
376  await self._create_api_object_create_api_object().setIncrementVolume()
377  await self.coordinator.async_request_refresh()
378 
379  async def async_volume_down(self):
380  """Decrease the volume by 1."""
381  await self._create_api_object_create_api_object().setDecrementVolume()
382  await self.coordinator.async_request_refresh()
383 
384  async def async_media_pause(self):
385  """Issue a pause command."""
386  await self._create_api_object_create_api_object().setPause()
387  await self.coordinator.async_request_refresh()
388 
389  async def async_media_play(self):
390  """Issue a play command."""
391  await self._create_api_object_create_api_object().setPlay()
392  await self.coordinator.async_request_refresh()
393 
394  async def async_media_stop(self):
395  """Issue a stop command."""
396  await self._create_api_object_create_api_object().setStop()
397  await self.coordinator.async_request_refresh()
None __init__(self, dict entry_data, DataUpdateCoordinator[dict[int, dict[str, Any]]] coordinator, str name, int room_id, dict[int, int] id_to_parent, dict[int, _RoomSource] sources, bool room_hidden)
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
dict[int, dict[str, Any]] update_variables_for_config_entry(HomeAssistant hass, ConfigEntry entry, set[str] variable_names)
def get_rooms(HomeAssistant hass, ConfigEntry entry)
Definition: media_player.py:70
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
Definition: media_player.py:82