1 """Config flow for the dwd_weather_warnings integration."""
3 from __future__
import annotations
7 from dwdwfsapi
import DwdWeatherWarningsAPI
8 import voluptuous
as vol
15 from .const
import CONF_REGION_DEVICE_TRACKER, CONF_REGION_IDENTIFIER, DOMAIN
16 from .exceptions
import EntityNotFoundError
17 from .util
import get_position_data
19 EXCLUSIVE_OPTIONS = (CONF_REGION_IDENTIFIER, CONF_REGION_DEVICE_TRACKER)
23 """Handle the config flow for the dwd_weather_warnings integration."""
28 self, user_input: dict[str, Any] |
None =
None
29 ) -> ConfigFlowResult:
30 """Handle the initial step."""
33 if user_input
is not None:
35 if all(k
not in user_input
for k
in EXCLUSIVE_OPTIONS):
36 errors[
"base"] =
"no_identifier"
37 elif all(k
in user_input
for k
in EXCLUSIVE_OPTIONS):
38 errors[
"base"] =
"ambiguous_identifier"
39 elif CONF_REGION_IDENTIFIER
in user_input:
41 identifier = user_input[CONF_REGION_IDENTIFIER]
43 if not await self.hass.async_add_executor_job(
44 DwdWeatherWarningsAPI, identifier
46 errors[
"base"] =
"invalid_identifier"
55 device_tracker = user_input[CONF_REGION_DEVICE_TRACKER]
56 registry = er.async_get(self.hass)
57 entity_entry = registry.async_get(device_tracker)
59 if entity_entry
is None:
60 errors[
"base"] =
"entity_not_found"
64 except EntityNotFoundError:
65 errors[
"base"] =
"entity_not_found"
66 except AttributeError:
67 errors[
"base"] =
"attribute_not_found"
70 if not await self.hass.async_add_executor_job(
71 DwdWeatherWarningsAPI, position
73 errors[
"base"] =
"invalid_identifier"
76 if not errors
and position
is not None and entity_entry
is not None:
82 user_input[CONF_REGION_DEVICE_TRACKER] = entity_entry.id
85 title=device_tracker.removeprefix(
"device_tracker."),
92 data_schema=vol.Schema(
94 vol.Optional(CONF_REGION_IDENTIFIER): cv.string,
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
tuple[float, float]|None get_position_data(HomeAssistant hass, str registry_id)