Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Plex."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 from copy import deepcopy
7 import logging
8 from typing import TYPE_CHECKING, Any
9 
10 from aiohttp import web_response
11 import plexapi.exceptions
12 from plexapi.gdm import GDM
13 from plexauth import PlexAuth
14 import requests.exceptions
15 import voluptuous as vol
16 
17 from homeassistant.components import http
18 from homeassistant.components.http import KEY_HASS, HomeAssistantView
19 from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
20 from homeassistant.config_entries import (
21  SOURCE_INTEGRATION_DISCOVERY,
22  SOURCE_REAUTH,
23  ConfigEntry,
24  ConfigFlow,
25  ConfigFlowResult,
26  OptionsFlow,
27 )
28 from homeassistant.const import (
29  CONF_CLIENT_ID,
30  CONF_HOST,
31  CONF_PORT,
32  CONF_SOURCE,
33  CONF_SSL,
34  CONF_TOKEN,
35  CONF_URL,
36  CONF_VERIFY_SSL,
37 )
38 from homeassistant.core import HomeAssistant, callback
39 from homeassistant.helpers import discovery_flow
40 from homeassistant.helpers.aiohttp_client import async_get_clientsession
42 
43 from .const import (
44  AUTH_CALLBACK_NAME,
45  AUTH_CALLBACK_PATH,
46  AUTOMATIC_SETUP_STRING,
47  CONF_IGNORE_NEW_SHARED_USERS,
48  CONF_IGNORE_PLEX_WEB_CLIENTS,
49  CONF_MONITORED_USERS,
50  CONF_SERVER,
51  CONF_SERVER_IDENTIFIER,
52  CONF_USE_EPISODE_ART,
53  DEFAULT_PORT,
54  DEFAULT_SSL,
55  DEFAULT_VERIFY_SSL,
56  DOMAIN,
57  MANUAL_SETUP_STRING,
58  PLEX_SERVER_CONFIG,
59  X_PLEX_DEVICE_NAME,
60  X_PLEX_PLATFORM,
61  X_PLEX_PRODUCT,
62  X_PLEX_VERSION,
63 )
64 from .errors import NoServersFound, ServerNotSpecified
65 from .helpers import get_plex_server
66 from .server import PlexServer
67 
68 HEADER_FRONTEND_BASE = "HA-Frontend-Base"
69 
70 _LOGGER = logging.getLogger(__package__)
71 
72 
73 @callback
74 def configured_servers(hass: HomeAssistant) -> set[str]:
75  """Return a set of the configured Plex servers."""
76  return {
77  entry.data[CONF_SERVER_IDENTIFIER]
78  for entry in hass.config_entries.async_entries(DOMAIN)
79  }
80 
81 
82 async def async_discover(hass: HomeAssistant) -> None:
83  """Scan for available Plex servers."""
84  gdm = GDM()
85  await hass.async_add_executor_job(gdm.scan)
86  for server_data in gdm.entries:
87  discovery_flow.async_create_flow(
88  hass,
89  DOMAIN,
90  context={CONF_SOURCE: SOURCE_INTEGRATION_DISCOVERY},
91  data=server_data,
92  )
93 
94 
95 class PlexFlowHandler(ConfigFlow, domain=DOMAIN):
96  """Handle a Plex config flow."""
97 
98  VERSION = 1
99 
100  available_servers: list[tuple[str, str, str]]
101  plexauth: PlexAuth
102 
103  @staticmethod
104  @callback
106  config_entry: ConfigEntry,
107  ) -> PlexOptionsFlowHandler:
108  """Get the options flow for this handler."""
109  return PlexOptionsFlowHandler(config_entry)
110 
111  def __init__(self) -> None:
112  """Initialize the Plex flow."""
113  self.current_logincurrent_login: dict[str, Any] = {}
114  self.tokentoken = None
115  self.client_idclient_id = None
116  self._manual_manual = False
117  self._reauth_config_reauth_config: dict[str, Any] | None = None
118 
119  async def async_step_user(
120  self,
121  user_input: dict[str, Any] | None = None,
122  errors: dict[str, str] | None = None,
123  ) -> ConfigFlowResult:
124  """Handle a flow initialized by the user."""
125  if user_input is not None:
126  return await self._async_step_plex_website_auth_async_step_plex_website_auth()
127  if self.show_advanced_optionsshow_advanced_options:
128  return await self.async_step_user_advancedasync_step_user_advanced(errors=errors)
129  return self.async_show_formasync_show_formasync_show_form(step_id="user", errors=errors)
130 
132  self,
133  user_input: dict[str, str] | None = None,
134  errors: dict[str, str] | None = None,
135  ) -> ConfigFlowResult:
136  """Handle an advanced mode flow initialized by the user."""
137  if user_input is not None:
138  if user_input.get("setup_method") == MANUAL_SETUP_STRING:
139  self._manual_manual = True
140  return await self.async_step_manual_setupasync_step_manual_setup()
141  return await self._async_step_plex_website_auth_async_step_plex_website_auth()
142 
143  data_schema = vol.Schema(
144  {
145  vol.Required("setup_method", default=AUTOMATIC_SETUP_STRING): vol.In(
146  [AUTOMATIC_SETUP_STRING, MANUAL_SETUP_STRING]
147  )
148  }
149  )
150  return self.async_show_formasync_show_formasync_show_form(
151  step_id="user_advanced", data_schema=data_schema, errors=errors
152  )
153 
155  self,
156  user_input: dict[str, Any] | None = None,
157  errors: dict[str, str] | None = None,
158  ) -> ConfigFlowResult:
159  """Begin manual configuration."""
160  if user_input is not None and errors is None:
161  user_input.pop(CONF_URL, None)
162  if host := user_input.get(CONF_HOST):
163  port = user_input[CONF_PORT]
164  prefix = "https" if user_input.get(CONF_SSL) else "http"
165  user_input[CONF_URL] = f"{prefix}://{host}:{port}"
166  elif CONF_TOKEN not in user_input:
167  return await self.async_step_manual_setupasync_step_manual_setup(
168  user_input=user_input, errors={"base": "host_or_token"}
169  )
170  return await self.async_step_server_validateasync_step_server_validate(user_input)
171 
172  previous_input = user_input or {}
173 
174  data_schema = vol.Schema(
175  {
176  vol.Optional(
177  CONF_HOST,
178  description={"suggested_value": previous_input.get(CONF_HOST)},
179  ): str,
180  vol.Required(
181  CONF_PORT, default=previous_input.get(CONF_PORT, DEFAULT_PORT)
182  ): int,
183  vol.Required(
184  CONF_SSL, default=previous_input.get(CONF_SSL, DEFAULT_SSL)
185  ): bool,
186  vol.Required(
187  CONF_VERIFY_SSL,
188  default=previous_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL),
189  ): bool,
190  vol.Optional(
191  CONF_TOKEN,
192  description={"suggested_value": previous_input.get(CONF_TOKEN)},
193  ): str,
194  }
195  )
196  return self.async_show_formasync_show_formasync_show_form(
197  step_id="manual_setup", data_schema=data_schema, errors=errors
198  )
199 
201  self, server_config: dict[str, Any]
202  ) -> ConfigFlowResult:
203  """Validate a provided configuration."""
204  if self._reauth_config_reauth_config:
205  server_config = {**self._reauth_config_reauth_config, **server_config}
206 
207  errors = {}
208  self.current_logincurrent_login = server_config
209 
210  plex_server = PlexServer(self.hass, server_config)
211  try:
212  await self.hass.async_add_executor_job(plex_server.connect)
213 
214  except NoServersFound:
215  _LOGGER.error("No servers linked to Plex account")
216  errors["base"] = "no_servers"
217  except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
218  _LOGGER.error("Invalid credentials provided, config not created")
219  errors[CONF_TOKEN] = "faulty_credentials"
220  except requests.exceptions.SSLError as error:
221  _LOGGER.error("SSL certificate error: [%s]", error)
222  errors["base"] = "ssl_error"
223  except (plexapi.exceptions.NotFound, requests.exceptions.ConnectionError):
224  server_identifier = (
225  server_config.get(CONF_URL) or plex_server.server_choice or "Unknown"
226  )
227  _LOGGER.error("Plex server could not be reached: %s", server_identifier)
228  errors[CONF_HOST] = "not_found"
229 
230  except ServerNotSpecified as available_servers:
231  self.available_serversavailable_servers = available_servers.args[0]
232  return await self.async_step_select_serverasync_step_select_server()
233 
234  except Exception:
235  _LOGGER.exception("Unknown error connecting to Plex server")
236  return self.async_abortasync_abortasync_abort(reason="unknown")
237 
238  if errors:
239  if self._manual_manual:
240  return await self.async_step_manual_setupasync_step_manual_setup(
241  user_input=server_config, errors=errors
242  )
243  return await self.async_step_userasync_step_userasync_step_user(errors=errors)
244 
245  server_id = plex_server.machine_identifier
246  url = plex_server.url_in_use
247  token = server_config.get(CONF_TOKEN)
248 
249  entry_config = {CONF_URL: url}
250  if self.client_idclient_id:
251  entry_config[CONF_CLIENT_ID] = self.client_idclient_id
252  if token:
253  entry_config[CONF_TOKEN] = token
254  if url.startswith("https"):
255  entry_config[CONF_VERIFY_SSL] = server_config.get(
256  CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL
257  )
258 
259  data = {
260  CONF_SERVER: plex_server.friendly_name,
261  CONF_SERVER_IDENTIFIER: server_id,
262  PLEX_SERVER_CONFIG: entry_config,
263  }
264 
265  entry = await self.async_set_unique_idasync_set_unique_id(server_id)
266  if self.context[CONF_SOURCE] == SOURCE_REAUTH:
267  if TYPE_CHECKING:
268  assert entry
269  self.hass.config_entries.async_update_entry(entry, data=data)
270  _LOGGER.debug("Updated config entry for %s", plex_server.friendly_name)
271  await self.hass.config_entries.async_reload(entry.entry_id)
272  return self.async_abortasync_abortasync_abort(reason="reauth_successful")
273 
274  self._abort_if_unique_id_configured_abort_if_unique_id_configured()
275 
276  _LOGGER.debug("Valid config created for %s", plex_server.friendly_name)
277 
278  return self.async_create_entryasync_create_entryasync_create_entry(title=url, data=data)
279 
281  self, user_input: dict[str, str] | None = None
282  ) -> ConfigFlowResult:
283  """Use selected Plex server."""
284  config = dict(self.current_logincurrent_login)
285  if user_input is not None:
286  config[CONF_SERVER_IDENTIFIER] = user_input[CONF_SERVER_IDENTIFIER]
287  return await self.async_step_server_validateasync_step_server_validate(config)
288 
289  configured = configured_servers(self.hass)
290  available_servers = {
291  server_id: f"{name} ({owner})" if owner else name
292  for (name, server_id, owner) in self.available_serversavailable_servers
293  if server_id not in configured
294  }
295 
296  if not available_servers:
297  return self.async_abortasync_abortasync_abort(reason="all_configured")
298  if len(available_servers) == 1:
299  config[CONF_SERVER_IDENTIFIER] = next(iter(available_servers))
300  return await self.async_step_server_validateasync_step_server_validate(config)
301 
302  return self.async_show_formasync_show_formasync_show_form(
303  step_id="select_server",
304  data_schema=vol.Schema(
305  {vol.Required(CONF_SERVER_IDENTIFIER): vol.In(available_servers)}
306  ),
307  errors={},
308  )
309 
311  self, discovery_info: dict[str, Any]
312  ) -> ConfigFlowResult:
313  """Handle GDM discovery."""
314  machine_identifier = discovery_info["data"]["Resource-Identifier"]
315  await self.async_set_unique_idasync_set_unique_id(machine_identifier)
316  self._abort_if_unique_id_configured_abort_if_unique_id_configured()
317  host = f"{discovery_info['from'][0]}:{discovery_info['data']['Port']}"
318  name = discovery_info["data"]["Name"]
319  self.context["title_placeholders"] = {
320  "host": host,
321  "name": name,
322  }
323  return await self.async_step_userasync_step_userasync_step_user()
324 
325  async def _async_step_plex_website_auth(self) -> ConfigFlowResult:
326  """Begin external auth flow on Plex website."""
327  self.hass.http.register_view(PlexAuthorizationCallbackView)
328  if (req := http.current_request.get()) is None:
329  raise RuntimeError("No current request in context")
330  if (hass_url := req.headers.get(HEADER_FRONTEND_BASE)) is None:
331  raise RuntimeError("No header in request")
332 
333  headers = {"Origin": hass_url}
334  payload = {
335  "X-Plex-Device-Name": X_PLEX_DEVICE_NAME,
336  "X-Plex-Version": X_PLEX_VERSION,
337  "X-Plex-Product": X_PLEX_PRODUCT,
338  "X-Plex-Device": self.hass.config.location_name,
339  "X-Plex-Platform": X_PLEX_PLATFORM,
340  "X-Plex-Model": "Plex OAuth",
341  }
342  session = async_get_clientsession(self.hass)
343  self.plexauthplexauth = PlexAuth(payload, session, headers)
344  await self.plexauthplexauth.initiate_auth()
345  forward_url = f"{hass_url}{AUTH_CALLBACK_PATH}?flow_id={self.flow_id}"
346  auth_url = self.plexauthplexauth.auth_url(forward_url)
347  return self.async_external_stepasync_external_step(step_id="obtain_token", url=auth_url)
348 
350  self, user_input: None = None
351  ) -> ConfigFlowResult:
352  """Obtain token after external auth completed."""
353  token = await self.plexauthplexauth.token(10)
354 
355  if not token:
356  return self.async_external_step_doneasync_external_step_done(next_step_id="timed_out")
357 
358  self.tokentoken = token
359  self.client_idclient_id = self.plexauthplexauth.client_identifier
360  return self.async_external_step_doneasync_external_step_done(next_step_id="use_external_token")
361 
362  async def async_step_timed_out(self, user_input: None = None) -> ConfigFlowResult:
363  """Abort flow when time expires."""
364  return self.async_abortasync_abortasync_abort(reason="token_request_timeout")
365 
367  self, user_input: None = None
368  ) -> ConfigFlowResult:
369  """Continue server validation with external token."""
370  server_config = {CONF_TOKEN: self.tokentoken}
371  return await self.async_step_server_validateasync_step_server_validate(server_config)
372 
373  async def async_step_reauth(
374  self, entry_data: Mapping[str, Any]
375  ) -> ConfigFlowResult:
376  """Handle a reauthorization flow request."""
377  self._reauth_config_reauth_config = {
378  CONF_SERVER_IDENTIFIER: entry_data[CONF_SERVER_IDENTIFIER]
379  }
380  return await self.async_step_userasync_step_userasync_step_user()
381 
382 
384  """Handle Plex options."""
385 
386  def __init__(self, config_entry: ConfigEntry) -> None:
387  """Initialize Plex options flow."""
388  self.optionsoptions = deepcopy(dict(config_entry.options))
389  self.server_idserver_id = config_entry.data[CONF_SERVER_IDENTIFIER]
390 
391  async def async_step_init(self, user_input: None = None) -> ConfigFlowResult:
392  """Manage the Plex options."""
393  return await self.async_step_plex_mp_settingsasync_step_plex_mp_settings()
394 
396  self, user_input: dict[str, Any] | None = None
397  ) -> ConfigFlowResult:
398  """Manage the Plex media_player options."""
399  plex_server = get_plex_server(self.hass, self.server_idserver_id)
400 
401  if user_input is not None:
402  self.optionsoptions[MP_DOMAIN][CONF_USE_EPISODE_ART] = user_input[
403  CONF_USE_EPISODE_ART
404  ]
405  self.optionsoptions[MP_DOMAIN][CONF_IGNORE_NEW_SHARED_USERS] = user_input[
406  CONF_IGNORE_NEW_SHARED_USERS
407  ]
408  self.optionsoptions[MP_DOMAIN][CONF_IGNORE_PLEX_WEB_CLIENTS] = user_input[
409  CONF_IGNORE_PLEX_WEB_CLIENTS
410  ]
411 
412  account_data = {
413  user: {"enabled": bool(user in user_input[CONF_MONITORED_USERS])}
414  for user in plex_server.accounts
415  }
416 
417  self.optionsoptions[MP_DOMAIN][CONF_MONITORED_USERS] = account_data
418 
419  return self.async_create_entryasync_create_entry(title="", data=self.optionsoptions)
420 
421  available_accounts = {name: name for name in plex_server.accounts}
422  available_accounts[plex_server.owner] += " [Owner]"
423 
424  default_accounts = plex_server.accounts
425  known_accounts = set(plex_server.option_monitored_users)
426  if known_accounts:
427  default_accounts = {
428  user
429  for user in plex_server.option_monitored_users
430  if plex_server.option_monitored_users[user]["enabled"]
431  }
432  default_accounts.intersection_update(plex_server.accounts)
433  for user in plex_server.accounts:
434  if user not in known_accounts:
435  available_accounts[user] += " [New]"
436 
437  if not plex_server.option_ignore_new_shared_users:
438  for new_user in plex_server.accounts - known_accounts:
439  default_accounts.add(new_user)
440 
441  return self.async_show_formasync_show_form(
442  step_id="plex_mp_settings",
443  data_schema=vol.Schema(
444  {
445  vol.Required(
446  CONF_USE_EPISODE_ART,
447  default=plex_server.option_use_episode_art,
448  ): bool,
449  vol.Optional(
450  CONF_MONITORED_USERS, default=default_accounts
451  ): cv.multi_select(available_accounts),
452  vol.Required(
453  CONF_IGNORE_NEW_SHARED_USERS,
454  default=plex_server.option_ignore_new_shared_users,
455  ): bool,
456  vol.Required(
457  CONF_IGNORE_PLEX_WEB_CLIENTS,
458  default=plex_server.option_ignore_plexweb_clients,
459  ): bool,
460  }
461  ),
462  )
463 
464 
465 class PlexAuthorizationCallbackView(HomeAssistantView):
466  """Handle callback from external auth."""
467 
468  url = AUTH_CALLBACK_PATH
469  name = AUTH_CALLBACK_NAME
470  requires_auth = False
471 
472  async def get(self, request):
473  """Receive authorization confirmation."""
474  hass = request.app[KEY_HASS]
475  await hass.config_entries.flow.async_configure(
476  flow_id=request.query["flow_id"], user_input=None
477  )
478 
479  return web_response.Response(
480  headers={"content-type": "text/html"},
481  text="<script>window.close()</script>Success! This window can be closed",
482  )
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
Definition: config_flow.py:123
ConfigFlowResult async_step_server_validate(self, dict[str, Any] server_config)
Definition: config_flow.py:202
ConfigFlowResult async_step_user_advanced(self, dict[str, str]|None user_input=None, dict[str, str]|None errors=None)
Definition: config_flow.py:135
ConfigFlowResult async_step_select_server(self, dict[str, str]|None user_input=None)
Definition: config_flow.py:282
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:375
PlexOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:107
ConfigFlowResult async_step_timed_out(self, None user_input=None)
Definition: config_flow.py:362
ConfigFlowResult async_step_integration_discovery(self, dict[str, Any] discovery_info)
Definition: config_flow.py:312
ConfigFlowResult async_step_manual_setup(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
Definition: config_flow.py:158
ConfigFlowResult async_step_use_external_token(self, None user_input=None)
Definition: config_flow.py:368
ConfigFlowResult async_step_obtain_token(self, None user_input=None)
Definition: config_flow.py:351
ConfigFlowResult async_step_init(self, None user_input=None)
Definition: config_flow.py:391
ConfigFlowResult async_step_plex_mp_settings(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:397
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step(self, *str|None step_id=None, str url, Mapping[str, str]|None description_placeholders=None)
bool show_advanced_options(self)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_external_step_done(self, *str next_step_id)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None async_discover(HomeAssistant hass)
Definition: config_flow.py:82
set[str] configured_servers(HomeAssistant hass)
Definition: config_flow.py:74
PlexServer get_plex_server(HomeAssistant hass, str server_id)
Definition: helpers.py:34
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)