1 """Config flow for Elvia integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
6 from typing
import TYPE_CHECKING, Any
8 from elvia
import Elvia, error
as ElviaError
9 import voluptuous
as vol
15 from .const
import CONF_METERING_POINT_ID, DOMAIN, LOGGER
19 """Handle a config flow for Elvia."""
28 user_input: dict[str, Any] |
None =
None,
29 ) -> ConfigFlowResult:
30 """Handle the initial step."""
31 errors: dict[str, str] = {}
32 if user_input
is not None:
33 self.
_api_token_api_token = api_token = user_input[CONF_API_TOKEN]
34 client = Elvia(meter_value_token=api_token).meter_value()
36 end_time = dt_util.utcnow()
37 results = await client.get_meter_values(
38 start_time=(end_time -
timedelta(hours=1)).isoformat(),
39 end_time=end_time.isoformat(),
42 except ElviaError.AuthError
as exception:
43 LOGGER.error(
"Authentication error %s", exception)
44 errors[
"base"] =
"invalid_auth"
45 except ElviaError.ElviaException
as exception:
46 LOGGER.error(
"Unknown error %s", exception)
47 errors[
"base"] =
"unknown"
51 x[
"meteringPointId"]
for x
in results[
"meteringpoints"]
56 if (meter_count := len(metering_point_ids)) > 1:
61 metering_point_id=metering_point_ids[0],
68 data_schema=vol.Schema(
70 vol.Required(CONF_API_TOKEN): str,
77 self, user_input: dict[str, Any] |
None =
None
78 ) -> ConfigFlowResult:
79 """Handle selecting a metering point ID."""
84 if user_input
is not None:
87 metering_point_id=user_input[CONF_METERING_POINT_ID],
91 step_id=
"select_meter",
92 data_schema=vol.Schema(
95 CONF_METERING_POINT_ID,
105 metering_point_id: str,
106 ) -> ConfigFlowResult:
107 """Store metering point ID and API token."""
110 reason=
"metering_point_id_already_configured",
111 description_placeholders={
"metering_point_id": metering_point_id},
114 title=metering_point_id,
116 CONF_API_TOKEN: api_token,
117 CONF_METERING_POINT_ID: metering_point_id,
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _create_config_entry(self, str api_token, str metering_point_id)
ConfigFlowResult async_step_select_meter(self, dict[str, Any]|None user_input=None)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)