Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for OpenSky integration."""
2 
3 from __future__ import annotations
4 
5 from typing import Any
6 
7 from aiohttp import BasicAuth
8 from python_opensky import OpenSky
9 from python_opensky.exceptions import OpenSkyUnauthenticatedError
10 import voluptuous as vol
11 
12 from homeassistant.config_entries import (
13  ConfigEntry,
14  ConfigFlow,
15  ConfigFlowResult,
16  OptionsFlow,
17 )
18 from homeassistant.const import (
19  CONF_LATITUDE,
20  CONF_LONGITUDE,
21  CONF_PASSWORD,
22  CONF_RADIUS,
23  CONF_USERNAME,
24 )
25 from homeassistant.core import callback
26 from homeassistant.helpers.aiohttp_client import async_get_clientsession
28 
29 from .const import (
30  CONF_ALTITUDE,
31  CONF_CONTRIBUTING_USER,
32  DEFAULT_ALTITUDE,
33  DEFAULT_NAME,
34  DOMAIN,
35 )
36 
37 
38 class OpenSkyConfigFlowHandler(ConfigFlow, domain=DOMAIN):
39  """Config flow handler for OpenSky."""
40 
41  @staticmethod
42  @callback
44  config_entry: ConfigEntry,
45  ) -> OpenSkyOptionsFlowHandler:
46  """Get the options flow for this handler."""
48 
49  async def async_step_user(
50  self, user_input: dict[str, Any] | None = None
51  ) -> ConfigFlowResult:
52  """Initialize user input."""
53  if user_input is not None:
54  return self.async_create_entryasync_create_entryasync_create_entry(
55  title=DEFAULT_NAME,
56  data={
57  CONF_LATITUDE: user_input[CONF_LATITUDE],
58  CONF_LONGITUDE: user_input[CONF_LONGITUDE],
59  },
60  options={
61  CONF_RADIUS: user_input[CONF_RADIUS],
62  CONF_ALTITUDE: user_input[CONF_ALTITUDE],
63  },
64  )
65  return self.async_show_formasync_show_formasync_show_form(
66  step_id="user",
67  data_schema=self.add_suggested_values_to_schemaadd_suggested_values_to_schema(
68  vol.Schema(
69  {
70  vol.Required(CONF_RADIUS): vol.Coerce(float),
71  vol.Required(CONF_LATITUDE): cv.latitude,
72  vol.Required(CONF_LONGITUDE): cv.longitude,
73  vol.Optional(CONF_ALTITUDE): vol.Coerce(float),
74  }
75  ),
76  {
77  CONF_LATITUDE: self.hass.config.latitude,
78  CONF_LONGITUDE: self.hass.config.longitude,
79  CONF_ALTITUDE: DEFAULT_ALTITUDE,
80  },
81  ),
82  )
83 
84 
86  """OpenSky Options flow handler."""
87 
88  async def async_step_init(
89  self, user_input: dict[str, Any] | None = None
90  ) -> ConfigFlowResult:
91  """Initialize form."""
92  errors: dict[str, str] = {}
93  if user_input is not None:
94  authentication = CONF_USERNAME in user_input or CONF_PASSWORD in user_input
95  if authentication and CONF_USERNAME not in user_input:
96  errors["base"] = "username_missing"
97  if authentication and CONF_PASSWORD not in user_input:
98  errors["base"] = "password_missing"
99  if user_input[CONF_CONTRIBUTING_USER] and not authentication:
100  errors["base"] = "no_authentication"
101  if authentication and not errors:
102  opensky = OpenSky(session=async_get_clientsession(self.hass))
103  try:
104  await opensky.authenticate(
105  BasicAuth(
106  login=user_input[CONF_USERNAME],
107  password=user_input[CONF_PASSWORD],
108  ),
109  contributing_user=user_input[CONF_CONTRIBUTING_USER],
110  )
111  except OpenSkyUnauthenticatedError:
112  errors["base"] = "invalid_auth"
113  if not errors:
114  return self.async_create_entryasync_create_entry(data=user_input)
115 
116  return self.async_show_formasync_show_form(
117  step_id="init",
118  errors=errors,
119  data_schema=self.add_suggested_values_to_schemaadd_suggested_values_to_schema(
120  vol.Schema(
121  {
122  vol.Required(CONF_RADIUS): vol.Coerce(float),
123  vol.Optional(CONF_ALTITUDE): vol.Coerce(float),
124  vol.Optional(CONF_USERNAME): str,
125  vol.Optional(CONF_PASSWORD): str,
126  vol.Optional(CONF_CONTRIBUTING_USER, default=False): bool,
127  }
128  ),
129  user_input or self.config_entryconfig_entryconfig_entry.options,
130  ),
131  )
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:51
OpenSkyOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:45
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:90
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)