Home Assistant Unofficial Reference 2024.12.1
media_player.py
Go to the documentation of this file.
1 """Support for PlayStation 4 consoles."""
2 
3 from contextlib import suppress
4 import logging
5 from typing import Any, cast
6 
7 from pyps4_2ndscreen.errors import NotReady, PSDataIncomplete
8 from pyps4_2ndscreen.media_art import TYPE_APP as PS_TYPE_APP
9 import pyps4_2ndscreen.ps4 as pyps4
10 
12  ATTR_MEDIA_CONTENT_TYPE,
13  ATTR_MEDIA_TITLE,
14  MediaPlayerEntity,
15  MediaPlayerEntityFeature,
16  MediaPlayerState,
17  MediaType,
18 )
19 from homeassistant.config_entries import ConfigEntry
20 from homeassistant.const import (
21  ATTR_LOCKED,
22  CONF_HOST,
23  CONF_NAME,
24  CONF_REGION,
25  CONF_TOKEN,
26 )
27 from homeassistant.core import HomeAssistant, callback
28 from homeassistant.helpers import device_registry as dr, entity_registry as er
29 from homeassistant.helpers.device_registry import DeviceInfo
30 from homeassistant.helpers.entity_platform import AddEntitiesCallback
31 from homeassistant.util.json import JsonObjectType
32 
33 from . import format_unique_id, load_games, save_games
34 from .const import (
35  ATTR_MEDIA_IMAGE_URL,
36  DEFAULT_ALIAS,
37  DOMAIN as PS4_DOMAIN,
38  PS4_DATA,
39  REGIONS as deprecated_regions,
40 )
41 
42 _LOGGER = logging.getLogger(__name__)
43 
44 
45 DEFAULT_RETRIES = 2
46 
47 
49  hass: HomeAssistant,
50  config_entry: ConfigEntry,
51  async_add_entities: AddEntitiesCallback,
52 ) -> None:
53  """Set up PS4 from a config entry."""
54  config = config_entry
55  creds: str = config.data[CONF_TOKEN]
56  device_list = []
57  for device in config.data["devices"]:
58  host: str = device[CONF_HOST]
59  region: str = device[CONF_REGION]
60  name: str = device[CONF_NAME]
61  ps4 = pyps4.Ps4Async(host, creds, device_name=DEFAULT_ALIAS)
62  device_list.append(PS4Device(config, name, host, region, ps4, creds))
63  async_add_entities(device_list, update_before_add=True)
64 
65 
67  """Representation of a PS4."""
68 
69  _attr_supported_features = (
70  MediaPlayerEntityFeature.TURN_OFF
71  | MediaPlayerEntityFeature.TURN_ON
72  | MediaPlayerEntityFeature.PAUSE
73  | MediaPlayerEntityFeature.STOP
74  | MediaPlayerEntityFeature.SELECT_SOURCE
75  )
76  _attr_translation_key = "media_player"
77 
78  def __init__(
79  self,
80  config: ConfigEntry,
81  name: str,
82  host: str,
83  region: str,
84  ps4: pyps4.Ps4Async,
85  creds: str,
86  ) -> None:
87  """Initialize the ps4 device."""
88  self._entry_id_entry_id = config.entry_id
89  self._ps4_ps4 = ps4
90  self._host_host = host
91  self._attr_name_attr_name = name
92  self._region_region = region
93  self._creds_creds = creds
94  self._media_image_media_image: str | None = None
95  self._games_games: JsonObjectType = {}
96  self._retry_retry = 0
97  self._disconnected_disconnected = False
98 
99  def status_callback(self) -> None:
100  """Handle status callback. Parse status."""
101  self._parse_status_parse_status()
102  self.schedule_update_ha_stateschedule_update_ha_state()
103 
104  @callback
105  def subscribe_to_protocol(self) -> None:
106  """Notify protocol to callback with update changes."""
107  self.hasshass.data[PS4_DATA].protocol.add_callback(self._ps4_ps4, self.status_callbackstatus_callback)
108 
109  @callback
110  def unsubscribe_to_protocol(self) -> None:
111  """Notify protocol to remove callback."""
112  self.hasshass.data[PS4_DATA].protocol.remove_callback(
113  self._ps4_ps4, self.status_callbackstatus_callback
114  )
115 
116  def check_region(self) -> None:
117  """Display logger msg if region is deprecated."""
118  # Non-Breaking although data returned may be inaccurate.
119  if self._region_region in deprecated_regions:
120  _LOGGER.warning(
121  """Region: %s has been deprecated.
122  Please remove PS4 integration
123  and Re-configure again to utilize
124  current regions""",
125  self._region_region,
126  )
127 
128  async def async_added_to_hass(self) -> None:
129  """Subscribe PS4 events."""
130  self.hasshass.data[PS4_DATA].devices.append(self)
131  self.check_regioncheck_region()
132 
133  async def async_update(self) -> None:
134  """Retrieve the latest data."""
135  if self._ps4_ps4.ddp_protocol is not None:
136  # Request Status with asyncio transport.
137  self._ps4_ps4.get_status()
138 
139  # Don't attempt to connect if entity is connected or if,
140  # PS4 is in standby or disconnected from LAN or powered off.
141  if (
142  not self._ps4_ps4.connected
143  and not self._ps4_ps4.is_standby
144  and self._ps4_ps4.is_available
145  ):
146  with suppress(NotReady):
147  await self._ps4_ps4.async_connect()
148 
149  # Try to ensure correct status is set on startup for device info.
150  if self._ps4_ps4.ddp_protocol is None:
151  # Use socket.socket.
152  await self.hasshass.async_add_executor_job(self._ps4_ps4.get_status)
153  if self._attr_device_info_attr_device_info is None:
154  # Add entity to registry.
155  await self.async_get_device_infoasync_get_device_info(self._ps4_ps4.status)
156  self._ps4_ps4.ddp_protocol = self.hasshass.data[PS4_DATA].protocol
157  self.subscribe_to_protocolsubscribe_to_protocol()
158 
159  await self.hasshass.async_add_executor_job(self._parse_status_parse_status)
160 
161  def _parse_status(self) -> None:
162  """Parse status."""
163  status: dict[str, Any] | None = self._ps4_ps4.status
164  if status is not None:
165  self._games_games = load_games(self.hasshass, cast(str, self.unique_idunique_id))
166  if self._games_games:
167  self.get_source_listget_source_list()
168 
169  self._retry_retry = 0
170  self._disconnected_disconnected = False
171  if status.get("status") == "Ok":
172  title_id = status.get("running-app-titleid")
173  name = status.get("running-app-name")
174 
175  if title_id and name is not None:
176  self._attr_state_attr_state = MediaPlayerState.PLAYING
177 
178  if self.media_content_idmedia_content_id != title_id:
179  self._attr_media_content_id_attr_media_content_id = title_id
180  if self._use_saved_use_saved():
181  _LOGGER.debug("Using saved data for media: %s", title_id)
182  return
183 
184  self._attr_media_title_attr_media_title = name
185  self._attr_source_attr_source = self._attr_media_title_attr_media_title
186  self._attr_media_content_type_attr_media_content_type = None
187  # Get data from PS Store.
188  self.hasshass.async_create_background_task(
189  self.async_get_title_dataasync_get_title_data(title_id, name),
190  "ps4.media_player-get_title_data",
191  )
192  elif self.statestatestatestatestate != MediaPlayerState.IDLE:
193  self.idleidle()
194  elif self.statestatestatestatestate != MediaPlayerState.STANDBY:
195  self.state_standbystate_standby()
196 
197  elif self._retry_retry > DEFAULT_RETRIES:
198  self.state_unknownstate_unknown()
199  else:
200  self._retry_retry += 1
201 
202  def _use_saved(self) -> bool:
203  """Return True, Set media attrs if data is locked."""
204  if self.media_content_idmedia_content_id in self._games_games:
205  store = cast(JsonObjectType, self._games_games[self.media_content_idmedia_content_id])
206 
207  # If locked get attributes from file.
208  if store.get(ATTR_LOCKED):
209  self._attr_media_title_attr_media_title = cast(str | None, store.get(ATTR_MEDIA_TITLE))
210  self._attr_source_attr_source = self._attr_media_title_attr_media_title
211  self._media_image_media_image = cast(str | None, store.get(ATTR_MEDIA_IMAGE_URL))
212  self._attr_media_content_type_attr_media_content_type = cast(
213  str | None, store.get(ATTR_MEDIA_CONTENT_TYPE)
214  )
215  return True
216  return False
217 
218  def idle(self) -> None:
219  """Set states for state idle."""
220  self.reset_titlereset_title()
221  self._attr_state_attr_state = MediaPlayerState.IDLE
222 
223  def state_standby(self) -> None:
224  """Set states for state standby."""
225  self.reset_titlereset_title()
226  self._attr_state_attr_state = MediaPlayerState.STANDBY
227 
228  def state_unknown(self) -> None:
229  """Set states for state unknown."""
230  self.reset_titlereset_title()
231  self._attr_state_attr_state = None
232  if self._disconnected_disconnected is False:
233  _LOGGER.warning("PS4 could not be reached")
234  self._disconnected_disconnected = True
235  self._retry_retry = 0
236 
237  def reset_title(self) -> None:
238  """Update if there is no title."""
239  self._attr_media_title_attr_media_title = None
240  self._attr_media_content_id_attr_media_content_id = None
241  self._attr_media_content_type_attr_media_content_type = None
242  self._attr_source_attr_source = None
243 
244  async def async_get_title_data(self, title_id: str, name: str) -> None:
245  """Get PS Store Data."""
246 
247  app_name = None
248  art = None
249  media_type = None
250  try:
251  title = await self._ps4_ps4.async_get_ps_store_data(
252  name, title_id, self._region_region
253  )
254 
255  except PSDataIncomplete:
256  title = None
257  except TimeoutError:
258  title = None
259  _LOGGER.error("PS Store Search Timed out")
260 
261  else:
262  if title is not None:
263  app_name = title.name
264  art = title.cover_art
265  # Assume media type is game if not app.
266  if title.game_type != PS_TYPE_APP:
267  media_type = MediaType.GAME
268  else:
269  media_type = MediaType.APP
270  else:
271  _LOGGER.error(
272  "Could not find data in region: %s for PS ID: %s",
273  self._region_region,
274  title_id,
275  )
276 
277  finally:
278  self._attr_media_title_attr_media_title = app_name or name
279  self._attr_source_attr_source = self._attr_media_title_attr_media_title
280  self._media_image_media_image = art or None
281  self._attr_media_content_type_attr_media_content_type = media_type
282 
283  await self.hasshass.async_add_executor_job(self.update_listupdate_list)
284  self.async_write_ha_stateasync_write_ha_state()
285 
286  def update_list(self) -> None:
287  """Update Game List, Correct data if different."""
288  if self.media_content_idmedia_content_id in self._games_games:
289  store = cast(JsonObjectType, self._games_games[self.media_content_idmedia_content_id])
290 
291  if (
292  store.get(ATTR_MEDIA_TITLE) != self.media_titlemedia_title
293  or store.get(ATTR_MEDIA_IMAGE_URL) != self._media_image_media_image
294  ):
295  self._games_games.pop(self.media_content_idmedia_content_id)
296 
297  if self.media_content_idmedia_content_id not in self._games_games:
298  self.add_gamesadd_games(
299  self.media_content_idmedia_content_id,
300  self._attr_media_title_attr_media_title,
301  self._media_image_media_image,
302  self._attr_media_content_type_attr_media_content_type,
303  )
304  self._games_games = load_games(self.hasshass, cast(str, self.unique_idunique_id))
305 
306  self.get_source_listget_source_list()
307 
308  def get_source_list(self) -> None:
309  """Parse data entry and update source list."""
310  games = []
311  for data in self._games_games.values():
312  data = cast(JsonObjectType, data)
313  games.append(cast(str, data[ATTR_MEDIA_TITLE]))
314  self._attr_source_list_attr_source_list = sorted(games)
315 
317  self,
318  title_id: str | None,
319  app_name: str | None,
320  image: str | None,
321  g_type: str | None,
322  is_locked: bool = False,
323  ) -> None:
324  """Add games to list."""
325  games = self._games_games
326  if title_id is not None and title_id not in games:
327  game: JsonObjectType = {
328  title_id: {
329  ATTR_MEDIA_TITLE: app_name,
330  ATTR_MEDIA_IMAGE_URL: image,
331  ATTR_MEDIA_CONTENT_TYPE: g_type,
332  ATTR_LOCKED: is_locked,
333  }
334  }
335  games.update(game)
336  save_games(self.hasshass, games, cast(str, self.unique_idunique_id))
337 
338  async def async_get_device_info(self, status: dict[str, Any] | None) -> None:
339  """Set device info for registry."""
340  # If cannot get status on startup, assume info from registry.
341  if status is None:
342  _LOGGER.debug("Assuming status from registry")
343  e_registry = er.async_get(self.hasshass)
344  d_registry = dr.async_get(self.hasshass)
345 
346  for entry in e_registry.entities.get_entries_for_config_entry_id(
347  self._entry_id_entry_id
348  ):
349  self._attr_unique_id_attr_unique_id = entry.unique_id
350  self.entity_identity_identity_id = entry.entity_id
351  break
352  for device in d_registry.devices.get_devices_for_config_entry_id(
353  self._entry_id_entry_id
354  ):
355  self._attr_device_info_attr_device_info = DeviceInfo(
356  identifiers=device.identifiers,
357  manufacturer=device.manufacturer,
358  model=device.model,
359  name=device.name,
360  sw_version=device.sw_version,
361  )
362  break
363 
364  else:
365  _sw_version = status["system-version"]
366  _sw_version = _sw_version[1:4]
367  sw_version = f"{_sw_version[0]}.{_sw_version[1:]}"
368  self._attr_device_info_attr_device_info = DeviceInfo(
369  identifiers={(PS4_DOMAIN, status["host-id"])},
370  manufacturer="Sony Interactive Entertainment Inc.",
371  model="PlayStation 4",
372  name=status["host-name"],
373  sw_version=sw_version,
374  )
375 
376  self._attr_unique_id_attr_unique_id = format_unique_id(self._creds_creds, status["host-id"])
377 
378  async def async_will_remove_from_hass(self) -> None:
379  """Remove Entity from Home Assistant."""
380  # Close TCP Transport.
381  if self._ps4_ps4.connected:
382  await self._ps4_ps4.close()
383  self.unsubscribe_to_protocolunsubscribe_to_protocol()
384  self.hasshass.data[PS4_DATA].devices.remove(self)
385 
386  @property
387  def entity_picture(self) -> str | None:
388  """Return picture."""
389  if (
390  self.statestatestatestatestate == MediaPlayerState.PLAYING
391  and self.media_content_idmedia_content_id is not None
392  and (image_hash := self.media_image_hashmedia_image_hash) is not None
393  ):
394  return (
395  f"/api/media_player_proxy/{self.entity_id}?"
396  f"token={self.access_token}&cache={image_hash}"
397  )
398  return None
399 
400  @property
401  def media_image_url(self) -> str | None:
402  """Image url of current playing media."""
403  if self.media_content_idmedia_content_id is None:
404  return None
405  return self._media_image_media_image
406 
407  async def async_turn_off(self) -> None:
408  """Turn off media player."""
409  await self._ps4_ps4.standby()
410 
411  async def async_turn_on(self) -> None:
412  """Turn on the media player."""
413  self._ps4_ps4.wakeup()
414 
415  async def async_toggle(self) -> None:
416  """Toggle media player."""
417  await self._ps4_ps4.toggle()
418 
419  async def async_media_pause(self) -> None:
420  """Send keypress ps to return to menu."""
421  await self.async_send_remote_controlasync_send_remote_control("ps")
422 
423  async def async_media_stop(self) -> None:
424  """Send keypress ps to return to menu."""
425  await self.async_send_remote_controlasync_send_remote_control("ps")
426 
427  async def async_select_source(self, source: str) -> None:
428  """Select input source."""
429  for title_id, data in self._games_games.items():
430  data = cast(JsonObjectType, data)
431  game = cast(str, data[ATTR_MEDIA_TITLE])
432  if (
433  source.lower().encode(encoding="utf-8")
434  == game.lower().encode(encoding="utf-8")
435  or source == title_id
436  ):
437  _LOGGER.debug(
438  "Starting PS4 game %s (%s) using source %s", game, title_id, source
439  )
440 
441  await self._ps4_ps4.start_title(title_id, self.media_content_idmedia_content_id)
442  return
443 
444  _LOGGER.warning("Could not start title. '%s' is not in source list", source)
445  return
446 
447  async def async_send_command(self, command: str) -> None:
448  """Send Button Command."""
449  await self.async_send_remote_controlasync_send_remote_control(command)
450 
451  async def async_send_remote_control(self, command: str) -> None:
452  """Send RC command."""
453  await self._ps4_ps4.remote_control(command)
None __init__(self, ConfigEntry config, str name, str host, str region, pyps4.Ps4Async ps4, str creds)
Definition: media_player.py:86
None async_get_title_data(self, str title_id, str name)
None add_games(self, str|None title_id, str|None app_name, str|None image, str|None g_type, bool is_locked=False)
None async_get_device_info(self, dict[str, Any]|None status)
None schedule_update_ha_state(self, bool force_refresh=False)
Definition: entity.py:1244
def get_status(hass, host, port)
Definition: panel.py:387
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
Definition: media_player.py:52
JsonObjectType load_games(HomeAssistant hass, str unique_id)
Definition: __init__.py:172
def format_unique_id(creds, mac_address)
Definition: __init__.py:166
def save_games(HomeAssistant hass, dict games, str unique_id)
Definition: __init__.py:187