1 """Config flow for OpenSky integration."""
3 from __future__
import annotations
7 from aiohttp
import BasicAuth
8 from python_opensky
import OpenSky
9 from python_opensky.exceptions
import OpenSkyUnauthenticatedError
10 import voluptuous
as vol
31 CONF_CONTRIBUTING_USER,
39 """Config flow handler for OpenSky."""
44 config_entry: ConfigEntry,
45 ) -> OpenSkyOptionsFlowHandler:
46 """Get the options flow for this handler."""
50 self, user_input: dict[str, Any] |
None =
None
51 ) -> ConfigFlowResult:
52 """Initialize user input."""
53 if user_input
is not None:
57 CONF_LATITUDE: user_input[CONF_LATITUDE],
58 CONF_LONGITUDE: user_input[CONF_LONGITUDE],
61 CONF_RADIUS: user_input[CONF_RADIUS],
62 CONF_ALTITUDE: user_input[CONF_ALTITUDE],
70 vol.Required(CONF_RADIUS): vol.Coerce(float),
71 vol.Required(CONF_LATITUDE): cv.latitude,
72 vol.Required(CONF_LONGITUDE): cv.longitude,
73 vol.Optional(CONF_ALTITUDE): vol.Coerce(float),
77 CONF_LATITUDE: self.hass.config.latitude,
78 CONF_LONGITUDE: self.hass.config.longitude,
79 CONF_ALTITUDE: DEFAULT_ALTITUDE,
86 """OpenSky Options flow handler."""
89 self, user_input: dict[str, Any] |
None =
None
90 ) -> ConfigFlowResult:
91 """Initialize form."""
92 errors: dict[str, str] = {}
93 if user_input
is not None:
94 authentication = CONF_USERNAME
in user_input
or CONF_PASSWORD
in user_input
95 if authentication
and CONF_USERNAME
not in user_input:
96 errors[
"base"] =
"username_missing"
97 if authentication
and CONF_PASSWORD
not in user_input:
98 errors[
"base"] =
"password_missing"
99 if user_input[CONF_CONTRIBUTING_USER]
and not authentication:
100 errors[
"base"] =
"no_authentication"
101 if authentication
and not errors:
104 await opensky.authenticate(
106 login=user_input[CONF_USERNAME],
107 password=user_input[CONF_PASSWORD],
109 contributing_user=user_input[CONF_CONTRIBUTING_USER],
111 except OpenSkyUnauthenticatedError:
112 errors[
"base"] =
"invalid_auth"
122 vol.Required(CONF_RADIUS): vol.Coerce(float),
123 vol.Optional(CONF_ALTITUDE): vol.Coerce(float),
124 vol.Optional(CONF_USERNAME): str,
125 vol.Optional(CONF_PASSWORD): str,
126 vol.Optional(CONF_CONTRIBUTING_USER, default=
False): bool,
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
OpenSkyOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)