1 """Config flow for Ukraine Alarm."""
3 from __future__
import annotations
6 from typing
import TYPE_CHECKING, Any
9 from uasiren.client
import Client
10 import voluptuous
as vol
16 from .const
import DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 """Config flow for Ukraine Alarm."""
27 """Initialize a new UkraineAlarmConfigFlow."""
28 self.
statesstates: list[dict[str, Any]] |
None =
None
32 self, user_input: dict[str, Any] |
None =
None
33 ) -> ConfigFlowResult:
34 """Handle a flow initialized by the user."""
42 unknown_err_msg =
None
44 regions = await Client(websession).get_regions()
45 except aiohttp.ClientResponseError
as ex:
50 unknown_err_msg =
str(ex)
51 except aiohttp.ClientConnectionError:
52 reason =
"cannot_connect"
53 except aiohttp.ClientError
as ex:
55 unknown_err_msg =
str(ex)
59 if not reason
and not regions:
61 unknown_err_msg =
"no regions returned"
64 _LOGGER.error(
"Failed to connect to the service: %s", unknown_err_msg)
68 self.
statesstates = regions[
"states"]
async def async_step_district(
    self, user_input: dict[str, str] | None = None
) -> ConfigFlowResult:
    """Handle user-chosen district.

    Delegates to ``_handle_pick_region`` with ``"district"`` as the step id
    and ``"community"`` as the next step.

    Args:
        user_input: Data submitted for this step, or ``None`` when nothing
            has been submitted yet.

    Returns:
        Whatever ``_handle_pick_region`` returns for this step.
    """
    return await self._handle_pick_region("district", "community", user_input)
async def async_step_community(
    self, user_input: dict[str, str] | None = None
) -> ConfigFlowResult:
    """Handle user-chosen community.

    Delegates to ``_handle_pick_region`` with ``"community"`` as the step id
    and no next step, flagging the step as the last one.

    Args:
        user_input: Data submitted for this step, or ``None`` when nothing
            has been submitted yet.

    Returns:
        Whatever ``_handle_pick_region`` returns for this step.
    """
    # The flag is passed by keyword so the call site says what ``True`` means.
    return await self._handle_pick_region(
        "community", None, user_input, last_step=True
    )
87 next_step: str |
None,
88 user_input: dict[str, str] |
None,
89 last_step: bool =
False,
90 ) -> ConfigFlowResult:
91 """Handle picking a (sub)region."""
97 if user_input
is not None:
101 or user_input[CONF_REGION] != self.
selected_regionselected_region[
"regionId"]
110 return await getattr(self, f
"async_step_{next_step}")()
124 vol.Required(CONF_REGION): vol.In(regions),
129 step_id=step_id, data_schema=schema, last_step=last_step
133 """Finish the setup."""
def _find(regions: list[dict[str, Any]], region_id) -> dict[str, Any] | None:
    """Return the first region whose ``"regionId"`` equals ``region_id``.

    Args:
        regions: Region dictionaries; each one examined must carry a
            ``"regionId"`` key.
        region_id: Identifier compared (with ``==``) against each
            ``"regionId"`` value.

    Returns:
        The first matching region dictionary (the same object that is in
        ``regions``), or ``None`` when nothing matches, including when
        ``regions`` is empty.

    Raises:
        KeyError: If a region examined before a match lacks ``"regionId"``.
    """
    matches = (region for region in regions if region["regionId"] == region_id)
    # ``next`` with a default stops at the first hit and avoids StopIteration.
    return next(matches, None)
def _make_regions_object(regions: list[dict[str, Any]]) -> dict[str, str]:
    """Map each region's ``"regionId"`` to its ``"regionName"``.

    Entries are inserted in ascending order of the lower-cased region name,
    so iterating the result yields regions alphabetically regardless of case.

    Args:
        regions: Region dictionaries carrying ``"regionId"`` and
            ``"regionName"`` keys. The list itself is not modified.

    Returns:
        A dictionary of region id to region name. If several regions share an
        id, the one that sorts last by name wins.

    Raises:
        KeyError: If a region lacks ``"regionId"`` or ``"regionName"``.
    """
    # Sort into a new list rather than rebinding the parameter.
    ordered = sorted(regions, key=lambda region: region["regionName"].lower())
    return {region["regionId"]: region["regionName"] for region in ordered}
ConfigFlowResult _handle_pick_region(self, str step_id, str|None next_step, dict[str, str]|None user_input, bool last_step=False)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_community(self, dict[str, str]|None user_input=None)
ConfigFlowResult async_step_district(self, dict[str, str]|None user_input=None)
ConfigFlowResult _async_finish_flow(self)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
def _find(list[dict[str, Any]] regions, region_id)
dict[str, str] _make_regions_object(list[dict[str, Any]] regions)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)