1 """Config flow for Co2signal integration."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 from aioelectricitymaps
import (
11 ElectricityMapsInvalidTokenError,
12 ElectricityMapsNoDataError,
14 import voluptuous
as vol
31 from .const
import DOMAIN
32 from .helpers
import fetch_latest_carbon_intensity
33 from .util
import get_extra_name
35 TYPE_USE_HOME =
"use_home_location"
36 TYPE_SPECIFY_COORDINATES =
"specify_coordinates"
37 TYPE_SPECIFY_COUNTRY =
"specify_country_code"
41 """Handle a config flow for Co2signal."""
47 self, user_input: dict[str, Any] |
None =
None
48 ) -> ConfigFlowResult:
49 """Handle the initial step."""
50 data_schema = vol.Schema(
54 translation_key=
"location",
55 mode=SelectSelectorMode.LIST,
58 TYPE_SPECIFY_COORDINATES,
63 vol.Required(CONF_API_KEY): cv.string,
67 if user_input
is None:
70 data_schema=data_schema,
73 data = {CONF_API_KEY: user_input[CONF_API_KEY]}
75 if user_input[
"location"] == TYPE_SPECIFY_COORDINATES:
79 if user_input[
"location"] == TYPE_SPECIFY_COUNTRY:
80 self.
_data_data = data
86 self, user_input: dict[str, Any] |
None =
None
87 ) -> ConfigFlowResult:
88 """Validate coordinates."""
89 data_schema = vol.Schema(
99 if user_input
is None:
102 assert self.
_data_data
is not None
105 "coordinates", data_schema, {**self.
_data_data, **user_input}
109 self, user_input: dict[str, Any] |
None =
None
110 ) -> ConfigFlowResult:
111 """Validate country."""
112 data_schema = vol.Schema(
114 vol.Required(CONF_COUNTRY_CODE): cv.string,
117 if user_input
is None:
120 assert self.
_data_data
is not None
123 "country", data_schema, {**self.
_data_data, **user_input}
127 self, entry_data: Mapping[str, Any]
128 ) -> ConfigFlowResult:
129 """Handle the reauth step."""
133 self, user_input: dict[str, Any] |
None =
None
134 ) -> ConfigFlowResult:
135 """Handle the reauth step."""
136 data_schema = vol.Schema(
138 vol.Required(CONF_API_KEY): cv.string,
142 "reauth_confirm", data_schema, user_input
146 self, step_id: str, data_schema: vol.Schema, data: Mapping[str, Any] |
None
147 ) -> ConfigFlowResult:
148 """Validate data and show form if it is invalid."""
149 errors: dict[str, str] = {}
153 em = ElectricityMaps(token=data[CONF_API_KEY], session=session)
157 except ElectricityMapsInvalidTokenError:
158 errors[
"base"] =
"invalid_auth"
159 except ElectricityMapsNoDataError:
160 errors[
"base"] =
"no_data"
161 except ElectricityMapsError:
162 errors[
"base"] =
"unknown"
167 data_updates={CONF_API_KEY: data[CONF_API_KEY]},
177 data_schema=data_schema,
ConfigFlowResult async_step_country(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _validate_and_create(self, str step_id, vol.Schema data_schema, Mapping[str, Any]|None data)
ConfigFlowResult async_step_coordinates(self, dict[str, Any]|None user_input=None)
ConfigEntry _get_reauth_entry(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
CarbonIntensityResponse fetch_latest_carbon_intensity(HomeAssistant hass, ElectricityMaps em, Mapping[str, Any] config)
str|None get_extra_name(Mapping[str, Any] config)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)