1 """Config flow for Nobø Ecohub integration."""
3 from __future__
import annotations
6 from typing
import TYPE_CHECKING, Any
8 from pynobo
import nobo
9 import voluptuous
as vol
26 OVERRIDE_TYPE_CONSTANT,
30 DATA_NOBO_HUB_IMPL =
"nobo_hub_flow_implementation"
31 DEVICE_INPUT =
"device_input"
35 """Handle a config flow for Nobø Ecohub."""
40 """Initialize the config flow."""
42 self.
_hub_hub: str |
None =
None
45 self, user_input: dict[str, Any] |
None =
None
46 ) -> ConfigFlowResult:
47 """Handle the initial step."""
55 if user_input
is not None:
56 if user_input[
"device"] ==
"manual":
58 self.
_hub_hub = user_input[
"device"]
61 hubs = self.
_hubs_hubs()
62 hubs[
"manual"] =
"Manual"
63 data_schema = vol.Schema(
65 vol.Required(
"device"): vol.In(hubs),
70 data_schema=data_schema,
74 self, user_input: dict[str, Any] |
None =
None
75 ) -> ConfigFlowResult:
76 """Handle configuration of a selected discovered device."""
81 if user_input
is not None:
83 serial_suffix = user_input[
"serial_suffix"]
84 serial = f
"{serial_prefix}{serial_suffix}"
87 except NoboHubConnectError
as error:
88 errors[
"base"] = error.msg
90 user_input = user_input
or {}
93 data_schema=vol.Schema(
96 "serial_suffix", default=user_input.get(
"serial_suffix")
101 description_placeholders={
107 self, user_input: dict[str, Any] |
None =
None
108 ) -> ConfigFlowResult:
109 """Handle configuration of an undiscovered device."""
111 if user_input
is not None:
112 serial = user_input[CONF_SERIAL]
113 ip_address = user_input[CONF_IP_ADDRESS]
116 except NoboHubConnectError
as error:
117 errors[
"base"] = error.msg
119 user_input = user_input
or {}
122 data_schema=vol.Schema(
124 vol.Required(CONF_SERIAL, default=user_input.get(CONF_SERIAL)): str,
126 CONF_IP_ADDRESS, default=user_input.get(CONF_IP_ADDRESS)
134 self, serial: str, ip_address: str, auto_discovered: bool
135 ) -> ConfigFlowResult:
143 CONF_IP_ADDRESS: ip_address,
144 CONF_AUTO_DISCOVERED: auto_discovered,
149 if not len(serial) == 12
or not serial.isdigit():
152 socket.inet_aton(ip_address)
153 except OSError
as err:
155 hub = nobo(serial=serial, ip=ip_address, discover=
False, synchronous=
False)
156 if not await hub.async_connect_hub(ip_address, serial):
158 name = hub.hub_info[
"name"]
164 return f
"{serial_prefix}XXX ({ip})"
175 config_entry: ConfigEntry,
177 """Get the options flow for this handler."""
182 """Error with connecting to Nobø Ecohub."""
185 """Instantiate error."""
191 """Handles options flow for the component."""
194 """Manage the options."""
196 if user_input
is not None:
198 CONF_OVERRIDE_TYPE: user_input.get(CONF_OVERRIDE_TYPE),
203 CONF_OVERRIDE_TYPE, OVERRIDE_TYPE_CONSTANT
208 vol.Required(CONF_OVERRIDE_TYPE, default=override_type): vol.In(
209 [OVERRIDE_TYPE_CONSTANT, OVERRIDE_TYPE_NOW]
214 return self.
async_show_formasync_show_form(step_id=
"init", data_schema=schema)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
str _test_connection(self, str serial, str ip_address)
ConfigFlowResult _create_configuration(self, str serial, str ip_address, bool auto_discovered)
ConfigFlowResult async_step_manual(self, dict[str, Any]|None user_input=None)
OptionsFlow async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_selected(self, dict[str, Any]|None user_input=None)
def _format_hub(ip, serial_prefix)
ConfigFlowResult async_step_init(self, user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)