Home Assistant Unofficial Reference 2024.12.1
media_player.py
Go to the documentation of this file.
1 """Support for interface with an LG webOS Smart TV."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Awaitable, Callable, Coroutine
7 from contextlib import suppress
8 from datetime import timedelta
9 from functools import wraps
10 from http import HTTPStatus
11 import logging
12 from typing import Any, Concatenate, cast
13 
14 from aiowebostv import WebOsClient, WebOsTvPairError
15 
16 from homeassistant import util
18  MediaPlayerDeviceClass,
19  MediaPlayerEntity,
20  MediaPlayerEntityFeature,
21  MediaPlayerState,
22  MediaType,
23 )
24 from homeassistant.config_entries import ConfigEntry
25 from homeassistant.const import (
26  ATTR_ENTITY_ID,
27  ATTR_SUPPORTED_FEATURES,
28  ENTITY_MATCH_ALL,
29  ENTITY_MATCH_NONE,
30 )
31 from homeassistant.core import HomeAssistant
32 from homeassistant.exceptions import HomeAssistantError
33 from homeassistant.helpers.aiohttp_client import async_get_clientsession
34 from homeassistant.helpers.device_registry import DeviceInfo
35 from homeassistant.helpers.dispatcher import async_dispatcher_connect
36 from homeassistant.helpers.entity_platform import AddEntitiesCallback
37 from homeassistant.helpers.restore_state import RestoreEntity
38 from homeassistant.helpers.trigger import PluggableAction
39 
40 from . import update_client_key
41 from .const import (
42  ATTR_PAYLOAD,
43  ATTR_SOUND_OUTPUT,
44  CONF_SOURCES,
45  DATA_CONFIG_ENTRY,
46  DOMAIN,
47  LIVE_TV_APP_ID,
48  WEBOSTV_EXCEPTIONS,
49 )
50 from .triggers.turn_on import async_get_turn_on_trigger
51 
52 _LOGGER = logging.getLogger(__name__)
53 
# Feature flags that do not depend on the active sound output.
SUPPORT_WEBOSTV = (
    MediaPlayerEntityFeature.TURN_OFF
    | MediaPlayerEntityFeature.NEXT_TRACK
    | MediaPlayerEntityFeature.PAUSE
    | MediaPlayerEntityFeature.PREVIOUS_TRACK
    | MediaPlayerEntityFeature.SELECT_SOURCE
    | MediaPlayerEntityFeature.PLAY_MEDIA
    | MediaPlayerEntityFeature.PLAY
    | MediaPlayerEntityFeature.STOP
)

# Volume-related flags that are OR-ed in on top of SUPPORT_WEBOSTV.
SUPPORT_WEBOSTV_VOLUME = (
    MediaPlayerEntityFeature.VOLUME_MUTE | MediaPlayerEntityFeature.VOLUME_STEP
)
68 
# Throttle windows passed to util.Throttle for async_update
# (regular and forced calls respectively).
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(seconds=1)
# Module-level polling interval constant.
SCAN_INTERVAL = timedelta(seconds=10)
72 
73 
75  hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
76 ) -> None:
77  """Set up the LG webOS Smart TV platform."""
78  client = hass.data[DOMAIN][DATA_CONFIG_ENTRY][entry.entry_id]
80 
81 
82 def cmd[_T: LgWebOSMediaPlayerEntity, **_P](
83  func: Callable[Concatenate[_T, _P], Awaitable[None]],
84 ) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
85  """Catch command exceptions."""
86 
87  @wraps(func)
88  async def cmd_wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
89  """Wrap all command methods."""
90  try:
91  await func(self, *args, **kwargs)
92  except WEBOSTV_EXCEPTIONS as exc:
93  if self.state != MediaPlayerState.OFF:
94  raise HomeAssistantError(
95  f"Error calling {func.__name__} on entity {self.entity_id},"
96  f" state:{self.state}"
97  ) from exc
98  _LOGGER.warning(
99  "Error calling %s on entity %s, state:%s, error: %r",
100  func.__name__,
101  self.entity_id,
102  self.state,
103  exc,
104  )
105 
106  return cmd_wrapper
107 
108 
110  """Representation of a LG webOS Smart TV."""
111 
112  _attr_device_class = MediaPlayerDeviceClass.TV
113  _attr_has_entity_name = True
114  _attr_name = None
115 
def __init__(self, entry: ConfigEntry, client: WebOsClient) -> None:
    """Initialize the webos device.

    Stores the config entry and client, seeds the entity attributes from
    the entry (title, unique id, configured sources) and performs an
    initial state computation via ``_update_states``.
    """
    self._entry = entry
    self._client = client
    self._attr_assumed_state = True
    self._device_name = entry.title
    self._attr_unique_id = entry.unique_id
    self._sources = entry.options.get(CONF_SOURCES)

    # Assume that the TV is not paused
    self._paused = False
    self._turn_on = PluggableAction(self.async_write_ha_state)
    self._current_source = None
    self._source_list: dict = {}

    self._supported_features = MediaPlayerEntityFeature(0)
    self._update_states()
133 
async def async_added_to_hass(self) -> None:
    """Connect and subscribe to dispatcher signals and state updates."""
    await super().async_added_to_hass()

    # Register the device turn-on trigger only when the entity is tied to
    # a device; the unregister callback is scheduled for entity removal.
    if (entry := self.registry_entry) and entry.device_id:
        self.async_on_remove(
            self._turn_on.async_register(
                self.hass, async_get_turn_on_trigger(entry.device_id)
            )
        )

    self.async_on_remove(
        async_dispatcher_connect(self.hass, DOMAIN, self.async_signal_handler)
    )

    await self._client.register_state_update_callback(
        self.async_handle_state_update
    )

    # When the entity is OFF, reuse the supported features stored in the
    # last saved state, with TURN_ON masked out (the supported_features
    # property adds TURN_ON back when a turn-on action is registered).
    if (
        self.state == MediaPlayerState.OFF
        and (state := await self.async_get_last_state()) is not None
    ):
        self._supported_features = (
            state.attributes.get(
                ATTR_SUPPORTED_FEATURES, MediaPlayerEntityFeature(0)
            )
            & ~MediaPlayerEntityFeature.TURN_ON
        )
163 
async def async_will_remove_from_hass(self) -> None:
    """Call disconnect on removal.

    Unregisters this entity's state-update callback from the client.
    """
    self._client.unregister_state_update_callback(self.async_handle_state_update)
167 
async def async_signal_handler(self, data: dict[str, Any]) -> None:
    """Handle domain-specific signal by calling appropriate method.

    ``data`` must contain the target entity ids and a ``"method"`` key
    naming a coroutine method of this entity; every other key is passed
    to that method as a keyword argument.
    """
    if (entity_ids := data[ATTR_ENTITY_ID]) == ENTITY_MATCH_NONE:
        return

    if entity_ids == ENTITY_MATCH_ALL or self.entity_id in entity_ids:
        # Strip the routing keys; the remainder are the method's kwargs.
        params = {
            key: value
            for key, value in data.items()
            if key not in ("entity_id", "method")
        }
        await getattr(self, data["method"])(**params)
180 
async def async_handle_state_update(self, _client: WebOsClient) -> None:
    """Update state from WebOsClient.

    Registered as the client's state-update callback; the client argument
    is unused because the entity reads from its own stored client.
    """
    self._update_states()
    self.async_write_ha_state()
185 
def _update_states(self) -> None:
    """Update entity state attributes.

    Recomputes every ``_attr_*`` value (state, volume, source, media info,
    device info, extra attributes) and the supported-feature mask from the
    current client data.
    """
    self._update_sources()

    self._attr_state = (
        MediaPlayerState.ON if self._client.is_on else MediaPlayerState.OFF
    )
    self._attr_is_volume_muted = cast(bool, self._client.muted)

    # Client volume is divided by 100 to get the entity volume level.
    self._attr_volume_level = None
    if self._client.volume is not None:
        self._attr_volume_level = cast(float, self._client.volume / 100.0)

    self._attr_source = self._current_source
    self._attr_source_list = sorted(self._source_list)

    self._attr_media_content_type = None
    if self._client.current_app_id == LIVE_TV_APP_ID:
        self._attr_media_content_type = MediaType.CHANNEL

    self._attr_media_title = None
    if (self._client.current_app_id == LIVE_TV_APP_ID) and (
        self._client.current_channel is not None
    ):
        self._attr_media_title = cast(
            str, self._client.current_channel.get("channelName")
        )

    # Prefer the large icon; fall back to the regular icon when the large
    # one is not an http(s) URL.
    self._attr_media_image_url = None
    if self._client.current_app_id in self._client.apps:
        icon: str = self._client.apps[self._client.current_app_id]["largeIcon"]
        if not icon.startswith("http"):
            icon = self._client.apps[self._client.current_app_id]["icon"]
        self._attr_media_image_url = icon

    # Only recompute the feature mask while not OFF (or when nothing has
    # been computed/restored yet), so the last known mask survives power-off.
    if self.state != MediaPlayerState.OFF or not self._supported_features:
        supported = SUPPORT_WEBOSTV
        if self._client.sound_output in ("external_arc", "external_speaker"):
            supported = supported | SUPPORT_WEBOSTV_VOLUME
        elif self._client.sound_output != "lineout":
            supported = (
                supported
                | SUPPORT_WEBOSTV_VOLUME
                | MediaPlayerEntityFeature.VOLUME_SET
            )

        self._supported_features = supported

    self._attr_device_info = DeviceInfo(
        identifiers={(DOMAIN, cast(str, self.unique_id))},
        manufacturer="LG",
        name=self._device_name,
    )

    # State is assumed unless the client reports foreground app play state.
    self._attr_assumed_state = True
    if (
        self._client.is_on
        and self._client.media_state is not None
        and self._client.media_state.get("foregroundAppInfo") is not None
    ):
        self._attr_assumed_state = False
        for entry in self._client.media_state.get("foregroundAppInfo"):
            if entry.get("playState") == "playing":
                self._attr_state = MediaPlayerState.PLAYING
            elif entry.get("playState") == "paused":
                self._attr_state = MediaPlayerState.PAUSED
            elif entry.get("playState") == "unloaded":
                self._attr_state = MediaPlayerState.IDLE

    if self._client.system_info is not None or self.state != MediaPlayerState.OFF:
        # The guard above can pass while either info dict is still None
        # (state is not OFF but the info has not been populated), so fall
        # back to an empty mapping instead of dereferencing None.
        software_info = self._client.software_info or {}
        system_info = self._client.system_info or {}

        maj_v = software_info.get("major_ver")
        min_v = software_info.get("minor_ver")
        if maj_v and min_v:
            self._attr_device_info["sw_version"] = f"{maj_v}.{min_v}"

        if model := system_info.get("modelName"):
            self._attr_device_info["model"] = model

    self._attr_extra_state_attributes = {}
    if self._client.sound_output is not None or self.state != MediaPlayerState.OFF:
        self._attr_extra_state_attributes = {
            ATTR_SOUND_OUTPUT: self._client.sound_output
        }
269 
def _update_sources(self) -> None:
    """Update list of sources from current source, apps, inputs and configured list.

    Rebuilds ``self._source_list`` (name -> app/input dict) and updates
    ``self._current_source``. The currently running app/input is always
    listed; others are listed when no sources are configured or when they
    match a configured entry (exact or substring match).
    """
    source_list = self._source_list
    self._source_list = {}
    conf_sources = self._sources

    found_live_tv = False
    for app in self._client.apps.values():
        if app["id"] == LIVE_TV_APP_ID:
            found_live_tv = True
        if app["id"] == self._client.current_app_id:
            self._current_source = app["title"]
            self._source_list[app["title"]] = app
        elif (
            not conf_sources
            or app["id"] in conf_sources
            or any(word in app["title"] for word in conf_sources)
            or any(word in app["id"] for word in conf_sources)
        ):
            self._source_list[app["title"]] = app

    for source in self._client.inputs.values():
        if source["appId"] == LIVE_TV_APP_ID:
            found_live_tv = True
        if source["appId"] == self._client.current_app_id:
            self._current_source = source["label"]
            self._source_list[source["label"]] = source
        elif (
            not conf_sources
            or source["label"] in conf_sources
            or any(word in source["label"] for word in conf_sources)
        ):
            self._source_list[source["label"]] = source

    # empty list, TV may be off, keep previous list
    if not self._source_list and source_list:
        self._source_list = source_list
    # special handling of live tv since this might
    # not appear in the app or input lists in some cases
    elif not found_live_tv:
        app = {"id": LIVE_TV_APP_ID, "title": "Live TV"}
        if self._client.current_app_id == LIVE_TV_APP_ID:
            self._current_source = app["title"]
            self._source_list["Live TV"] = app
        elif (
            not conf_sources
            or app["id"] in conf_sources
            or any(word in app["title"] for word in conf_sources)
            or any(word in app["id"] for word in conf_sources)
        ):
            self._source_list["Live TV"] = app
321 
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
async def async_update(self) -> None:
    """Connect.

    Does nothing while the client is connected. Otherwise tries to
    connect: a pairing error starts a reauth flow on the config entry, a
    successful connect calls ``update_client_key``. Exceptions listed in
    WEBOSTV_EXCEPTIONS are suppressed.
    """
    if self._client.is_connected():
        return

    with suppress(*WEBOSTV_EXCEPTIONS, WebOsTvPairError):
        try:
            await self._client.connect()
        except WebOsTvPairError:
            self._entry.async_start_reauth(self.hass)
        else:
            update_client_key(self.hass, self._entry, self._client)
335 
@property
def supported_features(self) -> MediaPlayerEntityFeature:
    """Flag media player features that are supported.

    TURN_ON is added on top of the computed mask whenever the turn-on
    action object is truthy.
    """
    if self._turn_on:
        return self._supported_features | MediaPlayerEntityFeature.TURN_ON

    return self._supported_features
343 
@cmd
async def async_turn_off(self) -> None:
    """Turn off media player."""
    await self._client.power_off()
348 
async def async_turn_on(self) -> None:
    """Turn on media player.

    Runs the pluggable turn-on action with this entity's hass instance and
    context.
    """
    await self._turn_on.async_run(self.hass, self._context)
352 
@cmd
async def async_volume_up(self) -> None:
    """Volume up the media player."""
    await self._client.volume_up()
357 
@cmd
async def async_volume_down(self) -> None:
    """Volume down media player."""
    await self._client.volume_down()
362 
@cmd
async def async_set_volume_level(self, volume: float) -> None:
    """Set volume level, range 0..1."""
    # The client is given an integer: the 0..1 level scaled by 100.
    # round() on a float already returns an int.
    tv_volume = round(volume * 100)
    await self._client.set_volume(tv_volume)
368 
@cmd
async def async_mute_volume(self, mute: bool) -> None:
    """Send mute command."""
    await self._client.set_mute(mute)
373 
@cmd
async def async_select_sound_output(self, sound_output: str) -> None:
    """Select the sound output."""
    await self._client.change_sound_output(sound_output)
378 
@cmd
async def async_media_play_pause(self) -> None:
    """Simulate play pause media player.

    Uses the locally tracked ``_paused`` flag to decide which command to
    send.
    """
    if self._paused:
        await self.async_media_play()
    else:
        await self.async_media_pause()
386 
@cmd
async def async_select_source(self, source: str) -> None:
    """Select input source.

    Unknown source names are logged and ignored. Entries carrying a
    ``title`` are launched as apps; entries carrying a ``label`` are
    selected as inputs.
    """
    if (source_dict := self._source_list.get(source)) is None:
        _LOGGER.warning(
            "Source %s not found for %s", source, self._friendly_name_internal()
        )
        return
    if source_dict.get("title"):
        await self._client.launch_app(source_dict["id"])
    elif source_dict.get("label"):
        await self._client.set_input(source_dict["id"])
399 
@cmd
async def async_play_media(
    self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
    """Play a piece of media.

    Only ``MediaType.CHANNEL`` is handled: ``media_id`` is matched against
    the client's channel list by exact channel number, then exact
    (case-insensitive) channel name, then as a case-insensitive substring
    of the name. A perfect match wins over a partial one; when several
    channels match at the same level, the last one in the list is used.
    Nothing happens when no channel matches.
    """
    _LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id)

    if media_type == MediaType.CHANNEL:
        _LOGGER.debug("Searching channel")
        partial_match_channel_id = None
        perfect_match_channel_id = None
        wanted_name = media_id.lower()

        for channel in self._client.channels:
            if media_id == channel["channelNumber"]:
                perfect_match_channel_id = channel["channelId"]
                continue

            channel_name = channel["channelName"].lower()
            if wanted_name == channel_name:
                perfect_match_channel_id = channel["channelId"]
                continue

            if wanted_name in channel_name:
                partial_match_channel_id = channel["channelId"]

        if perfect_match_channel_id is not None:
            _LOGGER.debug(
                "Switching to channel <%s> with perfect match",
                perfect_match_channel_id,
            )
            await self._client.set_channel(perfect_match_channel_id)
        elif partial_match_channel_id is not None:
            _LOGGER.debug(
                "Switching to channel <%s> with partial match",
                partial_match_channel_id,
            )
            await self._client.set_channel(partial_match_channel_id)
436 
@cmd
async def async_media_play(self) -> None:
    """Send play command."""
    # Track the pause state locally; it drives async_media_play_pause.
    self._paused = False
    await self._client.play()
442 
@cmd
async def async_media_pause(self) -> None:
    """Send media pause command to media player."""
    # Track the pause state locally; it drives async_media_play_pause.
    self._paused = True
    await self._client.pause()
448 
@cmd
async def async_media_stop(self) -> None:
    """Send stop command to media player."""
    await self._client.stop()
453 
@cmd
async def async_media_next_track(self) -> None:
    """Send next track command.

    Sends channel-up while the Live TV app is current, fast-forward
    otherwise.
    """
    if self._client.current_app_id == LIVE_TV_APP_ID:
        await self._client.channel_up()
    else:
        await self._client.fast_forward()
461 
@cmd
async def async_media_previous_track(self) -> None:
    """Send the previous track command.

    Sends channel-down while the Live TV app is current, rewind otherwise.
    """
    if self._client.current_app_id == LIVE_TV_APP_ID:
        await self._client.channel_down()
    else:
        await self._client.rewind()
469 
@cmd
async def async_button(self, button: str) -> None:
    """Send a button press."""
    await self._client.button(button)
474 
@cmd
async def async_command(self, command: str, **kwargs: Any) -> None:
    """Send a command.

    The optional payload is read from ``kwargs[ATTR_PAYLOAD]``; it is
    ``None`` when that key is absent.
    """
    await self._client.request(command, payload=kwargs.get(ATTR_PAYLOAD))
479 
async def _async_fetch_image(self, url: str) -> tuple[bytes | None, str | None]:
    """Retrieve an image.

    webOS uses self-signed certificates, thus we need to use an empty
    SSLContext to bypass validation errors if url starts with https.

    Returns a ``(content, content_type)`` tuple. ``content`` is ``None``
    when the request timed out or the status was not 200 OK; the content
    type element is always ``None``.
    """
    content = None

    websession = async_get_clientsession(self.hass)
    # A timeout is treated the same as a failed fetch: content stays None.
    with suppress(TimeoutError):
        async with asyncio.timeout(10):
            response = await websession.get(url, ssl=False)
            if response.status == HTTPStatus.OK:
                content = await response.read()

    if content is None:
        _LOGGER.warning("Error retrieving proxied image from %s", url)

    return content, None
None __init__(self, ConfigEntry entry, WebOsClient client)
None async_play_media(self, MediaType|str media_type, str media_id, **Any kwargs)
tuple[bytes|None, str|None] _async_fetch_image(self, str url)
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
str|None _friendly_name_internal(self)
Definition: entity.py:1040
None async_register(HomeAssistant hass, system_health.SystemHealthRegistration register)
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
dict[str, str] async_get_turn_on_trigger(str device_id)
Definition: turn_on.py:39
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
Definition: media_player.py:76
None update_client_key(HomeAssistant hass, ConfigEntry entry, WebOsClient client)
Definition: __init__.py:165
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103