1 """Config flow for Wyoming integration."""
3 from __future__
import annotations
7 from urllib.parse
import urlparse
9 import voluptuous
as vol
16 from .const
import DOMAIN
17 from .data
import WyomingService
19 _LOGGER = logging.getLogger(__name__)
21 STEP_USER_DATA_SCHEMA = vol.Schema(
23 vol.Required(CONF_HOST): str,
24 vol.Required(CONF_PORT): int,
30 """Handle a config flow for Wyoming integration."""
34 _hassio_discovery: HassioServiceInfo
35 _service: WyomingService |
None =
None
36 _name: str |
None =
None
39 self, user_input: dict[str, Any] |
None =
None
40 ) -> ConfigFlowResult:
41 """Handle the initial step."""
42 if user_input
is None:
44 step_id=
"user", data_schema=STEP_USER_DATA_SCHEMA
47 service = await WyomingService.create(
48 user_input[CONF_HOST],
49 user_input[CONF_PORT],
55 data_schema=STEP_USER_DATA_SCHEMA,
56 errors={
"base":
"cannot_connect"},
59 if name := service.get_name():
65 self, discovery_info: HassioServiceInfo
66 ) -> ConfigFlowResult:
67 """Handle Supervisor add-on discovery."""
68 _LOGGER.debug(
"Supervisor discovery info: %s", discovery_info)
75 "title_placeholders": {
"name": discovery_info.name},
76 "configuration_url": f
"homeassistant://hassio/addon/{discovery_info.slug}/info",
79 return await self.async_step_hassio_confirm()
81 async
def async_step_hassio_confirm(
82 self, user_input: dict[str, Any] |
None =
None
83 ) -> ConfigFlowResult:
84 """Confirm Supervisor discovery."""
85 errors: dict[str, str] = {}
87 if user_input
is not None:
89 if service := await WyomingService.create(uri.hostname, uri.port):
90 if not service.has_services():
95 data={CONF_HOST: uri.hostname, CONF_PORT: uri.port},
98 errors = {
"base":
"cannot_connect"}
101 step_id=
"hassio_confirm",
102 description_placeholders={
"addon": self.
_hassio_discovery_hassio_discovery.name},
107 self, discovery_info: zeroconf.ZeroconfServiceInfo
108 ) -> ConfigFlowResult:
109 """Handle zeroconf discovery."""
110 _LOGGER.debug(
"Zeroconf discovery info: %s", discovery_info)
111 if discovery_info.port
is None:
114 service = await WyomingService.create(discovery_info.host, discovery_info.port)
115 if (service
is None)
or (
not (name := service.get_name())):
123 unique_id = f
"{discovery_info.name}_{self._name}"
127 self.context[
"title_placeholders"] = {
"name": self.
_name_name}
130 return await self.async_step_zeroconf_confirm()
132 async
def async_step_zeroconf_confirm(
133 self, user_input: dict[str, Any] |
None =
None
134 ) -> ConfigFlowResult:
135 """Handle a flow initiated by zeroconf."""
136 assert self.
_service_service
is not None
137 assert self.
_name_name
is not None
139 if user_input
is None:
141 step_id=
"zeroconf_confirm",
142 description_placeholders={
"name": self.
_name_name},
147 title=self.
_name_name,
149 CONF_HOST: self.
_service_service.host,
150 CONF_PORT: self.
_service_service.port,
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_hassio(self, HassioServiceInfo discovery_info)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_step_zeroconf(self, ZeroconfServiceInfo discovery_info)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
IssData update(pyiss.ISS iss)