1 """Services for the Plex integration."""
6 from plexapi.exceptions
import NotFound
7 import voluptuous
as vol
16 PLEX_UPDATE_PLATFORMS_SIGNAL,
19 SERVICE_REFRESH_LIBRARY,
22 from .errors
import MediaNotFound
23 from .helpers
import get_plex_data
24 from .models
import PlexMediaSearchResult
25 from .server
import PlexServer
27 REFRESH_LIBRARY_SCHEMA = vol.Schema(
28 {vol.Optional(
"server_name"): str, vol.Required(
"library_name"): str}
31 _LOGGER = logging.getLogger(__package__)
35 """Set up services for the Plex component."""
37 async
def async_refresh_library_service(service_call: ServiceCall) ->
None:
38 await hass.async_add_executor_job(refresh_library, hass, service_call)
40 async
def async_scan_clients_service(_: ServiceCall) ->
None:
42 "This service is deprecated in favor of the scan_clients button entity."
43 " Service calls will still work for now but the service will be removed in"
49 hass.services.async_register(
51 SERVICE_REFRESH_LIBRARY,
52 async_refresh_library_service,
53 schema=REFRESH_LIBRARY_SCHEMA,
55 hass.services.async_register(
56 DOMAIN, SERVICE_SCAN_CLIENTS, async_scan_clients_service
61 """Scan a Plex library for new and updated media."""
62 plex_server_name = service_call.data.get(
"server_name")
63 library_name = service_call.data[
"library_name"]
68 library = plex_server.library.section(title=library_name)
71 "Library with name '%s' not found in %s",
73 [x.title
for x
in plex_server.library.sections()],
77 _LOGGER.debug(
"Scanning %s for new and updated media", library_name)
83 plex_server_name: str |
None =
None,
84 plex_server_id: str |
None =
None,
86 """Retrieve a configured Plex server by name."""
87 if DOMAIN
not in hass.data:
94 return servers[plex_server_id]
96 plex_servers = servers.values()
99 (x
for x
in plex_servers
if x.friendly_name == plex_server_name),
None
101 if plex_server
is not None:
103 friendly_names = [x.friendly_name
for x
in plex_servers]
105 f
"Requested Plex server '{plex_server_name}' not found in {friendly_names}"
108 if len(plex_servers) == 1:
109 return next(iter(plex_servers))
111 friendly_names = [x.friendly_name
for x
in plex_servers]
113 "Multiple Plex servers configured, choose with 'plex_server' key:"
122 default_plex_server: PlexServer |
None =
None,
123 supports_playqueues: bool =
True,
124 ) -> PlexMediaSearchResult:
125 """Look up Plex media using media_player.play_media service payloads."""
126 plex_server = default_plex_server
129 if content_id.startswith(PLEX_URI_SCHEME +
"{"):
131 content_id = content_id.removeprefix(PLEX_URI_SCHEME)
132 content = json.loads(content_id)
133 elif content_id.startswith(PLEX_URI_SCHEME):
135 plex_url =
URL(content_id)
139 if len(plex_url.parts) == 2:
140 if plex_url.name ==
"search":
143 content =
int(plex_url.name)
146 content = plex_url.path
147 server_id = plex_url.host
151 if plex_url.host ==
"search":
154 content =
int(plex_url.host)
155 extra_params =
dict(plex_url.query)
157 content = json.loads(content_id)
159 if isinstance(content, dict):
160 if plex_server_name := content.pop(
"plex_server",
None):
166 if isinstance(content, dict):
167 if plex_user := content.pop(
"username",
None):
168 _LOGGER.debug(
"Switching to Plex user: %s", plex_user)
169 plex_server = plex_server.switch_user(plex_user)
171 if content_type ==
"station":
172 if not supports_playqueues:
174 playqueue = plex_server.create_station_playqueue(content)
177 if isinstance(content, int):
178 content = {
"plex_key": content}
179 content_type = DOMAIN
181 content.update(extra_params)
183 if playqueue_id := content.pop(
"playqueue_id",
None):
184 if not supports_playqueues:
187 playqueue = plex_server.get_playqueue(playqueue_id)
188 except NotFound
as err:
190 f
"PlayQueue '{playqueue_id}' could not be found"
194 search_query = content.copy()
195 shuffle = search_query.pop(
"shuffle", 0)
198 for internal_key
in (
"resume",
"offset"):
199 search_query.pop(internal_key,
None)
201 media = plex_server.lookup_media(content_type, **search_query)
203 if supports_playqueues
and (isinstance(media, list)
or shuffle):
204 playqueue = plex_server.create_playqueue(
205 media, includeRelated=0, shuffle=shuffle
PlexData get_plex_data(HomeAssistant hass)
PlexServer get_plex_server(HomeAssistant hass, str|None plex_server_name=None, str|None plex_server_id=None)
None refresh_library(HomeAssistant hass, ServiceCall service_call)
PlexMediaSearchResult process_plex_payload(HomeAssistant hass, str content_type, str content_id, PlexServer|None default_plex_server=None, bool supports_playqueues=True)
None async_setup_services(HomeAssistant hass)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)