1 """Config flow for Hunter Douglas PowerView integration."""
3 from __future__
import annotations
6 from typing
import TYPE_CHECKING, Any, Self
8 import voluptuous
as vol
16 from .const
import DOMAIN, HUB_EXCEPTIONS
17 from .util
import async_connect_hub
19 _LOGGER = logging.getLogger(__name__)
21 HAP_SUFFIX =
"._hap._tcp.local."
22 POWERVIEW_G2_SUFFIX =
"._powerview._tcp.local."
23 POWERVIEW_G3_SUFFIX =
"._PowerView-G3._tcp.local."
26 async
def validate_input(hass: HomeAssistant, hub_address: str) -> dict[str, str]:
27 """Validate the user input allows us to connect.
29 Data has the keys from DATA_SCHEMA with values provided by the user.
33 device_info = api.device_info
34 if hub.role !=
"Primary":
36 f
"{hub.name} ({hub.hub_address}) is the {hub.role} Hub. "
37 "Only the Primary can manage shades"
40 _LOGGER.debug(
"Connection made using api version: %s", hub.api_version)
44 "title": device_info.name,
45 "unique_id": device_info.serial_number,
46 CONF_API_VERSION: hub.api_version,
51 """Handle a config flow for Hunter Douglas PowerView."""
57 """Initialize the powerview config flow."""
61 self.data_schema: dict = {vol.Required(CONF_HOST): str}
64 self, user_input: dict[str, Any] |
None =
None
65 ) -> ConfigFlowResult:
66 """Handle the initial step."""
67 errors: dict[str, str] = {}
69 if user_input
is not None:
72 if info
and not error:
74 CONF_HOST: user_input[CONF_HOST],
75 CONF_NAME: info[
"title"],
76 CONF_API_VERSION: info[CONF_API_VERSION],
88 assert error
is not None
89 errors[
"base"] = error
92 step_id=
"user", data_schema=vol.Schema(self.data_schema), errors=errors
97 ) -> tuple[dict[str, str],
None] | tuple[
None, str]:
102 except HUB_EXCEPTIONS:
103 return None,
"cannot_connect"
104 except UnsupportedDevice:
105 return None,
"unsupported_device"
107 _LOGGER.exception(
"Unexpected exception")
108 return None,
"unknown"
113 self, discovery_info: dhcp.DhcpServiceInfo
114 ) -> ConfigFlowResult:
115 """Handle DHCP discovery."""
121 self, discovery_info: zeroconf.ZeroconfServiceInfo
122 ) -> ConfigFlowResult:
123 """Handle zeroconf discovery."""
125 name = discovery_info.name.removesuffix(POWERVIEW_G2_SUFFIX)
126 name = name.removesuffix(POWERVIEW_G3_SUFFIX)
131 self, discovery_info: zeroconf.ZeroconfServiceInfo
132 ) -> ConfigFlowResult:
133 """Handle HomeKit discovery."""
135 name = discovery_info.name.removesuffix(HAP_SUFFIX)
140 """Confirm dhcp or homekit discovery."""
144 if self.hass.config_entries.flow.async_has_matching_flow(self):
151 assert info
is not None
153 api_version = info[CONF_API_VERSION]
155 self.
discovered_namediscovered_name = f
"Powerview Generation {api_version}"
157 await self.
async_set_unique_idasync_set_unique_id(info[
"unique_id"], raise_on_progress=
False)
163 CONF_API_VERSION: api_version,
168 """Return True if other_flow is matching this flow."""
169 return other_flow.discovered_ip == self.
discovered_ipdiscovered_ip
172 self, user_input: dict[str, Any] |
None =
None
173 ) -> ConfigFlowResult:
174 """Attempt to link with Powerview."""
175 if user_input
is not None:
187 step_id=
"link", description_placeholders=self.
powerview_configpowerview_config
192 """Error to indicate the device is not supported."""
bool is_matching(self, Self other_flow)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
tuple[dict[str, str], None]|tuple[None, str] _async_validate_or_error(self, str host)
ConfigFlowResult async_step_homekit(self, zeroconf.ZeroconfServiceInfo discovery_info)
ConfigFlowResult async_step_discovery_confirm(self)
ConfigFlowResult async_step_link(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_dhcp(self, dhcp.DhcpServiceInfo discovery_info)
ConfigFlowResult async_step_zeroconf(self, zeroconf.ZeroconfServiceInfo discovery_info)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
None _set_confirm_only(self)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
dict[str, str] validate_input(HomeAssistant hass, str hub_address)
PowerviewAPI async_connect_hub(HomeAssistant hass, str address, int|None api_version=None)