1 """Decorator service for the media_player.play_media service."""
3 from collections.abc
import Callable
5 from pathlib
import Path
6 from typing
import Any, cast
8 import voluptuous
as vol
9 from yt_dlp
import YoutubeDL
10 from yt_dlp.utils
import DownloadError, ExtractorError
13 ATTR_MEDIA_CONTENT_ID,
14 ATTR_MEDIA_CONTENT_TYPE,
15 DOMAIN
as MEDIA_PLAYER_DOMAIN,
16 MEDIA_PLAYER_PLAY_MEDIA_SCHEMA,
36 SERVICE_EXTRACT_MEDIA_URL,
39 _LOGGER = logging.getLogger(__name__)
41 CONF_CUSTOMIZE_ENTITIES =
"customize"
42 CONF_DEFAULT_STREAM_QUERY =
"default_query"
44 CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
48 """Set up Media Extractor from a config entry."""
53 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
54 """Set up the media extractor service."""
56 async
def extract_media_url(call: ServiceCall) -> ServiceResponse:
57 """Extract media url."""
59 def extract_info() -> dict[str, Any]:
60 youtube_dl = YoutubeDL(
64 "format": call.data[ATTR_FORMAT_QUERY],
69 youtube_dl.extract_info(
70 call.data[ATTR_URL], download=
False, process=
False
74 result = await hass.async_add_executor_job(extract_info)
75 if "entries" in result:
76 _LOGGER.warning(
"Playlists are not supported, looking for the first video")
77 entries =
list(result[
"entries"])
79 selected_media = entries[0]
83 selected_media = result
84 if "formats" in selected_media:
85 if selected_media[
"extractor"] ==
"youtube":
90 url = cast(str, selected_media[
"url"])
93 def play_media(call: ServiceCall) ->
None:
94 """Get stream URL and send it to the play_media service."""
95 MediaExtractor(hass, config.get(DOMAIN, {}), call.data).extract_and_send()
97 default_format_query = config.get(DOMAIN, {}).
get(
98 CONF_DEFAULT_STREAM_QUERY, DEFAULT_STREAM_QUERY
101 hass.services.async_register(
103 SERVICE_EXTRACT_MEDIA_URL,
107 vol.Required(ATTR_URL): cv.string,
109 ATTR_FORMAT_QUERY, default=default_format_query
113 supports_response=SupportsResponse.ONLY,
116 hass.services.async_register(
120 schema=cv.make_entity_service_schema(MEDIA_PLAYER_PLAY_MEDIA_SCHEMA),
127 """Media extractor download exception."""
130 class MEQueryException(Exception):
131 """Media extractor query exception."""
135 """Class which encapsulates all extraction logic."""
140 component_config: dict[str, Any],
141 call_data: dict[str, Any],
143 """Initialize media extractor."""
149 """Return media content url."""
150 return cast(str, self.
call_datacall_data[ATTR_MEDIA_CONTENT_ID])
153 """Return list of entities."""
157 """Extract exact stream format for each entity_id and play it."""
160 except MEDownloadException:
162 "Could not retrieve data for the URL: %s", self.
get_media_urlget_media_url()
168 for entity_id
in entities:
172 """Return format selector for the media URL."""
174 self.
hasshass.config.config_dir,
"media_extractor",
"cookies.txt"
176 ydl_params = {
"quiet":
True,
"logger": _LOGGER}
177 if cookies_file.exists():
178 ydl_params[
"cookiefile"] =
str(cookies_file)
180 "Media extractor loaded cookies file from: %s",
str(cookies_file)
184 "Media extractor didn't find cookies file at: %s",
str(cookies_file)
186 ydl = YoutubeDL(ydl_params)
189 all_media = ydl.extract_info(self.
get_media_urlget_media_url(), process=
False)
190 except DownloadError
as err:
192 raise MEDownloadException
from err
194 if "entries" in all_media:
195 _LOGGER.warning(
"Playlists are not supported, looking for the first video")
196 entries =
list(all_media[
"entries"])
198 selected_media = entries[0]
200 _LOGGER.error(
"Playlist is empty")
201 raise MEDownloadException
203 selected_media = all_media
205 def stream_selector(query: str) -> str:
206 """Find stream URL that matches query."""
208 ydl.params[
"format"] = query
209 requested_stream = ydl.process_ie_result(selected_media, download=
False)
210 except (ExtractorError, DownloadError)
as err:
211 _LOGGER.error(
"Could not extract stream for the query: %s", query)
212 raise MEQueryException
from err
214 if "formats" in requested_stream:
215 if requested_stream[
"extractor"] ==
"youtube":
218 return cast(str, requested_stream[
"url"])
220 return stream_selector
223 self, stream_selector: Callable[[str], str], entity_id: str |
None
225 """Call Media player play_media service."""
229 stream_url = stream_selector(stream_query)
230 except MEQueryException:
231 _LOGGER.error(
"Wrong query format: %s", stream_query)
233 _LOGGER.debug(
"Selected the following stream: %s", stream_url)
234 data = {k: v
for k, v
in self.
call_datacall_data.items()
if k != ATTR_ENTITY_ID}
235 data[ATTR_MEDIA_CONTENT_ID] = stream_url
238 data[ATTR_ENTITY_ID] = entity_id
240 self.
hasshass.create_task(
241 self.
hasshass.services.async_call(MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA, data)
245 """Get stream format query for entity."""
246 default_stream_query: str = self.
configconfig.
get(
247 CONF_DEFAULT_STREAM_QUERY, DEFAULT_STREAM_QUERY
251 media_content_type = self.
call_datacall_data.
get(ATTR_MEDIA_CONTENT_TYPE)
254 self.
configconfig.
get(CONF_CUSTOMIZE_ENTITIES, {})
256 .
get(media_content_type, default_stream_query)
259 return default_stream_query
263 """Return the best quality stream.
266 https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/common.py#L128.
269 return cast(str, formats[len(formats) - 1][
"url"])
273 """YouTube responses also include files with only video or audio.
275 So we filter on files with both audio and video codec.
281 for stream_format
in formats
282 if stream_format.get(
"acodec",
"none") !=
"none"
283 and stream_format.get(
"vcodec",
"none") !=
"none"
web.Response get(self, web.Request request, str config_key)