1 """Adds config flow for dnsip integration."""
3 from __future__
import annotations
10 from aiodns.error
import DNSError
11 import voluptuous
as vol
35 DEFAULT_RESOLVER_IPV6,
39 DATA_SCHEMA = vol.Schema(
41 vol.Required(CONF_HOSTNAME, default=DEFAULT_HOSTNAME): cv.string,
44 DATA_SCHEMA_ADV = vol.Schema(
46 vol.Required(CONF_HOSTNAME, default=DEFAULT_HOSTNAME): cv.string,
47 vol.Optional(CONF_RESOLVER): cv.string,
48 vol.Optional(CONF_PORT): cv.port,
49 vol.Optional(CONF_RESOLVER_IPV6): cv.string,
50 vol.Optional(CONF_PORT_IPV6): cv.port,
62 """Validate hostname."""
64 async
def async_check(
65 hostname: str, resolver: str, qtype: str, port: int = 53
67 """Return if able to resolve hostname."""
69 with contextlib.suppress(DNSError):
71 await aiodns.DNSResolver(
72 nameservers=[resolver], udp_port=port, tcp_port=port
73 ).query(hostname, qtype)
77 result: dict[str, bool] = {}
79 tasks = await asyncio.gather(
80 async_check(hostname, resolver_ipv4,
"A", port=port),
81 async_check(hostname, resolver_ipv6,
"AAAA", port=port_ipv6),
82 async_check(hostname, resolver_ipv4,
"AAAA", port=port),
85 result[CONF_IPV4] = tasks[0]
86 result[CONF_IPV6] = tasks[1]
87 result[CONF_IPV6_V4] = tasks[2]
93 """Handle a config flow for dnsip integration."""
101 config_entry: ConfigEntry,
102 ) -> DnsIPOptionsFlowHandler:
103 """Return Option handler."""
107 self, user_input: dict[str, Any] |
None =
None
108 ) -> ConfigFlowResult:
109 """Handle the initial step."""
114 hostname = user_input[CONF_HOSTNAME]
115 name = DEFAULT_NAME
if hostname == DEFAULT_HOSTNAME
else hostname
116 resolver = user_input.get(CONF_RESOLVER, DEFAULT_RESOLVER)
117 resolver_ipv6 = user_input.get(CONF_RESOLVER_IPV6, DEFAULT_RESOLVER_IPV6)
118 port = user_input.get(CONF_PORT, DEFAULT_PORT)
119 port_ipv6 = user_input.get(CONF_PORT_IPV6, DEFAULT_PORT)
122 hostname, resolver, resolver_ipv6, port, port_ipv6
125 set_resolver = resolver
126 if validate[CONF_IPV6]:
127 set_resolver = resolver_ipv6
130 not validate[CONF_IPV4]
131 and not validate[CONF_IPV6]
132 and not validate[CONF_IPV6_V4]
134 errors[
"base"] =
"invalid_hostname"
142 CONF_HOSTNAME: hostname,
144 CONF_IPV4: validate[CONF_IPV4],
145 CONF_IPV6: validate[CONF_IPV6]
or validate[CONF_IPV6_V4],
148 CONF_RESOLVER: resolver,
150 CONF_RESOLVER_IPV6: set_resolver,
151 CONF_PORT_IPV6: port_ipv6,
158 data_schema=DATA_SCHEMA_ADV,
163 data_schema=DATA_SCHEMA,
169 """Handle a option config flow for dnsip integration."""
172 self, user_input: dict[str, Any] |
None =
None
173 ) -> ConfigFlowResult:
174 """Manage the options."""
176 if user_input
is not None:
177 resolver = user_input.get(CONF_RESOLVER, DEFAULT_RESOLVER)
178 port = user_input.get(CONF_PORT, DEFAULT_PORT)
179 resolver_ipv6 = user_input.get(CONF_RESOLVER_IPV6, DEFAULT_RESOLVER_IPV6)
180 port_ipv6 = user_input.get(CONF_PORT_IPV6, DEFAULT_PORT)
190 validate[CONF_IPV4]
is False
193 errors[CONF_RESOLVER] =
"invalid_resolver"
195 validate[CONF_IPV6]
is False
198 errors[CONF_RESOLVER_IPV6] =
"invalid_resolver"
203 CONF_RESOLVER: resolver,
205 CONF_RESOLVER_IPV6: resolver_ipv6,
206 CONF_PORT_IPV6: port_ipv6,
213 vol.Optional(CONF_RESOLVER): cv.string,
214 vol.Optional(CONF_PORT): cv.port,
215 vol.Optional(CONF_RESOLVER_IPV6): cv.string,
216 vol.Optional(CONF_PORT_IPV6): cv.port,
222 return self.
async_show_formasync_show_form(step_id=
"init", data_schema=schema, errors=errors)
DnsIPOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
bool show_advanced_options(self)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, bool] async_validate_hostname(str hostname, str resolver_ipv4, str resolver_ipv6, int port, int port_ipv6)