1 """Config flow for Sun WEG integration."""
3 from collections.abc
import Mapping
6 from sunweg.api
import APIHelper, SunWegApiError
7 import voluptuous
as vol
13 from .const
import CONF_PLANT_ID, DOMAIN
17 """Config flow class."""
22 """Initialise sun weg server flow."""
23 self.
apiapi: APIHelper =
None
24 self.
datadata: dict[str, Any] = {}
28 """Show the form to the user."""
30 if CONF_USERNAME
in self.
datadata:
31 default_username = self.
datadata[CONF_USERNAME]
32 data_schema = vol.Schema(
34 vol.Required(CONF_USERNAME, default=default_username): str,
35 vol.Required(CONF_PASSWORD): str,
40 step_id=step_id, data_schema=data_schema, errors=errors
44 self, step: str, username: str, password: str
45 ) -> ConfigFlowResult |
None:
46 """Set username and password."""
49 self.
apiapi.username = username
50 self.
apiapi.password = password
53 self.
apiapi = APIHelper(username, password)
58 except SunWegApiError:
64 """Handle the start of the config flow."""
71 conf_result = await self.hass.async_add_executor_job(
74 user_input[CONF_USERNAME],
75 user_input[CONF_PASSWORD],
78 return await self.
async_step_plantasync_step_plant()
if conf_result
is None else conf_result
81 """Handle adding a "plant" to Home Assistant."""
82 plant_list = await self.hass.async_add_executor_job(self.
apiapi.listPlants)
84 if len(plant_list) == 0:
87 plants = {plant.id: plant.name
for plant
in plant_list}
89 if user_input
is None and len(plant_list) > 1:
90 data_schema = vol.Schema({vol.Required(CONF_PLANT_ID): vol.In(plants)})
94 if user_input
is None and len(plant_list) == 1:
95 user_input = {CONF_PLANT_ID: plant_list[0].id}
97 user_input[CONF_NAME] = plants[user_input[CONF_PLANT_ID]]
104 self, entry_data: Mapping[str, Any]
105 ) -> ConfigFlowResult:
106 """Handle reauthorization request from SunWEG."""
111 self, user_input: dict[str, Any] |
None =
None
112 ) -> ConfigFlowResult:
113 """Handle reauthorization flow."""
114 if user_input
is None:
118 conf_result = await self.hass.async_add_executor_job(
121 user_input[CONF_USERNAME],
122 user_input[CONF_PASSWORD],
124 if conf_result
is not None:
ConfigFlowResult async_step_user(self, user_input=None)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
ConfigFlowResult _async_show_user_form(self, str step_id, errors=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult|None _set_auth_data(self, str step, str username, str password)
ConfigFlowResult async_step_plant(self, user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry _get_reauth_entry(self)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
IssData update(pyiss.ISS iss)
dict[str, str|bool] authenticate(HomeAssistant hass, str host, str security_code)