1 """Config flow for WeatherflowCloud integration."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 from aiohttp
import ClientResponseError
9 import voluptuous
as vol
10 from weatherflow4py.api
import WeatherFlowRestAPI
15 from .const
import DOMAIN
19 """Validate the API token."""
21 async
with WeatherFlowRestAPI(api_token)
as api:
22 await api.async_get_stations()
23 except ClientResponseError
as err:
25 return {
"base":
"invalid_api_key"}
26 return {
"base":
"cannot_connect"}
31 """Handle a config flow for WeatherFlowCloud."""
36 self, entry_data: Mapping[str, Any]
37 ) -> ConfigFlowResult:
38 """Handle a flow for reauth."""
42 self, user_input: dict[str, Any] |
None =
None
43 ) -> ConfigFlowResult:
44 """Handle a flow initiated by reauthentication."""
47 if user_input
is not None:
48 api_token = user_input[CONF_API_TOKEN]
54 data={CONF_API_TOKEN: api_token},
55 reload_even_if_entry_is_unchanged=
False,
59 step_id=
"reauth_confirm",
60 data_schema=vol.Schema({vol.Required(CONF_API_TOKEN): str}),
65 self, user_input: dict[str, Any] |
None =
None
66 ) -> ConfigFlowResult:
67 """Handle a flow initialized by the user."""
70 if user_input
is not None:
72 api_token = user_input[CONF_API_TOKEN]
76 title=
"Weatherflow REST",
77 data={CONF_API_TOKEN: api_token},
82 data_schema=vol.Schema({vol.Required(CONF_API_TOKEN): str}),
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
ConfigEntry _get_reauth_entry(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, Any] _validate_api_token(str api_token)