1 """The media_source integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from typing
import Any, Protocol
8 import voluptuous
as vol
12 ATTR_MEDIA_CONTENT_ID,
13 CONTENT_AUTH_EXPIRY_TIME,
16 async_process_play_media_url,
23 async_process_integration_platforms,
28 from .
import local_source
36 from .error
import MediaSourceError, Unresolvable
37 from .models
import BrowseMediaSource, MediaSource, MediaSourceItem, PlayMedia
42 "generate_media_source_id",
44 "async_resolve_media",
56 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
60 """Define the format of media_source platforms."""
63 """Set up media source."""
67 """Test if identifier is a media source."""
68 return URI_SCHEME_REGEX.match(media_content_id)
is not None
72 """Generate a media source ID."""
73 uri = f
"{URI_SCHEME}{domain or ''}"
75 uri += f
"/{identifier}"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the media_source component.

    Creates the per-integration storage in ``hass.data``, registers the
    browse/resolve websocket commands and the built-in "media-browser"
    frontend panel, sets up the local source, and hands
    ``_process_media_source_platform`` to the integration-platform processor.

    Args:
        hass: The Home Assistant instance being set up.
        config: The loaded configuration (not read by this function).

    Returns:
        True once setup has run to completion.
    """
    # Platform processing stores each source under hass.data[DOMAIN][domain],
    # so the container has to exist before any platform is handled.
    hass.data[DOMAIN] = {}

    websocket_api.async_register_command(hass, websocket_browse_media)
    websocket_api.async_register_command(hass, websocket_resolve_media)

    frontend.async_register_built_in_panel(
        hass,
        "media-browser",
        "media_browser",
        "hass:play-box-multiple",
    )

    local_source.async_setup(hass)

    # NOTE(review): the call opener below and the final `return True` were not
    # present in the reviewed text. They are restored from the imported helper
    # name, the orphaned argument line and the `-> bool` annotation; confirm
    # that the helper is meant to be awaited here.
    await async_process_integration_platforms(
        hass, DOMAIN, _process_media_source_platform
    )
    return True
97 platform: MediaSourceProtocol,
99 """Process a media source platform."""
100 hass.data[DOMAIN][domain] = await platform.async_get_media_source(hass)
105 hass: HomeAssistant, media_content_id: str |
None, target_media_player: str |
None
106 ) -> MediaSourceItem:
107 """Return media item."""
109 item = MediaSourceItem.from_uri(hass, media_content_id, target_media_player)
112 domain =
None if len(hass.data[DOMAIN]) > 1
else DOMAIN
115 if item.domain
is not None and item.domain
not in hass.data[DOMAIN]:
116 raise ValueError(
"Unknown media source")
124 media_content_id: str |
None,
126 content_filter: Callable[[BrowseMedia], bool] |
None =
None,
127 ) -> BrowseMediaSource:
128 """Return media player browse media results."""
129 if DOMAIN
not in hass.data:
133 item = await
_get_media_item(hass, media_content_id,
None).async_browse()
134 except ValueError
as err:
137 if content_filter
is None or item.children
is None:
140 old_count = len(item.children)
142 child
for child
in item.children
if child.can_expand
or content_filter(child)
144 item.not_shown += old_count - len(item.children)
151 media_content_id: str,
152 target_media_player: str |
None | UndefinedType = UNDEFINED,
154 """Get info to play media."""
155 if DOMAIN
not in hass.data:
158 if target_media_player
is UNDEFINED:
160 "calls media_source.async_resolve_media without passing an entity_id",
161 exclude_integrations={DOMAIN},
163 target_media_player =
None
167 except ValueError
as err:
170 return await item.async_resolve()
173 @websocket_api.websocket_command(
{
vol.Required("type"):
"media_source/browse_media",
174 vol.Optional(ATTR_MEDIA_CONTENT_ID, default=
""): str,
177 @websocket_api.async_response
179 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
181 """Browse available media."""
184 connection.send_result(
188 except BrowseError
as err:
189 connection.send_error(msg[
"id"],
"browse_media_failed",
str(err))
192 @websocket_api.websocket_command(
{
vol.Required("type"):
"media_source/resolve_media",
193 vol.Required(ATTR_MEDIA_CONTENT_ID): str,
194 vol.Optional(
"expires", default=CONTENT_AUTH_EXPIRY_TIME): int,
197 @websocket_api.async_response
199 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
204 except Unresolvable
as err:
205 connection.send_error(msg[
"id"],
"resolve_media_failed",
str(err))
208 connection.send_result(
212 hass, media.url, allow_relative_url=
True
214 "mime_type": media.mime_type,
# Reference signature of the `report_usage` helper, rendered in Python notation:
# report_usage(
#     what: str,
#     *,
#     breaks_in_ha_version: str | None = None,
#     core_behavior: ReportBehavior = ReportBehavior.ERROR,
#     core_integration_behavior: ReportBehavior = ReportBehavior.LOG,
#     custom_integration_behavior: ReportBehavior = ReportBehavior.LOG,
#     exclude_integrations: set[str] | None = None,
#     integration_domain: str | None = None,
#     level: int = logging.WARNING,
# ) -> None