Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for ONVIF."""
2 
from __future__ import annotations

from collections.abc import Mapping
import logging
from pprint import pformat
from typing import Any
from urllib.parse import urlparse

from onvif.util import is_auth_error, stringify_onvif_error
import voluptuous as vol
from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery
from wsdiscovery.scope import Scope
from wsdiscovery.service import Service
from zeep.exceptions import Fault

from homeassistant.components import dhcp
from homeassistant.components.ffmpeg import CONF_EXTRA_ARGUMENTS
from homeassistant.components.stream import (
    CONF_RTSP_TRANSPORT,
    CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
    RTSP_TRANSPORTS,
)
from homeassistant.config_entries import (
    ConfigEntry,
    ConfigEntryState,
    ConfigFlow,
    ConfigFlowResult,
    OptionsFlow,
)
from homeassistant.const import (
    CONF_HOST,
    CONF_NAME,
    CONF_PASSWORD,
    CONF_PORT,
    CONF_USERNAME,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.helpers import device_registry as dr

from .const import (
    CONF_DEVICE_ID,
    CONF_ENABLE_WEBHOOKS,
    CONF_HARDWARE,
    DEFAULT_ARGUMENTS,
    DEFAULT_ENABLE_WEBHOOKS,
    DEFAULT_PORT,
    DOMAIN,
    GET_CAPABILITIES_EXCEPTIONS,
    LOGGER,
)
from .device import get_device
55 
# Sentinel choice shown in the discovered-device selector; picking it sends the
# user to the manual configuration step instead of a discovered device.
CONF_MANUAL_INPUT = "Manually configure ONVIF device"
57 
58 
def wsdiscovery() -> list[Service]:
    """Get ONVIF Profile S devices from network.

    Starts a WS-Discovery client, searches for services that advertise the
    ONVIF streaming profile scope and returns whatever the search yields.
    The client is always stopped afterwards, even when the search raises.
    """
    discovery = WSDiscovery(ttl=4)
    try:
        discovery.start()
        return discovery.searchServices(
            scopes=[Scope("onvif://www.onvif.org/Profile/Streaming")]
        )
    finally:
        discovery.stop()
        # Stop the threads started by WSDiscovery since otherwise there is a leak.
        discovery._stopThreads()  # noqa: SLF001
71 
72 
async def async_discovery(hass: HomeAssistant) -> list[dict[str, Any]]:
    """Return the ONVIF devices that can be discovered on the network.

    The WS-Discovery probe is run through an executor job. Each discovered
    service is turned into a dict holding its device id, name, host, port
    and hardware; name, hardware and device id are refined from the
    service's ONVIF scopes when those are advertised.

    Services that advertise no transport address are skipped because there
    is no host to offer to the user.
    """
    LOGGER.debug("Starting ONVIF discovery")
    services = await hass.async_add_executor_job(wsdiscovery)

    devices = []
    for service in services:
        xaddrs = service.getXAddrs()
        if not xaddrs:
            # Indexing an empty address list would raise IndexError and
            # fail the discovery of every other device as well.
            LOGGER.debug(
                "Skipping ONVIF service %s without transport address",
                service.getEPR(),
            )
            continue

        url = urlparse(xaddrs[0])
        device = {
            CONF_DEVICE_ID: None,
            # The endpoint reference is the fallback name until a name scope is seen.
            CONF_NAME: service.getEPR(),
            CONF_HOST: url.hostname,
            CONF_PORT: url.port or 80,
            CONF_HARDWARE: None,
        }
        for scope in service.getScopes():
            scope_str = scope.getValue()
            # Match case-insensitively, but keep the original casing of the value.
            scope_lower = scope_str.lower()
            if scope_lower.startswith("onvif://www.onvif.org/name"):
                device[CONF_NAME] = scope_str.split("/")[-1]
            elif scope_lower.startswith("onvif://www.onvif.org/hardware"):
                device[CONF_HARDWARE] = scope_str.split("/")[-1]
            elif scope_lower.startswith("onvif://www.onvif.org/mac"):
                device[CONF_DEVICE_ID] = scope_str.split("/")[-1]
        devices.append(device)

    return devices
99 
100 
class OnvifFlowHandler(ConfigFlow, domain=DOMAIN):
    """Handle a ONVIF config flow.

    Entry points are the user step (optionally backed by WS-Discovery),
    DHCP discovery for devices that are already registered, and
    re-authentication of an existing entry.
    """

    VERSION = 1

    @staticmethod
    @callback
    def async_get_options_flow(
        config_entry: ConfigEntry,
    ) -> OnvifOptionsFlowHandler:
        """Get the options flow for this handler."""
        return OnvifOptionsFlowHandler(config_entry)

    def __init__(self) -> None:
        """Initialize the ONVIF config flow."""
        # Unique ID candidate: taken from discovery, an enabled network
        # interface or the device serial number (see async_setup_profiles).
        self.device_id: str | None = None
        # Discovered devices that are not configured yet.
        self.devices: list[dict[str, Any]] = []
        # Connection settings collected so far for the device being set up.
        self.onvif_config: dict[str, Any] = {}

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle user flow.

        Asks whether to search the network automatically; continues with
        the discovery step when "auto" is set and with manual configuration
        otherwise.
        """
        if user_input:
            if user_input["auto"]:
                return await self.async_step_device()
            return await self.async_step_configure()

        return self.async_show_form(
            step_id="user",
            data_schema=vol.Schema({vol.Required("auto", default=True): bool}),
        )

    async def async_step_reauth(
        self, entry_data: Mapping[str, Any]
    ) -> ConfigFlowResult:
        """Handle re-authentication of an existing config entry."""
        return await self.async_step_reauth_confirm()

    async def async_step_reauth_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm reauth.

        Merges the submitted credentials into the existing entry data and
        validates them against the device. On success the entry is updated
        and reloaded; otherwise the form is shown again with the errors.
        """
        errors: dict[str, str] | None = {}
        reauth_entry = self._get_reauth_entry()
        description_placeholders: dict[str, str] | None = None
        if user_input is not None:
            self.onvif_config = reauth_entry.data | user_input
            # The entry already has a unique ID, so do not configure it again.
            errors, description_placeholders = await self.async_setup_profiles(
                configure_unique_id=False
            )
            if not errors:
                return self.async_update_reload_and_abort(
                    reauth_entry, data=self.onvif_config
                )

        # Prefill with the last submitted username, else the stored one.
        username = (user_input or {}).get(CONF_USERNAME) or reauth_entry.data[
            CONF_USERNAME
        ]
        return self.async_show_form(
            step_id="reauth_confirm",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_USERNAME, default=username): str,
                    vol.Required(CONF_PASSWORD): str,
                }
            ),
            errors=errors,
            description_placeholders=description_placeholders,
        )

    async def async_step_dhcp(
        self, discovery_info: dhcp.DhcpServiceInfo
    ) -> ConfigFlowResult:
        """Handle dhcp discovery.

        Only used to refresh the host of ONVIF entries whose device is
        already in the device registry; the flow always aborts.
        """
        hass = self.hass
        mac = discovery_info.macaddress
        registry = dr.async_get(self.hass)
        if not (
            device := registry.async_get_device(
                connections={(dr.CONNECTION_NETWORK_MAC, mac)}
            )
        ):
            return self.async_abort(reason="no_devices_found")
        for entry_id in device.config_entries:
            # Skip missing entries, other integrations and entries that
            # are currently loaded.
            if (
                not (entry := hass.config_entries.async_get_entry(entry_id))
                or entry.domain != DOMAIN
                or entry.state is ConfigEntryState.LOADED
            ):
                continue
            if hass.config_entries.async_update_entry(
                entry, data=entry.data | {CONF_HOST: discovery_info.ip}
            ):
                hass.async_create_task(self.hass.config_entries.async_reload(entry_id))
        return self.async_abort(reason="already_configured")

    async def async_step_device(
        self, user_input: dict[str, str] | None = None
    ) -> ConfigFlowResult:
        """Handle WS-Discovery.

        Let user choose between discovered devices and manual configuration.
        If no device is found allow user to manually input configuration.
        """
        if user_input:
            if user_input[CONF_HOST] == CONF_MANUAL_INPUT:
                return await self.async_step_configure()

            for device in self.devices:
                if device[CONF_HOST] == user_input[CONF_HOST]:
                    self.device_id = device[CONF_DEVICE_ID]
                    self.onvif_config = {
                        CONF_NAME: device[CONF_NAME],
                        CONF_HOST: device[CONF_HOST],
                        CONF_PORT: device[CONF_PORT],
                    }
                    return await self.async_step_configure()

        discovery = await async_discovery(self.hass)
        configured_ids = {entry.unique_id for entry in self._async_current_entries()}
        # Rebuild the list instead of appending to it: this step runs again
        # when the form is submitted without a selection (the host field is
        # optional), and appending would list every device twice.
        self.devices = [
            device
            for device in discovery
            if device[CONF_DEVICE_ID] not in configured_ids
        ]

        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug("Discovered ONVIF devices %s", pformat(self.devices))

        if self.devices:
            # Maps the submitted value (host) to the label shown to the user.
            devices = {CONF_MANUAL_INPUT: CONF_MANUAL_INPUT}
            for device in self.devices:
                description = f"{device[CONF_NAME]} ({device[CONF_HOST]})"
                if hardware := device[CONF_HARDWARE]:
                    description += f" [{hardware}]"
                devices[device[CONF_HOST]] = description

            return self.async_show_form(
                step_id="device",
                data_schema=vol.Schema({vol.Optional(CONF_HOST): vol.In(devices)}),
            )

        return await self.async_step_configure()

    async def async_step_configure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Device configuration.

        Validates the submitted connection settings against the device and
        creates the entry when no errors are reported; otherwise shows the
        form, prefilled with the values collected so far.
        """
        errors: dict[str, str] = {}
        description_placeholders: dict[str, str] = {}
        if user_input:
            self.onvif_config = user_input
            errors, description_placeholders = await self.async_setup_profiles()
            if not errors:
                title = f"{self.onvif_config[CONF_NAME]} - {self.device_id}"
                return self.async_create_entry(title=title, data=self.onvif_config)

        def conf(name, default=None):
            return self.onvif_config.get(name, default)

        # Username and Password are optional and default empty
        # due to some cameras not allowing you to change ONVIF user settings.
        # See https://github.com/home-assistant/core/issues/39182
        # and https://github.com/home-assistant/core/issues/35904
        return self.async_show_form(
            step_id="configure",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_NAME, default=conf(CONF_NAME)): str,
                    vol.Required(CONF_HOST, default=conf(CONF_HOST)): str,
                    vol.Required(CONF_PORT, default=conf(CONF_PORT, DEFAULT_PORT)): int,
                    vol.Optional(CONF_USERNAME, default=conf(CONF_USERNAME, "")): str,
                    vol.Optional(CONF_PASSWORD, default=conf(CONF_PASSWORD, "")): str,
                }
            ),
            errors=errors,
            description_placeholders=description_placeholders,
        )

    async def async_setup_profiles(
        self, configure_unique_id: bool = True
    ) -> tuple[dict[str, str], dict[str, str]]:
        """Fetch ONVIF device profiles.

        Connects to the device described by ``self.onvif_config``, resolves
        a device id when none is known yet and checks that at least one
        H264 profile exists.

        Returns a tuple of (errors, description placeholders); both are
        empty on success.

        Raises AbortFlow with reason "no_mac" when no device id can be
        determined and "no_h264" when no H264 profile is found.
        """
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug(
                "Fetching profiles from ONVIF device %s", pformat(self.onvif_config)
            )

        device = get_device(
            self.hass,
            self.onvif_config[CONF_HOST],
            self.onvif_config[CONF_PORT],
            self.onvif_config[CONF_USERNAME],
            self.onvif_config[CONF_PASSWORD],
        )

        try:
            await device.update_xaddrs()
            device_mgmt = await device.create_devicemgmt_service()
            # Get the MAC address to use as the unique ID for the config flow
            if not self.device_id:
                try:
                    network_interfaces = await device_mgmt.GetNetworkInterfaces()
                    interface = next(
                        (iface for iface in network_interfaces if iface.Enabled),
                        None,
                    )
                    if interface:
                        self.device_id = interface.Info.HwAddress
                except Fault as fault:
                    # Only a "not implemented" fault is tolerated here. A fault
                    # without message text is re-raised instead of failing the
                    # substring test with a TypeError.
                    if "not implemented" not in (fault.message or ""):
                        raise
                    LOGGER.debug(
                        "%s: Could not get network interfaces: %s",
                        self.onvif_config[CONF_NAME],
                        stringify_onvif_error(fault),
                    )
            # If no network interfaces are exposed, fallback to serial number
            if not self.device_id:
                device_info = await device_mgmt.GetDeviceInformation()
                self.device_id = device_info.SerialNumber

            if not self.device_id:
                raise AbortFlow(reason="no_mac")

            if configure_unique_id:
                await self.async_set_unique_id(self.device_id, raise_on_progress=False)
                self._abort_if_unique_id_configured(
                    updates={
                        CONF_HOST: self.onvif_config[CONF_HOST],
                        CONF_PORT: self.onvif_config[CONF_PORT],
                        CONF_NAME: self.onvif_config[CONF_NAME],
                        CONF_USERNAME: self.onvif_config[CONF_USERNAME],
                        CONF_PASSWORD: self.onvif_config[CONF_PASSWORD],
                    }
                )
            # Verify there is an H264 profile
            media_service = await device.create_media_service()
            profiles = await media_service.GetProfiles()
        except AttributeError:  # Likely an empty document or 404 from the wrong port
            LOGGER.debug(
                "%s: No ONVIF service found at %s:%s",
                self.onvif_config[CONF_NAME],
                self.onvif_config[CONF_HOST],
                self.onvif_config[CONF_PORT],
                exc_info=True,
            )
            return {CONF_PORT: "no_onvif_service"}, {}
        except Fault as err:
            stringified_error = stringify_onvif_error(err)
            description_placeholders = {"error": stringified_error}
            if is_auth_error(err):
                LOGGER.debug(
                    "%s: Could not authenticate with camera: %s",
                    self.onvif_config[CONF_NAME],
                    stringified_error,
                )
                return {CONF_PASSWORD: "auth_failed"}, description_placeholders
            LOGGER.debug(
                "%s: Could not determine camera capabilities: %s",
                self.onvif_config[CONF_NAME],
                stringified_error,
                exc_info=True,
            )
            return {"base": "onvif_error"}, description_placeholders
        except GET_CAPABILITIES_EXCEPTIONS as err:
            LOGGER.debug(
                "%s: Could not determine camera capabilities: %s",
                self.onvif_config[CONF_NAME],
                stringify_onvif_error(err),
                exc_info=True,
            )
            return {"base": "onvif_error"}, {"error": stringify_onvif_error(err)}
        else:
            if not any(
                profile.VideoEncoderConfiguration
                and profile.VideoEncoderConfiguration.Encoding == "H264"
                for profile in profiles
            ):
                raise AbortFlow(reason="no_h264")
            return {}, {}
        finally:
            # Runs on every exit path, including the AbortFlow raises above.
            await device.close()
387 
class OnvifOptionsFlowHandler(OptionsFlow):
    """Handle ONVIF options."""

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Initialize ONVIF options flow."""
        # Work on a copy so the entry's options are only replaced on submit.
        self.options = dict(config_entry.options)

    async def async_step_init(self, user_input: None = None) -> ConfigFlowResult:
        """Manage the ONVIF options."""
        return await self.async_step_onvif_devices()

    async def async_step_onvif_devices(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Manage the ONVIF devices options.

        Shows the options form, or stores the submitted values. Options
        missing from the submission (the wallclock option is only offered
        with advanced options enabled) keep their stored value.
        """
        if user_input is not None:
            self.options[CONF_EXTRA_ARGUMENTS] = user_input[CONF_EXTRA_ARGUMENTS]
            self.options[CONF_RTSP_TRANSPORT] = user_input[CONF_RTSP_TRANSPORT]
            self.options[CONF_USE_WALLCLOCK_AS_TIMESTAMPS] = user_input.get(
                CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
                self.config_entry.options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS, False),
            )
            self.options[CONF_ENABLE_WEBHOOKS] = user_input.get(
                CONF_ENABLE_WEBHOOKS,
                self.config_entry.options.get(
                    CONF_ENABLE_WEBHOOKS, DEFAULT_ENABLE_WEBHOOKS
                ),
            )
            return self.async_create_entry(title="", data=self.options)

        advanced_options = {}
        if self.show_advanced_options:
            advanced_options[
                vol.Optional(
                    CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
                    default=self.config_entry.options.get(
                        CONF_USE_WALLCLOCK_AS_TIMESTAMPS, False
                    ),
                )
            ] = bool
        return self.async_show_form(
            step_id="onvif_devices",
            data_schema=vol.Schema(
                {
                    vol.Optional(
                        CONF_EXTRA_ARGUMENTS,
                        default=self.config_entry.options.get(
                            CONF_EXTRA_ARGUMENTS, DEFAULT_ARGUMENTS
                        ),
                    ): str,
                    vol.Optional(
                        CONF_RTSP_TRANSPORT,
                        default=self.config_entry.options.get(
                            CONF_RTSP_TRANSPORT, next(iter(RTSP_TRANSPORTS))
                        ),
                    ): vol.In(RTSP_TRANSPORTS),
                    vol.Optional(
                        CONF_ENABLE_WEBHOOKS,
                        default=self.config_entry.options.get(
                            CONF_ENABLE_WEBHOOKS, DEFAULT_ENABLE_WEBHOOKS
                        ),
                    ): bool,
                    **advanced_options,
                }
            ),
        )
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:142
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:122
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:136
ConfigFlowResult async_step_device(self, dict[str, str]|None user_input=None)
Definition: config_flow.py:200
ConfigFlowResult async_step_configure(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:250
tuple[dict[str, str], dict[str, str]] async_setup_profiles(self, bool configure_unique_id=True)
Definition: config_flow.py:285
ConfigFlowResult async_step_dhcp(self, dhcp.DhcpServiceInfo discovery_info)
Definition: config_flow.py:174
OnvifOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:110
ConfigFlowResult async_step_onvif_devices(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:402
ConfigFlowResult async_step_init(self, None user_input=None)
Definition: config_flow.py:396
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
bool show_advanced_options(self)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
DeviceEntry get_device(HomeAssistant hass, str unique_id)
Definition: util.py:12
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
list[dict[str, Any]] async_discovery(HomeAssistant hass)
Definition: config_flow.py:73
str stringify_onvif_error(Exception error)
Definition: util.py:17
bool is_auth_error(Exception error)
Definition: util.py:41