1 """Config flow to configure the Meteo-France integration."""
3 from __future__
import annotations
8 from meteofrance_api.client
import MeteoFranceClient
9 from meteofrance_api.model
import Place
10 import voluptuous
as vol
16 from .const
import CONF_CITY, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 """Handle a Meteo-France config flow."""
27 """Init MeteoFranceFlowHandler."""
28 self.
placesplaces: list[Place] = []
33 user_input: dict[str, Any] |
None =
None,
34 errors: dict[str, str] |
None =
None,
35 ) -> ConfigFlowResult:
36 """Show the setup form to the user."""
38 if user_input
is None:
43 data_schema=vol.Schema(
44 {vol.Required(CONF_CITY, default=user_input.get(CONF_CITY,
"")): str}
50 self, user_input: dict[str, Any] |
None =
None
51 ) -> ConfigFlowResult:
52 """Handle a flow initiated by the user."""
53 errors: dict[str, str] = {}
55 if user_input
is None:
58 city = user_input[CONF_CITY]
59 latitude = user_input.get(CONF_LATITUDE)
60 longitude = user_input.get(CONF_LONGITUDE)
63 client = MeteoFranceClient()
64 self.
placesplaces = await self.hass.async_add_executor_job(
65 client.search_places, city
67 _LOGGER.debug(
"Places search result: %s", self.
placesplaces)
69 errors[CONF_CITY] =
"empty"
80 data={CONF_LATITUDE: latitude, CONF_LONGITUDE: longitude},
84 self, user_input: dict[str, Any] |
None =
None
85 ) -> ConfigFlowResult:
86 """Step where the user chooses the city from the API search results."""
89 places_for_form: dict[str, str] = {}
90 for place
in self.
placesplaces:
95 data_schema=vol.Schema(
97 vol.Required(CONF_CITY): vol.All(
98 vol.Coerce(str), vol.In(places_for_form)
105 city_infos = user_input[CONF_CITY].split(
";")
108 CONF_CITY: city_infos[0],
109 CONF_LATITUDE: city_infos[1],
110 CONF_LONGITUDE: city_infos[2],
116 return f
"{place};{place.latitude};{place.longitude}"
ConfigFlowResult async_step_cities(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _show_setup_form(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
str _build_place_key(Place place)