1 """Config flow for HLK-SW16."""
6 from hlk_sw16
import create_hlk_sw16_connection
7 from hlk_sw16.protocol
import SW16Client
8 import voluptuous
as vol
16 DEFAULT_KEEP_ALIVE_INTERVAL,
18 DEFAULT_RECONNECT_INTERVAL,
21 from .errors
import CannotConnect
23 DATA_SCHEMA = vol.Schema(
25 vol.Required(CONF_HOST): str,
26 vol.Optional(CONF_PORT, default=DEFAULT_PORT): vol.Coerce(int),
31 async
def connect_client(hass: HomeAssistant, user_input: dict[str, Any]) -> SW16Client:
32 """Connect the HLK-SW16 client."""
33 client_aw = create_hlk_sw16_connection(
34 host=user_input[CONF_HOST],
35 port=user_input[CONF_PORT],
37 timeout=CONNECTION_TIMEOUT,
38 reconnect_interval=DEFAULT_RECONNECT_INTERVAL,
39 keep_alive_interval=DEFAULT_KEEP_ALIVE_INTERVAL,
41 async
with asyncio.timeout(CONNECTION_TIMEOUT):
42 return await client_aw
45 async
def validate_input(hass: HomeAssistant, user_input: dict[str, Any]) ->
None:
46 """Validate the user input allows us to connect."""
49 except TimeoutError
as err:
50 raise CannotConnect
from err
54 def disconnect_callback():
55 if client.in_transaction:
56 client.active_transaction.set_exception(CannotConnect)
58 client.disconnect_callback = disconnect_callback
61 client.disconnect_callback =
None
65 client.disconnect_callback =
None
70 """Handle a HLK-SW16 config flow."""
79 self, user_input: dict[str, Any] |
None =
None
80 ) -> ConfigFlowResult:
81 """Handle the initial step."""
83 if user_input
is not None:
85 {CONF_HOST: user_input[CONF_HOST], CONF_PORT: user_input[CONF_PORT]}
89 address = f
"{user_input[CONF_HOST]}:{user_input[CONF_PORT]}"
92 errors[
"base"] =
"cannot_connect"
95 step_id=
"user", data_schema=DATA_SCHEMA, errors=errors
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
None validate_input(HomeAssistant hass, dict[str, Any] user_input)
SW16Client connect_client(HomeAssistant hass, dict[str, Any] user_input)