1 """Config flow for Plex."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
6 from copy
import deepcopy
8 from typing
import TYPE_CHECKING, Any
10 from aiohttp
import web_response
11 import plexapi.exceptions
12 from plexapi.gdm
import GDM
13 from plexauth
import PlexAuth
14 import requests.exceptions
15 import voluptuous
as vol
21 SOURCE_INTEGRATION_DISCOVERY,
46 AUTOMATIC_SETUP_STRING,
47 CONF_IGNORE_NEW_SHARED_USERS,
48 CONF_IGNORE_PLEX_WEB_CLIENTS,
51 CONF_SERVER_IDENTIFIER,
64 from .errors
import NoServersFound, ServerNotSpecified
65 from .helpers
import get_plex_server
66 from .server
import PlexServer
68 HEADER_FRONTEND_BASE =
"HA-Frontend-Base"
70 _LOGGER = logging.getLogger(__package__)
75 """Return a set of the configured Plex servers."""
77 entry.data[CONF_SERVER_IDENTIFIER]
78 for entry
in hass.config_entries.async_entries(DOMAIN)
83 """Scan for available Plex servers."""
85 await hass.async_add_executor_job(gdm.scan)
86 for server_data
in gdm.entries:
87 discovery_flow.async_create_flow(
90 context={CONF_SOURCE: SOURCE_INTEGRATION_DISCOVERY},
96 """Handle a Plex config flow."""
100 available_servers: list[tuple[str, str, str]]
106 config_entry: ConfigEntry,
107 ) -> PlexOptionsFlowHandler:
108 """Get the options flow for this handler."""
112 """Initialize the Plex flow."""
121 user_input: dict[str, Any] |
None =
None,
122 errors: dict[str, str] |
None =
None,
123 ) -> ConfigFlowResult:
124 """Handle a flow initialized by the user."""
125 if user_input
is not None:
133 user_input: dict[str, str] |
None =
None,
134 errors: dict[str, str] |
None =
None,
135 ) -> ConfigFlowResult:
136 """Handle an advanced mode flow initialized by the user."""
137 if user_input
is not None:
138 if user_input.get(
"setup_method") == MANUAL_SETUP_STRING:
143 data_schema = vol.Schema(
145 vol.Required(
"setup_method", default=AUTOMATIC_SETUP_STRING): vol.In(
146 [AUTOMATIC_SETUP_STRING, MANUAL_SETUP_STRING]
151 step_id=
"user_advanced", data_schema=data_schema, errors=errors
156 user_input: dict[str, Any] |
None =
None,
157 errors: dict[str, str] |
None =
None,
158 ) -> ConfigFlowResult:
159 """Begin manual configuration."""
160 if user_input
is not None and errors
is None:
161 user_input.pop(CONF_URL,
None)
162 if host := user_input.get(CONF_HOST):
163 port = user_input[CONF_PORT]
164 prefix =
"https" if user_input.get(CONF_SSL)
else "http"
165 user_input[CONF_URL] = f
"{prefix}://{host}:{port}"
166 elif CONF_TOKEN
not in user_input:
168 user_input=user_input, errors={
"base":
"host_or_token"}
172 previous_input = user_input
or {}
174 data_schema = vol.Schema(
178 description={
"suggested_value": previous_input.get(CONF_HOST)},
181 CONF_PORT, default=previous_input.get(CONF_PORT, DEFAULT_PORT)
184 CONF_SSL, default=previous_input.get(CONF_SSL, DEFAULT_SSL)
188 default=previous_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL),
192 description={
"suggested_value": previous_input.get(CONF_TOKEN)},
197 step_id=
"manual_setup", data_schema=data_schema, errors=errors
201 self, server_config: dict[str, Any]
202 ) -> ConfigFlowResult:
203 """Validate a provided configuration."""
205 server_config = {**self.
_reauth_config_reauth_config, **server_config}
210 plex_server =
PlexServer(self.hass, server_config)
212 await self.hass.async_add_executor_job(plex_server.connect)
214 except NoServersFound:
215 _LOGGER.error(
"No servers linked to Plex account")
216 errors[
"base"] =
"no_servers"
217 except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
218 _LOGGER.error(
"Invalid credentials provided, config not created")
219 errors[CONF_TOKEN] =
"faulty_credentials"
220 except requests.exceptions.SSLError
as error:
221 _LOGGER.error(
"SSL certificate error: [%s]", error)
222 errors[
"base"] =
"ssl_error"
223 except (plexapi.exceptions.NotFound, requests.exceptions.ConnectionError):
224 server_identifier = (
225 server_config.get(CONF_URL)
or plex_server.server_choice
or "Unknown"
227 _LOGGER.error(
"Plex server could not be reached: %s", server_identifier)
228 errors[CONF_HOST] =
"not_found"
230 except ServerNotSpecified
as available_servers:
235 _LOGGER.exception(
"Unknown error connecting to Plex server")
241 user_input=server_config, errors=errors
245 server_id = plex_server.machine_identifier
246 url = plex_server.url_in_use
247 token = server_config.get(CONF_TOKEN)
249 entry_config = {CONF_URL: url}
251 entry_config[CONF_CLIENT_ID] = self.
client_idclient_id
253 entry_config[CONF_TOKEN] = token
254 if url.startswith(
"https"):
255 entry_config[CONF_VERIFY_SSL] = server_config.get(
256 CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL
260 CONF_SERVER: plex_server.friendly_name,
261 CONF_SERVER_IDENTIFIER: server_id,
262 PLEX_SERVER_CONFIG: entry_config,
266 if self.context[CONF_SOURCE] == SOURCE_REAUTH:
269 self.hass.config_entries.async_update_entry(entry, data=data)
270 _LOGGER.debug(
"Updated config entry for %s", plex_server.friendly_name)
271 await self.hass.config_entries.async_reload(entry.entry_id)
276 _LOGGER.debug(
"Valid config created for %s", plex_server.friendly_name)
281 self, user_input: dict[str, str] |
None =
None
282 ) -> ConfigFlowResult:
283 """Use selected Plex server."""
285 if user_input
is not None:
286 config[CONF_SERVER_IDENTIFIER] = user_input[CONF_SERVER_IDENTIFIER]
290 available_servers = {
291 server_id: f
"{name} ({owner})" if owner
else name
293 if server_id
not in configured
296 if not available_servers:
298 if len(available_servers) == 1:
299 config[CONF_SERVER_IDENTIFIER] = next(iter(available_servers))
303 step_id=
"select_server",
304 data_schema=vol.Schema(
305 {vol.Required(CONF_SERVER_IDENTIFIER): vol.In(available_servers)}
311 self, discovery_info: dict[str, Any]
312 ) -> ConfigFlowResult:
313 """Handle GDM discovery."""
314 machine_identifier = discovery_info[
"data"][
"Resource-Identifier"]
317 host = f
"{discovery_info['from'][0]}:{discovery_info['data']['Port']}"
318 name = discovery_info[
"data"][
"Name"]
319 self.context[
"title_placeholders"] = {
326 """Begin external auth flow on Plex website."""
327 self.hass.http.register_view(PlexAuthorizationCallbackView)
328 if (req := http.current_request.get())
is None:
329 raise RuntimeError(
"No current request in context")
330 if (hass_url := req.headers.get(HEADER_FRONTEND_BASE))
is None:
331 raise RuntimeError(
"No header in request")
333 headers = {
"Origin": hass_url}
335 "X-Plex-Device-Name": X_PLEX_DEVICE_NAME,
336 "X-Plex-Version": X_PLEX_VERSION,
337 "X-Plex-Product": X_PLEX_PRODUCT,
338 "X-Plex-Device": self.hass.config.location_name,
339 "X-Plex-Platform": X_PLEX_PLATFORM,
340 "X-Plex-Model":
"Plex OAuth",
343 self.
plexauthplexauth = PlexAuth(payload, session, headers)
344 await self.
plexauthplexauth.initiate_auth()
345 forward_url = f
"{hass_url}{AUTH_CALLBACK_PATH}?flow_id={self.flow_id}"
346 auth_url = self.
plexauthplexauth.auth_url(forward_url)
350 self, user_input: None =
None
351 ) -> ConfigFlowResult:
352 """Obtain token after external auth completed."""
358 self.
tokentoken = token
363 """Abort flow when time expires."""
367 self, user_input: None =
None
368 ) -> ConfigFlowResult:
369 """Continue server validation with external token."""
370 server_config = {CONF_TOKEN: self.
tokentoken}
374 self, entry_data: Mapping[str, Any]
375 ) -> ConfigFlowResult:
376 """Handle a reauthorization flow request."""
378 CONF_SERVER_IDENTIFIER: entry_data[CONF_SERVER_IDENTIFIER]
384 """Handle Plex options."""
386 def __init__(self, config_entry: ConfigEntry) ->
None:
387 """Initialize Plex options flow."""
389 self.
server_idserver_id = config_entry.data[CONF_SERVER_IDENTIFIER]
392 """Manage the Plex options."""
396 self, user_input: dict[str, Any] |
None =
None
397 ) -> ConfigFlowResult:
398 """Manage the Plex media_player options."""
401 if user_input
is not None:
402 self.
optionsoptions[MP_DOMAIN][CONF_USE_EPISODE_ART] = user_input[
405 self.
optionsoptions[MP_DOMAIN][CONF_IGNORE_NEW_SHARED_USERS] = user_input[
406 CONF_IGNORE_NEW_SHARED_USERS
408 self.
optionsoptions[MP_DOMAIN][CONF_IGNORE_PLEX_WEB_CLIENTS] = user_input[
409 CONF_IGNORE_PLEX_WEB_CLIENTS
413 user: {
"enabled": bool(user
in user_input[CONF_MONITORED_USERS])}
414 for user
in plex_server.accounts
417 self.
optionsoptions[MP_DOMAIN][CONF_MONITORED_USERS] = account_data
421 available_accounts = {name: name
for name
in plex_server.accounts}
422 available_accounts[plex_server.owner] +=
" [Owner]"
424 default_accounts = plex_server.accounts
425 known_accounts = set(plex_server.option_monitored_users)
429 for user
in plex_server.option_monitored_users
430 if plex_server.option_monitored_users[user][
"enabled"]
432 default_accounts.intersection_update(plex_server.accounts)
433 for user
in plex_server.accounts:
434 if user
not in known_accounts:
435 available_accounts[user] +=
" [New]"
437 if not plex_server.option_ignore_new_shared_users:
438 for new_user
in plex_server.accounts - known_accounts:
439 default_accounts.add(new_user)
442 step_id=
"plex_mp_settings",
443 data_schema=vol.Schema(
446 CONF_USE_EPISODE_ART,
447 default=plex_server.option_use_episode_art,
450 CONF_MONITORED_USERS, default=default_accounts
451 ): cv.multi_select(available_accounts),
453 CONF_IGNORE_NEW_SHARED_USERS,
454 default=plex_server.option_ignore_new_shared_users,
457 CONF_IGNORE_PLEX_WEB_CLIENTS,
458 default=plex_server.option_ignore_plexweb_clients,
466 """Handle callback from external auth."""
468 url = AUTH_CALLBACK_PATH
469 name = AUTH_CALLBACK_NAME
470 requires_auth =
False
472 async
def get(self, request):
473 """Receive authorization confirmation."""
474 hass = request.app[KEY_HASS]
475 await hass.config_entries.flow.async_configure(
476 flow_id=request.query[
"flow_id"], user_input=
None
479 return web_response.Response(
480 headers={
"content-type":
"text/html"},
481 text=
"<script>window.close()</script>Success! This window can be closed",
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
ConfigFlowResult async_step_server_validate(self, dict[str, Any] server_config)
ConfigFlowResult async_step_user_advanced(self, dict[str, str]|None user_input=None, dict[str, str]|None errors=None)
ConfigFlowResult _async_step_plex_website_auth(self)
ConfigFlowResult async_step_select_server(self, dict[str, str]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
PlexOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_timed_out(self, None user_input=None)
ConfigFlowResult async_step_integration_discovery(self, dict[str, Any] discovery_info)
ConfigFlowResult async_step_manual_setup(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
ConfigFlowResult async_step_use_external_token(self, None user_input=None)
ConfigFlowResult async_step_obtain_token(self, None user_input=None)
ConfigFlowResult async_step_init(self, None user_input=None)
None __init__(self, ConfigEntry config_entry)
ConfigFlowResult async_step_plex_mp_settings(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step(self, *str|None step_id=None, str url, Mapping[str, str]|None description_placeholders=None)
bool show_advanced_options(self)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step_done(self, *str next_step_id)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None async_discover(HomeAssistant hass)
set[str] configured_servers(HomeAssistant hass)
PlexServer get_plex_server(HomeAssistant hass, str server_id)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)