Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config Flow to configure UniFi Protect Integration."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import logging
7 from pathlib import Path
8 from typing import Any
9 
10 from aiohttp import CookieJar
11 from uiprotect import ProtectApiClient
12 from uiprotect.data import NVR
13 from uiprotect.exceptions import ClientError, NotAuthorized
14 from unifi_discovery import async_console_is_alive
15 import voluptuous as vol
16 
17 from homeassistant.components import dhcp, ssdp
18 from homeassistant.config_entries import (
19  SOURCE_IGNORE,
20  ConfigEntry,
21  ConfigEntryState,
22  ConfigFlow,
23  ConfigFlowResult,
24  OptionsFlow,
25 )
26 from homeassistant.const import (
27  CONF_HOST,
28  CONF_ID,
29  CONF_PASSWORD,
30  CONF_PORT,
31  CONF_USERNAME,
32  CONF_VERIFY_SSL,
33 )
34 from homeassistant.core import HomeAssistant, callback
36  async_create_clientsession,
37  async_get_clientsession,
38 )
39 from homeassistant.helpers.storage import STORAGE_DIR
40 from homeassistant.helpers.typing import DiscoveryInfoType
41 from homeassistant.loader import async_get_integration
42 from homeassistant.util.network import is_ip_address
43 
44 from .const import (
45  CONF_ALL_UPDATES,
46  CONF_ALLOW_EA,
47  CONF_DISABLE_RTSP,
48  CONF_MAX_MEDIA,
49  CONF_OVERRIDE_CHOST,
50  DEFAULT_MAX_MEDIA,
51  DEFAULT_PORT,
52  DEFAULT_VERIFY_SSL,
53  DOMAIN,
54  MIN_REQUIRED_PROTECT_V,
55  OUTDATED_LOG_MESSAGE,
56 )
57 from .data import async_last_update_was_successful
58 from .discovery import async_start_discovery
59 from .utils import _async_resolve, _async_short_mac, _async_unifi_mac_from_hass
60 
61 _LOGGER = logging.getLogger(__name__)
62 
63 ENTRY_FAILURE_STATES = (
64  ConfigEntryState.SETUP_ERROR,
65  ConfigEntryState.SETUP_RETRY,
66 )
67 
68 
69 async def async_local_user_documentation_url(hass: HomeAssistant) -> str:
70  """Get the documentation url for creating a local user."""
71  integration = await async_get_integration(hass, DOMAIN)
72  return f"{integration.documentation}#local-user"
73 
74 
75 def _host_is_direct_connect(host: str) -> bool:
76  """Check if a host is a unifi direct connect domain."""
77  return host.endswith(".ui.direct")
78 
79 
81  hass: HomeAssistant,
82  entry: ConfigEntry,
83 ) -> bool:
84  """Check if a console is offline.
85 
86  We define offline by the config entry
87  is in a failure/retry state or the updates
88  are failing and the console is unreachable
89  since protect may be updating.
90  """
91  return bool(
92  entry.state in ENTRY_FAILURE_STATES
93  or not async_last_update_was_successful(hass, entry)
94  ) and not await async_console_is_alive(
95  async_get_clientsession(hass, verify_ssl=False), entry.data[CONF_HOST]
96  )
97 
98 
99 class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
100  """Handle a UniFi Protect config flow."""
101 
102  VERSION = 2
103 
104  def __init__(self) -> None:
105  """Init the config flow."""
106  super().__init__()
107  self._discovered_device_discovered_device: dict[str, str] = {}
108 
109  async def async_step_dhcp(
110  self, discovery_info: dhcp.DhcpServiceInfo
111  ) -> ConfigFlowResult:
112  """Handle discovery via dhcp."""
113  _LOGGER.debug("Starting discovery via: %s", discovery_info)
114  return await self._async_discovery_handoff_async_discovery_handoff()
115 
116  async def async_step_ssdp(
117  self, discovery_info: ssdp.SsdpServiceInfo
118  ) -> ConfigFlowResult:
119  """Handle a discovered UniFi device."""
120  _LOGGER.debug("Starting discovery via: %s", discovery_info)
121  return await self._async_discovery_handoff_async_discovery_handoff()
122 
123  async def _async_discovery_handoff(self) -> ConfigFlowResult:
124  """Ensure discovery is active."""
125  # Discovery requires an additional check so we use
126  # SSDP and DHCP to tell us to start it so it only
127  # runs on networks where unifi devices are present.
128  async_start_discovery(self.hass)
129  return self.async_abortasync_abortasync_abort(reason="discovery_started")
130 
132  self, discovery_info: DiscoveryInfoType
133  ) -> ConfigFlowResult:
134  """Handle integration discovery."""
135  self._discovered_device_discovered_device = discovery_info
136  mac = _async_unifi_mac_from_hass(discovery_info["hw_addr"])
137  await self.async_set_unique_idasync_set_unique_id(mac)
138  source_ip = discovery_info["source_ip"]
139  direct_connect_domain = discovery_info["direct_connect_domain"]
140  for entry in self._async_current_entries_async_current_entries():
141  if entry.source == SOURCE_IGNORE:
142  if entry.unique_id == mac:
143  return self.async_abortasync_abortasync_abort(reason="already_configured")
144  continue
145  entry_host = entry.data[CONF_HOST]
146  entry_has_direct_connect = _host_is_direct_connect(entry_host)
147  if entry.unique_id == mac:
148  new_host = None
149  if (
150  entry_has_direct_connect
151  and direct_connect_domain
152  and entry_host != direct_connect_domain
153  ):
154  new_host = direct_connect_domain
155  elif (
156  not entry_has_direct_connect
157  and is_ip_address(entry_host)
158  and entry_host != source_ip
159  and await _async_console_is_offline(self.hass, entry)
160  ):
161  new_host = source_ip
162  if new_host:
163  self.hass.config_entries.async_update_entry(
164  entry, data={**entry.data, CONF_HOST: new_host}
165  )
166  return self.async_abortasync_abortasync_abort(reason="already_configured")
167  if entry_host in (direct_connect_domain, source_ip) or (
168  entry_has_direct_connect
169  and (ip := await _async_resolve(self.hass, entry_host))
170  and ip == source_ip
171  ):
172  return self.async_abortasync_abortasync_abort(reason="already_configured")
173  return await self.async_step_discovery_confirmasync_step_discovery_confirm()
174 
176  self, user_input: dict[str, Any] | None = None
177  ) -> ConfigFlowResult:
178  """Confirm discovery."""
179  errors: dict[str, str] = {}
180  discovery_info = self._discovered_device_discovered_device
181  if user_input is not None:
182  user_input[CONF_PORT] = DEFAULT_PORT
183  nvr_data = None
184  if discovery_info["direct_connect_domain"]:
185  user_input[CONF_HOST] = discovery_info["direct_connect_domain"]
186  user_input[CONF_VERIFY_SSL] = True
187  nvr_data, errors = await self._async_get_nvr_data_async_get_nvr_data(user_input)
188  if not nvr_data or errors:
189  user_input[CONF_HOST] = discovery_info["source_ip"]
190  user_input[CONF_VERIFY_SSL] = False
191  nvr_data, errors = await self._async_get_nvr_data_async_get_nvr_data(user_input)
192  if nvr_data and not errors:
193  return self._async_create_entry_async_create_entry(nvr_data.display_name, user_input)
194 
195  placeholders = {
196  "name": discovery_info["hostname"]
197  or discovery_info["platform"]
198  or f"NVR {_async_short_mac(discovery_info['hw_addr'])}",
199  "ip_address": discovery_info["source_ip"],
200  }
201  self.context["title_placeholders"] = placeholders
202  user_input = user_input or {}
203  return self.async_show_formasync_show_formasync_show_form(
204  step_id="discovery_confirm",
205  description_placeholders={
206  **placeholders,
207  "local_user_documentation_url": await async_local_user_documentation_url(
208  self.hass
209  ),
210  },
211  data_schema=vol.Schema(
212  {
213  vol.Required(
214  CONF_USERNAME, default=user_input.get(CONF_USERNAME)
215  ): str,
216  vol.Required(CONF_PASSWORD): str,
217  }
218  ),
219  errors=errors,
220  )
221 
222  @staticmethod
223  @callback
225  config_entry: ConfigEntry,
226  ) -> OptionsFlow:
227  """Get the options flow for this handler."""
228  return OptionsFlowHandler()
229 
230  @callback
231  def _async_create_entry(self, title: str, data: dict[str, Any]) -> ConfigFlowResult:
232  return self.async_create_entryasync_create_entryasync_create_entry(
233  title=title,
234  data={**data, CONF_ID: title},
235  options={
236  CONF_DISABLE_RTSP: False,
237  CONF_ALL_UPDATES: False,
238  CONF_OVERRIDE_CHOST: False,
239  CONF_MAX_MEDIA: DEFAULT_MAX_MEDIA,
240  CONF_ALLOW_EA: False,
241  },
242  )
243 
245  self,
246  user_input: dict[str, Any],
247  ) -> tuple[NVR | None, dict[str, str]]:
248  session = async_create_clientsession(
249  self.hass, cookie_jar=CookieJar(unsafe=True)
250  )
251 
252  host = user_input[CONF_HOST]
253  port = user_input.get(CONF_PORT, DEFAULT_PORT)
254  verify_ssl = user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL)
255 
256  protect = ProtectApiClient(
257  session=session,
258  host=host,
259  port=port,
260  username=user_input[CONF_USERNAME],
261  password=user_input[CONF_PASSWORD],
262  verify_ssl=verify_ssl,
263  cache_dir=Path(self.hass.config.path(STORAGE_DIR, "unifiprotect")),
264  config_dir=Path(self.hass.config.path(STORAGE_DIR, "unifiprotect")),
265  )
266 
267  errors = {}
268  nvr_data = None
269  try:
270  bootstrap = await protect.get_bootstrap()
271  nvr_data = bootstrap.nvr
272  except NotAuthorized as ex:
273  _LOGGER.debug(ex)
274  errors[CONF_PASSWORD] = "invalid_auth"
275  except ClientError as ex:
276  _LOGGER.debug(ex)
277  errors["base"] = "cannot_connect"
278  else:
279  if nvr_data.version < MIN_REQUIRED_PROTECT_V:
280  _LOGGER.debug(
281  OUTDATED_LOG_MESSAGE,
282  nvr_data.version,
283  MIN_REQUIRED_PROTECT_V,
284  )
285  errors["base"] = "protect_version"
286 
287  auth_user = bootstrap.users.get(bootstrap.auth_user_id)
288  if auth_user and auth_user.cloud_account:
289  errors["base"] = "cloud_user"
290 
291  return nvr_data, errors
292 
293  async def async_step_reauth(
294  self, entry_data: Mapping[str, Any]
295  ) -> ConfigFlowResult:
296  """Perform reauth upon an API authentication error."""
297  return await self.async_step_reauth_confirmasync_step_reauth_confirm()
298 
300  self, user_input: dict[str, Any] | None = None
301  ) -> ConfigFlowResult:
302  """Confirm reauth."""
303  errors: dict[str, str] = {}
304 
305  # prepopulate fields
306  reauth_entry = self._get_reauth_entry_get_reauth_entry()
307  form_data = {**reauth_entry.data}
308  if user_input is not None:
309  form_data.update(user_input)
310 
311  # validate login data
312  _, errors = await self._async_get_nvr_data_async_get_nvr_data(form_data)
313  if not errors:
314  return self.async_update_reload_and_abortasync_update_reload_and_abort(reauth_entry, data=form_data)
315 
316  self.context["title_placeholders"] = {
317  "name": reauth_entry.title,
318  "ip_address": reauth_entry.data[CONF_HOST],
319  }
320  return self.async_show_formasync_show_formasync_show_form(
321  step_id="reauth_confirm",
322  data_schema=vol.Schema(
323  {
324  vol.Required(
325  CONF_USERNAME, default=form_data.get(CONF_USERNAME)
326  ): str,
327  vol.Required(CONF_PASSWORD): str,
328  }
329  ),
330  errors=errors,
331  )
332 
333  async def async_step_user(
334  self, user_input: dict[str, Any] | None = None
335  ) -> ConfigFlowResult:
336  """Handle a flow initiated by the user."""
337 
338  errors: dict[str, str] = {}
339  if user_input is not None:
340  nvr_data, errors = await self._async_get_nvr_data_async_get_nvr_data(user_input)
341 
342  if nvr_data and not errors:
343  await self.async_set_unique_idasync_set_unique_id(nvr_data.mac)
344  self._abort_if_unique_id_configured_abort_if_unique_id_configured()
345 
346  return self._async_create_entry_async_create_entry(nvr_data.display_name, user_input)
347 
348  user_input = user_input or {}
349  return self.async_show_formasync_show_formasync_show_form(
350  step_id="user",
351  description_placeholders={
352  "local_user_documentation_url": await async_local_user_documentation_url(
353  self.hass
354  )
355  },
356  data_schema=vol.Schema(
357  {
358  vol.Required(CONF_HOST, default=user_input.get(CONF_HOST)): str,
359  vol.Required(
360  CONF_PORT, default=user_input.get(CONF_PORT, DEFAULT_PORT)
361  ): int,
362  vol.Required(
363  CONF_VERIFY_SSL,
364  default=user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL),
365  ): bool,
366  vol.Required(
367  CONF_USERNAME, default=user_input.get(CONF_USERNAME)
368  ): str,
369  vol.Required(CONF_PASSWORD): str,
370  }
371  ),
372  errors=errors,
373  )
374 
375 
377  """Handle options."""
378 
379  async def async_step_init(
380  self, user_input: dict[str, Any] | None = None
381  ) -> ConfigFlowResult:
382  """Manage the options."""
383  if user_input is not None:
384  return self.async_create_entryasync_create_entry(title="", data=user_input)
385 
386  return self.async_show_formasync_show_form(
387  step_id="init",
388  data_schema=vol.Schema(
389  {
390  vol.Optional(
391  CONF_DISABLE_RTSP,
392  default=self.config_entryconfig_entryconfig_entry.options.get(CONF_DISABLE_RTSP, False),
393  ): bool,
394  vol.Optional(
395  CONF_ALL_UPDATES,
396  default=self.config_entryconfig_entryconfig_entry.options.get(CONF_ALL_UPDATES, False),
397  ): bool,
398  vol.Optional(
399  CONF_OVERRIDE_CHOST,
400  default=self.config_entryconfig_entryconfig_entry.options.get(
401  CONF_OVERRIDE_CHOST, False
402  ),
403  ): bool,
404  vol.Optional(
405  CONF_MAX_MEDIA,
406  default=self.config_entryconfig_entryconfig_entry.options.get(
407  CONF_MAX_MEDIA, DEFAULT_MAX_MEDIA
408  ),
409  ): vol.All(vol.Coerce(int), vol.Range(min=100, max=10000)),
410  vol.Optional(
411  CONF_ALLOW_EA,
412  default=self.config_entryconfig_entryconfig_entry.options.get(CONF_ALLOW_EA, False),
413  ): bool,
414  }
415  ),
416  )
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:381
ConfigFlowResult async_step_integration_discovery(self, DiscoveryInfoType discovery_info)
Definition: config_flow.py:133
ConfigFlowResult async_step_dhcp(self, dhcp.DhcpServiceInfo discovery_info)
Definition: config_flow.py:111
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:295
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:335
ConfigFlowResult async_step_ssdp(self, ssdp.SsdpServiceInfo discovery_info)
Definition: config_flow.py:118
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:301
ConfigFlowResult async_step_discovery_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:177
tuple[NVR|None, dict[str, str]] _async_get_nvr_data(self, dict[str, Any] user_input)
Definition: config_flow.py:247
ConfigFlowResult _async_create_entry(self, str title, dict[str, Any] data)
Definition: config_flow.py:231
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
bool async_last_update_was_successful(HomeAssistant hass, PowerwallConfigEntry entry)
Definition: __init__.py:284
aiohttp.ClientSession async_create_clientsession()
Definition: coordinator.py:51
bool _async_console_is_offline(HomeAssistant hass, ConfigEntry entry)
Definition: config_flow.py:83
str async_local_user_documentation_url(HomeAssistant hass)
Definition: config_flow.py:69
None async_start_discovery(HomeAssistant hass)
Definition: discovery.py:26
str|None _async_resolve(HomeAssistant hass, str host)
Definition: utils.py:56
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
Integration async_get_integration(HomeAssistant hass, str domain)
Definition: loader.py:1354
bool is_ip_address(str address)
Definition: network.py:63