Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Decorator service for the media_player.play_media service."""
2 
3 from collections.abc import Callable
4 import logging
5 from pathlib import Path
6 from typing import Any, cast
7 
8 import voluptuous as vol
9 from yt_dlp import YoutubeDL
10 from yt_dlp.utils import DownloadError, ExtractorError
11 
13  ATTR_MEDIA_CONTENT_ID,
14  ATTR_MEDIA_CONTENT_TYPE,
15  DOMAIN as MEDIA_PLAYER_DOMAIN,
16  MEDIA_PLAYER_PLAY_MEDIA_SCHEMA,
17  SERVICE_PLAY_MEDIA,
18 )
19 from homeassistant.config_entries import ConfigEntry
20 from homeassistant.const import ATTR_ENTITY_ID
21 from homeassistant.core import (
22  HomeAssistant,
23  ServiceCall,
24  ServiceResponse,
25  SupportsResponse,
26 )
27 from homeassistant.exceptions import HomeAssistantError
28 from homeassistant.helpers import config_validation as cv
29 from homeassistant.helpers.typing import ConfigType
30 
31 from .const import (
32  ATTR_FORMAT_QUERY,
33  ATTR_URL,
34  DEFAULT_STREAM_QUERY,
35  DOMAIN,
36  SERVICE_EXTRACT_MEDIA_URL,
37 )
38 
39 _LOGGER = logging.getLogger(__name__)
40 
41 CONF_CUSTOMIZE_ENTITIES = "customize"
42 CONF_DEFAULT_STREAM_QUERY = "default_query"
43 
44 CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
45 
46 
47 async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
48  """Set up Media Extractor from a config entry."""
49 
50  return True
51 
52 
53 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
54  """Set up the media extractor service."""
55 
56  async def extract_media_url(call: ServiceCall) -> ServiceResponse:
57  """Extract media url."""
58 
59  def extract_info() -> dict[str, Any]:
60  youtube_dl = YoutubeDL(
61  {
62  "quiet": True,
63  "logger": _LOGGER,
64  "format": call.data[ATTR_FORMAT_QUERY],
65  }
66  )
67  return cast(
68  dict[str, Any],
69  youtube_dl.extract_info(
70  call.data[ATTR_URL], download=False, process=False
71  ),
72  )
73 
74  result = await hass.async_add_executor_job(extract_info)
75  if "entries" in result:
76  _LOGGER.warning("Playlists are not supported, looking for the first video")
77  entries = list(result["entries"])
78  if entries:
79  selected_media = entries[0]
80  else:
81  raise HomeAssistantError("Playlist is empty")
82  else:
83  selected_media = result
84  if "formats" in selected_media:
85  if selected_media["extractor"] == "youtube":
86  url = get_best_stream_youtube(selected_media["formats"])
87  else:
88  url = get_best_stream(selected_media["formats"])
89  else:
90  url = cast(str, selected_media["url"])
91  return {"url": url}
92 
93  def play_media(call: ServiceCall) -> None:
94  """Get stream URL and send it to the play_media service."""
95  MediaExtractor(hass, config.get(DOMAIN, {}), call.data).extract_and_send()
96 
97  default_format_query = config.get(DOMAIN, {}).get(
98  CONF_DEFAULT_STREAM_QUERY, DEFAULT_STREAM_QUERY
99  )
100 
101  hass.services.async_register(
102  DOMAIN,
103  SERVICE_EXTRACT_MEDIA_URL,
104  extract_media_url,
105  schema=vol.Schema(
106  {
107  vol.Required(ATTR_URL): cv.string,
108  vol.Optional(
109  ATTR_FORMAT_QUERY, default=default_format_query
110  ): cv.string,
111  }
112  ),
113  supports_response=SupportsResponse.ONLY,
114  )
115 
116  hass.services.async_register(
117  DOMAIN,
118  SERVICE_PLAY_MEDIA,
119  play_media,
120  schema=cv.make_entity_service_schema(MEDIA_PLAYER_PLAY_MEDIA_SCHEMA),
121  )
122 
123  return True
124 
125 
126 class MEDownloadException(Exception):
127  """Media extractor download exception."""
128 
129 
130 class MEQueryException(Exception):
131  """Media extractor query exception."""
132 
133 
135  """Class which encapsulates all extraction logic."""
136 
137  def __init__(
138  self,
139  hass: HomeAssistant,
140  component_config: dict[str, Any],
141  call_data: dict[str, Any],
142  ) -> None:
143  """Initialize media extractor."""
144  self.hasshass = hass
145  self.configconfig = component_config
146  self.call_datacall_data = call_data
147 
148  def get_media_url(self) -> str:
149  """Return media content url."""
150  return cast(str, self.call_datacall_data[ATTR_MEDIA_CONTENT_ID])
151 
152  def get_entities(self) -> list[str]:
153  """Return list of entities."""
154  return self.call_datacall_data.get(ATTR_ENTITY_ID, []) # type: ignore[no-any-return]
155 
156  def extract_and_send(self) -> None:
157  """Extract exact stream format for each entity_id and play it."""
158  try:
159  stream_selector = self.get_stream_selectorget_stream_selector()
160  except MEDownloadException:
161  _LOGGER.error(
162  "Could not retrieve data for the URL: %s", self.get_media_urlget_media_url()
163  )
164  else:
165  if not (entities := self.get_entitiesget_entities()):
166  self.call_media_player_servicecall_media_player_service(stream_selector, None)
167 
168  for entity_id in entities:
169  self.call_media_player_servicecall_media_player_service(stream_selector, entity_id)
170 
171  def get_stream_selector(self) -> Callable[[str], str]:
172  """Return format selector for the media URL."""
173  cookies_file = Path(
174  self.hasshass.config.config_dir, "media_extractor", "cookies.txt"
175  )
176  ydl_params = {"quiet": True, "logger": _LOGGER}
177  if cookies_file.exists():
178  ydl_params["cookiefile"] = str(cookies_file)
179  _LOGGER.debug(
180  "Media extractor loaded cookies file from: %s", str(cookies_file)
181  )
182  else:
183  _LOGGER.debug(
184  "Media extractor didn't find cookies file at: %s", str(cookies_file)
185  )
186  ydl = YoutubeDL(ydl_params)
187 
188  try:
189  all_media = ydl.extract_info(self.get_media_urlget_media_url(), process=False)
190  except DownloadError as err:
191  # This exception will be logged by youtube-dl itself
192  raise MEDownloadException from err
193 
194  if "entries" in all_media:
195  _LOGGER.warning("Playlists are not supported, looking for the first video")
196  entries = list(all_media["entries"])
197  if entries:
198  selected_media = entries[0]
199  else:
200  _LOGGER.error("Playlist is empty")
201  raise MEDownloadException
202  else:
203  selected_media = all_media
204 
205  def stream_selector(query: str) -> str:
206  """Find stream URL that matches query."""
207  try:
208  ydl.params["format"] = query
209  requested_stream = ydl.process_ie_result(selected_media, download=False)
210  except (ExtractorError, DownloadError) as err:
211  _LOGGER.error("Could not extract stream for the query: %s", query)
212  raise MEQueryException from err
213 
214  if "formats" in requested_stream:
215  if requested_stream["extractor"] == "youtube":
216  return get_best_stream_youtube(requested_stream["formats"])
217  return get_best_stream(requested_stream["formats"])
218  return cast(str, requested_stream["url"])
219 
220  return stream_selector
221 
223  self, stream_selector: Callable[[str], str], entity_id: str | None
224  ) -> None:
225  """Call Media player play_media service."""
226  stream_query = self.get_stream_query_for_entityget_stream_query_for_entity(entity_id)
227 
228  try:
229  stream_url = stream_selector(stream_query)
230  except MEQueryException:
231  _LOGGER.error("Wrong query format: %s", stream_query)
232  return
233  _LOGGER.debug("Selected the following stream: %s", stream_url)
234  data = {k: v for k, v in self.call_datacall_data.items() if k != ATTR_ENTITY_ID}
235  data[ATTR_MEDIA_CONTENT_ID] = stream_url
236 
237  if entity_id:
238  data[ATTR_ENTITY_ID] = entity_id
239 
240  self.hasshass.create_task(
241  self.hasshass.services.async_call(MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA, data)
242  )
243 
244  def get_stream_query_for_entity(self, entity_id: str | None) -> str:
245  """Get stream format query for entity."""
246  default_stream_query: str = self.configconfig.get(
247  CONF_DEFAULT_STREAM_QUERY, DEFAULT_STREAM_QUERY
248  )
249 
250  if entity_id:
251  media_content_type = self.call_datacall_data.get(ATTR_MEDIA_CONTENT_TYPE)
252 
253  return str(
254  self.configconfig.get(CONF_CUSTOMIZE_ENTITIES, {})
255  .get(entity_id, {})
256  .get(media_content_type, default_stream_query)
257  )
258 
259  return default_stream_query
260 
261 
262 def get_best_stream(formats: list[dict[str, Any]]) -> str:
263  """Return the best quality stream.
264 
265  As per
266  https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/common.py#L128.
267  """
268 
269  return cast(str, formats[len(formats) - 1]["url"])
270 
271 
272 def get_best_stream_youtube(formats: list[dict[str, Any]]) -> str:
273  """YouTube responses also include files with only video or audio.
274 
275  So we filter on files with both audio and video codec.
276  """
277 
278  return get_best_stream(
279  [
280  stream_format
281  for stream_format in formats
282  if stream_format.get("acodec", "none") != "none"
283  and stream_format.get("vcodec", "none") != "none"
284  ]
285  )
str get_stream_query_for_entity(self, str|None entity_id)
Definition: __init__.py:244
None call_media_player_service(self, Callable[[str], str] stream_selector, str|None entity_id)
Definition: __init__.py:224
None __init__(self, HomeAssistant hass, dict[str, Any] component_config, dict[str, Any] call_data)
Definition: __init__.py:142
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
str get_best_stream(list[dict[str, Any]] formats)
Definition: __init__.py:262
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:53
str get_best_stream_youtube(list[dict[str, Any]] formats)
Definition: __init__.py:272
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:47