1 """Config flow for Rabbit Air integration."""
3 from __future__
import annotations
8 from rabbitair
import UdpClient
9 import voluptuous
as vol
18 from .const
import DOMAIN
20 _LOGGER = logging.getLogger(__name__)
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
    """Validate the user input allows us to connect.

    Opens a client to the device using the host and access token found in
    ``data`` and requests the device info.

    Args:
        hass: Home Assistant instance, used to obtain the zeroconf instance
            that is handed to the client.
        data: User input; must contain ``CONF_HOST`` and ``CONF_ACCESS_TOKEN``.

    Returns:
        A dict holding the device MAC address under the ``"mac"`` key.

    Raises:
        InvalidAccessToken: The connection attempt raised ``ValueError``.
        TimeoutConnect: The connection attempt raised ``TimeoutError``.
        InvalidHost: The connection attempt raised any other ``OSError``.
        CannotConnect: The connection attempt raised any other exception.
    """
    # NOTE(review): the ``try``/``with``/``raise`` scaffolding below was
    # restored around the surviving statements of this function; confirm
    # against the upstream file that ``UdpClient`` is used as a plain
    # (non-async) context manager.
    try:
        try:
            zeroconf_instance = await zeroconf.async_get_async_instance(hass)
            with UdpClient(
                data[CONF_HOST], data[CONF_ACCESS_TOKEN], zeroconf=zeroconf_instance
            ) as client:
                info = await client.get_info()
        except Exception as err:
            # Record the low-level cause once, then re-raise so the outer
            # handlers can translate it into a flow-specific error.
            _LOGGER.debug("Connection attempt failed: %s", err)
            raise
    except ValueError as err:
        raise InvalidAccessToken from err
    except TimeoutError as err:
        # Must come before OSError: TimeoutError is a subclass of OSError.
        raise TimeoutConnect from err
    except OSError as err:
        raise InvalidHost from err
    except Exception as err:
        # Anything else is reported as a generic connection failure.
        raise CannotConnect from err

    # Only the MAC address is handed back to the caller.
    return {"mac": info.mac}
53 """Handle a config flow for Rabbit Air."""
57 _discovered_host: str |
None =
None
60 self, user_input: dict[str, Any] |
None =
None
61 ) -> ConfigFlowResult:
62 """Handle the initial step."""
65 if user_input
is not None:
69 errors[
"base"] =
"cannot_connect"
70 except InvalidAccessToken:
71 errors[
"base"] =
"invalid_access_token"
73 errors[
"base"] =
"invalid_host"
74 except TimeoutConnect:
75 errors[
"base"] =
"timeout_connect"
76 except Exception
as err:
77 _LOGGER.debug(
"Unexpected exception: %s", err)
78 errors[
"base"] =
"unknown"
80 user_input[CONF_MAC] = info[
"mac"]
85 user_input = user_input
or {}
87 token = user_input.get(CONF_ACCESS_TOKEN)
90 data_schema=vol.Schema(
92 vol.Required(CONF_HOST, default=host): str,
93 vol.Required(CONF_ACCESS_TOKEN, default=token): vol.All(
94 str, vol.Length(min=32, max=32)
102 self, discovery_info: zeroconf.ZeroconfServiceInfo
103 ) -> ConfigFlowResult:
104 """Handle zeroconf discovery."""
105 mac = dr.format_mac(discovery_info.properties[
"id"])
113 """Error to indicate we cannot connect."""
117 """Error to indicate the access token is not valid."""
121 """Error to indicate the host is not valid."""
125 """Error to indicate the connection attempt is timed out."""
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_zeroconf(self, ZeroconfServiceInfo discovery_info)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, Any] validate_input(HomeAssistant hass, dict[str, Any] data)