1 """Config flow for HVV integration."""
3 from __future__ import annotations
8 from pygti.auth import GTI_DEFAULT_HOST
9 from pygti.exceptions import CannotConnect, InvalidAuth
10 import voluptuous as vol
23 from .const import CONF_FILTER, CONF_REAL_TIME, CONF_STATION, DOMAIN
24 from .hub import GTIHub
26 _LOGGER = logging.getLogger(__name__)
28 SCHEMA_STEP_USER = vol.Schema(
30 vol.Required(CONF_HOST, default=GTI_DEFAULT_HOST): str,
31 vol.Required(CONF_USERNAME): str,
32 vol.Required(CONF_PASSWORD): str,
36 SCHEMA_STEP_STATION = vol.Schema({vol.Required(CONF_STATION): str})
38 SCHEMA_STEP_OPTIONS = vol.Schema(
40 vol.Required(CONF_FILTER): vol.In([]),
41 vol.Required(CONF_OFFSET, default=0): cv.positive_int,
42 vol.Optional(CONF_REAL_TIME, default=
True): bool,
48 """Handle a config flow for HVV."""
56 """Initialize component."""
57 self.stations: dict[str, Any] = {}
60 self, user_input: dict[str, Any] |
None =
None
61 ) -> ConfigFlowResult:
62 """Handle the initial step."""
65 if user_input
is not None:
66 session = aiohttp_client.async_get_clientsession(self.hass)
68 user_input[CONF_HOST],
69 user_input[CONF_USERNAME],
70 user_input[CONF_PASSWORD],
76 _LOGGER.debug(
"Init gti: %r", response)
78 errors[
"base"] =
"cannot_connect"
80 errors[
"base"] =
"invalid_auth"
87 step_id=
"user", data_schema=SCHEMA_STEP_USER, errors=errors
91 self, user_input: dict[str, Any] |
None =
None
92 ) -> ConfigFlowResult:
93 """Handle the step where the user inputs his/her station."""
94 if user_input
is not None:
97 check_name = await self.hub.gti.checkName(
98 {"theName": {"name": user_input[CONF_STATION]}, "maxList": 20}
101 stations = check_name.get(
"results")
104 f"{station.get('name')}": station
105 for station in stations
106 if station.get("type") == "STATION"
110 errors[
"base"] =
"no_results"
113 step_id=
"station", data_schema=SCHEMA_STEP_STATION, errors=errors
123 self, user_input: dict[str, Any] |
None =
None
124 ) -> ConfigFlowResult:
125 """Handle the step where the user selects one of the stations that were found."""
127 schema = vol.Schema({vol.Required(CONF_STATION): vol.In(list(self.stations))})
129 if user_input
is None:
134 title = self.data[CONF_STATION]["name"]
141 config_entry: ConfigEntry,
142 ) -> OptionsFlowHandler:
143 """Get options flow."""
148 """Options flow handler."""
151 """Initialize HVV Departures options flow."""
155 self, user_input: dict[str, Any] |
None =
None
156 ) -> ConfigFlowResult:
157 """Manage the options."""
164 departure_list = await hub.gti.departureList(
170 "time": {
"date":
"heute",
"time":
"jetzt"},
172 "maxTimeOffset": 200,
174 "returnFilters":
True,
177 except CannotConnect:
178 errors[
"base"] =
"cannot_connect"
180 errors[
"base"] =
"invalid_auth"
184 str(i): departure_filter
185 for i, departure_filter
in enumerate(departure_list[
"filter"])
188 if user_input
is not None and not errors:
193 CONF_OFFSET: user_input[CONF_OFFSET],
194 CONF_REAL_TIME: user_input[CONF_REAL_TIME],
210 data_schema=vol.Schema(
212 vol.Optional(CONF_FILTER, default=old_filter): cv.multi_select(
215 f"{departure_filter['serviceName']},"
216 f" {departure_filter['label']}"
OptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_station(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_station_select(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
web.Response get(self, web.Request request, str config_key)
IssData update(pyiss.ISS iss)
dict[str, str|bool] authenticate(HomeAssistant hass, str host, str security_code)