1 """Wrapper for media_source around async_upnp_client's DmsDevice."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from dataclasses
import dataclass
8 from enum
import StrEnum
10 from typing
import Any, cast
12 from async_upnp_client.aiohttp
import AiohttpSessionRequester
13 from async_upnp_client.client
import UpnpRequester
14 from async_upnp_client.client_factory
import UpnpFactory
15 from async_upnp_client.const
import NotificationSubType
16 from async_upnp_client.exceptions
import UpnpActionError, UpnpConnectionError, UpnpError
17 from async_upnp_client.profiles.dlna
import ContentDirectoryErrorCode, DmsDevice
18 from didl_lite
import didl_lite
19 from propcache
import cached_property
51 """Storage class for domain global data."""
54 requester: UpnpRequester
55 upnp_factory: UpnpFactory
56 devices: dict[str, DmsDeviceSource]
57 sources: dict[str, DmsDeviceSource]
63 """Initialize global data."""
65 session = aiohttp_client.async_get_clientsession(hass, verify_ssl=
False)
66 self.
requesterrequester = AiohttpSessionRequester(session, with_sleep=
True)
72 """Create a DMS device connection from a config entry."""
73 assert config_entry.unique_id
75 self.
devicesdevices[config_entry.unique_id] = device
78 assert device.source_id
not in self.
sourcessources
79 self.
sourcessources[device.source_id] = device
80 await device.async_added_to_hass()
84 """Unload a config entry and disconnect the corresponding DMS device."""
85 assert config_entry.unique_id
86 device = self.
devicesdevices.pop(config_entry.unique_id)
87 del self.
sourcessources[device.source_id]
88 await device.async_will_remove_from_hass()
94 """Obtain this integration's domain data, creating it if needed."""
95 if DOMAIN
in hass.data:
96 return cast(DlnaDmsData, hass.data[DOMAIN])
99 hass.data[DOMAIN] = data
105 """Playable media with DIDL metadata."""
107 didl_metadata: didl_lite.DidlObject
111 """Base for errors raised by DmsDeviceSource.
113 Caught by both media_player (BrowseError) and media_source (Unresolvable),
114 so DmsDeviceSource methods can be used for both browse and resolve
class DeviceConnectionError(DlnaDmsDeviceError):
    """Raised when there is a problem with the connection to the server."""
124 """Error when calling a UPnP Action on the device."""
127 def catch_request_errors[_DlnaDmsDeviceMethod: DmsDeviceSource, _R](
128 func: Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _R]],
129 ) -> Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _R]]:
130 """Catch UpnpError errors."""
132 @functools.wraps(func)
133 async
def wrapper(self: _DlnaDmsDeviceMethod, req_param: str) -> _R:
134 """Catch UpnpError errors and check availability before and after request."""
135 if not self.available:
136 LOGGER.warning(
"Device disappeared when trying to call %s", func.__name__)
137 raise DeviceConnectionError(
"DMS is not connected")
140 return await func(self, req_param)
141 except UpnpActionError
as err:
142 LOGGER.debug(
"Server failure", exc_info=err)
143 if err.error_code == ContentDirectoryErrorCode.NO_SUCH_OBJECT:
144 LOGGER.debug(
"No such object: %s", req_param)
145 raise ActionError(f
"No such object: {req_param}")
from err
146 if err.error_code == ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA:
147 LOGGER.debug(
"Invalid query: %s", req_param)
148 raise ActionError(f
"Invalid query: {req_param}")
from err
149 raise DeviceConnectionError(f
"Server failure: {err!r}")
from err
150 except UpnpConnectionError
as err:
151 LOGGER.debug(
"Server disconnected", exc_info=err)
152 await self.device_disconnect()
153 raise DeviceConnectionError(f
"Server disconnected: {err!r}")
from err
154 except UpnpError
as err:
155 LOGGER.debug(
"Server communication failure", exc_info=err)
156 raise DeviceConnectionError(
157 f
"Server communication failure: {err!r}"
164 """DMS Device wrapper, providing media files as a media_source."""
171 _device_lock: asyncio.Lock
172 _device: DmsDevice |
None =
None
175 _ssdp_connect_failed: bool =
False
178 _bootid: int |
None =
None
180 def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) ->
None:
181 """Initialize a DMS Source."""
190 """Handle addition of this source."""
196 except UpnpError
as err:
197 LOGGER.debug(
"Couldn't connect immediately: %r", err)
201 await ssdp.async_register_callback(
211 await ssdp.async_register_callback(
214 {
"_udn": self.
udnudn,
"NTS": NotificationSubType.SSDP_BYEBYE},
219 """Handle removal of this source."""
223 self, info: ssdp.SsdpServiceInfo, change: ssdp.SsdpChange
225 """Handle notification from SSDP of device state change."""
227 "SSDP %s notification of device %s at %s",
234 bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_BOOTID]
235 bootid: int |
None =
int(bootid_str, 10)
236 except (KeyError, ValueError):
239 if change == ssdp.SsdpChange.UPDATE:
245 next_bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_NEXTBOOTID]
246 self.
_bootid_bootid =
int(next_bootid_str, 10)
247 except (KeyError, ValueError):
252 if self.
_bootid_bootid
is not None and self.
_bootid_bootid != bootid:
261 if change == ssdp.SsdpChange.BYEBYE:
270 change == ssdp.SsdpChange.ALIVE
274 assert info.ssdp_location
275 self.
locationlocation = info.ssdp_location
278 except UpnpError
as err:
281 "Failed connecting to recently alive device at %s: %r",
289 """Connect to the device now that it's available."""
290 LOGGER.debug(
"Connecting to device at %s", self.
locationlocation)
295 LOGGER.debug(
"Trying to connect when device already connected")
301 upnp_device = await domain_data.upnp_factory.async_create_device(
306 self.
_device_device = DmsDevice(upnp_device, event_handler=
None)
313 """Destroy connections to the device now that it's not available.
315 Also call when removing this device wrapper from hass to clean up connections.
319 LOGGER.debug(
"Disconnecting from device that's not connected")
322 LOGGER.debug(
"Disconnecting from %s", self.
_device_device.name)
330 """Device is available when we have a connection to it."""
331 return self.
_device_device
is not None and self.
_device_device.profile_device.available
335 """Get the USN (Unique Service Name) for the wrapped UPnP device end-point."""
336 return self.
config_entryconfig_entry.data[CONF_DEVICE_ID]
340 """Get the UDN (Unique Device Name) based on the USN."""
341 return self.
usnusn.partition(
"::")[0]
345 """Return a name for the media server."""
350 """Return a unique ID (slug) for this source for people to use in URLs."""
351 return self.
config_entryconfig_entry.data[CONF_SOURCE_ID]
355 """Return an URL to an icon for the media server."""
361 """Resolve a media item to a playable item."""
362 LOGGER.debug(
"async_resolve_media(%s)", identifier)
365 assert action
is not None, f
"Invalid identifier: {identifier}"
367 if action
is Action.OBJECT:
370 if action
is Action.PATH:
374 assert action
is Action.SEARCH
379 LOGGER.debug(
"async_browse_media(%s)", identifier)
382 if action
is Action.OBJECT:
385 if action
is Action.PATH:
389 if action
is Action.SEARCH:
396 @catch_request_errors
398 """Return a playable media item specified by ObjectID."""
401 item = await self.
_device_device.async_browse_metadata(
402 object_id, metadata_filter=DLNA_RESOLVE_FILTER
408 @catch_request_errors
410 """Return an Object ID resolved from a path string."""
415 object_id = ROOT_OBJECT_ID
416 for node
in path.split(PATH_SEP):
422 f
'@parentID="{_esc_quote(object_id)}" and dc:title="{_esc_quote(node)}"'
425 result = await self.
_device_device.async_search_directory(
427 search_criteria=criteria,
428 metadata_filter=DLNA_PATH_FILTER,
431 except UpnpActionError
as err:
432 LOGGER.debug(
"Error in call to async_search_directory: %r", err)
433 if err.error_code == ContentDirectoryErrorCode.NO_SUCH_CONTAINER:
434 raise Unresolvable(f
"No such container: {object_id}")
from err
437 if result.total_matches > 1:
438 raise Unresolvable(f
"Too many items found for {node} in {path}")
441 object_id = result.result[0].id
445 result = await self.
_device_device.async_browse_direct_children(
446 object_id, metadata_filter=DLNA_PATH_FILTER
449 if result.total_matches == 0
or not result.result:
450 raise Unresolvable(f
"No contents for {node} in {path}")
452 node_lower = node.lower()
453 for child
in result.result:
454 if child.title.lower() == node_lower:
459 raise Unresolvable(f
"Nothing found for {node} in {path}")
462 @catch_request_errors
464 """Return first playable media item found by the query string."""
467 result = await self.
_device_device.async_search_directory(
468 container_id=ROOT_OBJECT_ID,
469 search_criteria=query,
470 metadata_filter=DLNA_RESOLVE_FILTER,
474 if result.total_matches == 0
or not result.result:
475 raise Unresolvable(f
"Nothing found for {query}")
478 item = result.result[0]
480 if not isinstance(item, didl_lite.DidlObject):
481 raise Unresolvable(f
"{item} is not a DidlObject")
485 @catch_request_errors
487 """Return the contents of a DLNA container by ObjectID."""
490 base_object = await self.
_device_device.async_browse_metadata(
491 object_id, metadata_filter=DLNA_BROWSE_FILTER
494 children = await self.
_device_device.async_browse_direct_children(
496 metadata_filter=DLNA_BROWSE_FILTER,
502 @catch_request_errors
504 """Return all media items found by the query string."""
507 result = await self.
_device_device.async_search_directory(
508 container_id=ROOT_OBJECT_ID,
509 search_criteria=query,
510 metadata_filter=DLNA_BROWSE_FILTER,
515 for child
in result.result
516 if isinstance(child, didl_lite.DidlObject)
519 return BrowseMediaSource(
522 media_class=MediaClass.DIRECTORY,
523 media_content_type=
"",
524 title=
"Search results",
531 """Return the first playable resource from a DIDL-Lite object."""
535 LOGGER.debug(
"Object %s has no resources", item.id)
536 raise Unresolvable(
"Object has no resources")
538 for resource
in item.res:
542 url = self.
_device_device.get_absolute_url(resource.uri)
543 LOGGER.debug(
"Resolved to url %s MIME %s", url, mime_type)
546 LOGGER.debug(
"Object %s has no playable resources", item.id)
547 raise Unresolvable(
"Object has no playable resources")
551 item: didl_lite.DidlObject,
552 browsed_children: DmsDevice.BrowseResult |
None =
None,
553 ) -> BrowseMediaSource:
554 """Convert a DIDL-Lite object to a browse media source."""
555 children: list[BrowseMediaSource] |
None =
None
560 for child
in browsed_children.result
561 if isinstance(child, didl_lite.DidlObject)
568 child_count =
int(item.child_count)
569 except (AttributeError, TypeError, ValueError):
572 bool(children)
or child_count > 0
or isinstance(item, didl_lite.Container)
579 title = self.
namename
if item.id == ROOT_OBJECT_ID
else item.title
582 media_content_type = mime_type
or item.upnp_class
584 return BrowseMediaSource(
587 media_class=MEDIA_CLASS_MAP.get(item.upnp_class,
""),
588 media_content_type=media_content_type,
591 can_expand=can_expand,
597 """Return absolute URL of a thumbnail for a DIDL-Lite object.
599 Some objects have the thumbnail in albumArtURI, others in an image
605 if album_art_uri := getattr(item,
"album_art_uri",
None):
606 return self.
_device_device.get_absolute_url(album_art_uri)
608 for resource
in item.res:
609 if not resource.protocol_info
or not resource.uri:
611 if resource.protocol_info.startswith(
"http-get:*:image/"):
612 return self.
_device_device.get_absolute_url(resource.uri)
617 """Make an identifier for BrowseMediaSource."""
618 return f
"{self.source_id}/{action}{object_id}"
622 """Return criteria to be used for sorting results.
624 The device must be connected before reading this property.
628 if self.
_device_device.sort_capabilities == [
"*"]:
629 return DLNA_SORT_CRITERIA
636 for criterion
in DLNA_SORT_CRITERIA
637 if criterion[1:]
in self.
_device_device.sort_capabilities
642 """Actions that can be specified in a DMS media-source identifier."""
644 OBJECT = PATH_OBJECT_ID_FLAG
646 SEARCH = PATH_SEARCH_FLAG
650 """Parse the media identifier component of a media-source URI."""
653 if identifier.startswith(PATH_OBJECT_ID_FLAG):
654 return Action.OBJECT, identifier[1:]
655 if identifier.startswith(PATH_SEP):
656 return Action.PATH, identifier[1:]
657 if identifier.startswith(PATH_SEARCH_FLAG):
658 return Action.SEARCH, identifier[1:]
659 return Action.PATH, identifier
663 """Determine if a resource can be streamed across a network."""
665 if not resource.protocol_info:
667 protocol = resource.protocol_info.split(
":")[0].lower()
668 return protocol.lower()
in STREAMABLE_PROTOCOLS
672 """Return the MIME type of a resource, if specified."""
674 if not resource.protocol_info:
677 protocol, _, content_format, _ = resource.protocol_info.split(
":", 3)
680 if protocol.lower()
in STREAMABLE_PROTOCOLS:
681 return content_format
def _esc_quote(contents: str) -> str:
    """Escape string contents for DLNA search quoted values.

    Backslashes are doubled first, then each double quote is prefixed with a
    backslash. The order matters: escaping quotes first would double the
    backslashes that were just inserted.

    See ContentDirectory:v4, section 4.1.2.
    """
    return contents.replace("\\", "\\\\").replace('"', '\\"')
None __init__(self, HomeAssistant hass)
bool async_setup_entry(self, ConfigEntry config_entry)
bool async_unload_entry(self, ConfigEntry config_entry)
str _make_identifier(self, Action action, str object_id)
None device_connect(self)
str|None _didl_thumbnail_url(self, didl_lite.DidlObject item)
None async_will_remove_from_hass(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
DidlPlayMedia _didl_to_play_media(self, didl_lite.DidlObject item)
DidlPlayMedia async_resolve_object(self, str object_id)
None device_disconnect(self)
BrowseMediaSource async_browse_object(self, str object_id)
None async_ssdp_callback(self, ssdp.SsdpServiceInfo info, ssdp.SsdpChange change)
BrowseMediaSource _didl_to_media_source(self, didl_lite.DidlObject item, DmsDevice.BrowseResult|None browsed_children=None)
BrowseMediaSource async_browse_media(self, str|None identifier)
None async_added_to_hass(self)
DidlPlayMedia async_resolve_search(self, str query)
str async_resolve_path(self, str path)
DidlPlayMedia async_resolve_media(self, str identifier)
BrowseMediaSource async_browse_search(self, str query)
list[str] _sort_criteria(self)
DlnaDmsData get_domain_data(HomeAssistant hass)
str _esc_quote(str contents)
tuple[Action|None, str] _parse_identifier(str|None identifier)
str|None _resource_mime_type(didl_lite.Resource resource)
bool _resource_is_streaming(didl_lite.Resource resource)