1 """Config flow for kmtronic integration."""
3 from __future__
import annotations
9 from pykmtronic.auth
import Auth
10 from pykmtronic.hub
import KMTronicHubAPI
11 import voluptuous
as vol
24 from .const
import CONF_REVERSE, DOMAIN
26 _LOGGER = logging.getLogger(__name__)
28 DATA_SCHEMA = vol.Schema(
30 vol.Required(CONF_HOST): str,
31 vol.Required(CONF_USERNAME): str,
32 vol.Required(CONF_PASSWORD): str,
38 """Validate the user input allows us to connect."""
39 session = aiohttp_client.async_get_clientsession(hass)
42 f
"http://{data[CONF_HOST]}",
46 hub = KMTronicHubAPI(auth)
49 await hub.async_get_status()
50 except aiohttp.client_exceptions.ClientResponseError
as err:
51 raise InvalidAuth
from err
52 except aiohttp.client_exceptions.ClientConnectorError
as err:
53 raise CannotConnect
from err
59 """Handle a config flow for kmtronic."""
66 config_entry: ConfigEntry,
67 ) -> KMTronicOptionsFlow:
68 """Get the options flow for this handler."""
72 self, user_input: dict[str, Any] |
None =
None
73 ) -> ConfigFlowResult:
74 """Handle the initial step."""
76 if user_input
is not None:
82 errors[
"base"] =
"cannot_connect"
84 errors[
"base"] =
"invalid_auth"
86 _LOGGER.exception(
"Unexpected exception")
87 errors[
"base"] =
"unknown"
90 step_id=
"user", data_schema=DATA_SCHEMA, errors=errors
95 """Error to indicate we cannot connect."""
99 """Error to indicate there is invalid auth."""
103 """Handle options."""
106 self, user_input: dict[str, Any] |
None =
None
107 ) -> ConfigFlowResult:
108 """Manage the options."""
109 if user_input
is not None:
114 data_schema=vol.Schema(
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
KMTronicOptionsFlow async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
def validate_input(HomeAssistant hass, data)