1 """Config flow for Kostal Plenticore Solar Inverter integration."""
6 from aiohttp.client_exceptions
import ClientError
7 from pykoplenti
import ApiClient, AuthenticationException
8 import voluptuous
as vol
15 from .const
import DOMAIN
16 from .helper
import get_hostname_id
18 _LOGGER = logging.getLogger(__name__)
20 DATA_SCHEMA = vol.Schema(
22 vol.Required(CONF_HOST): str,
23 vol.Required(CONF_PASSWORD): str,
29 """Test the connection to the inverter.
31 Data has the keys from DATA_SCHEMA with values provided by the user.
35 async
with ApiClient(session, data[
"host"])
as client:
36 await client.login(data[
"password"])
38 values = await client.get_setting_values(
"scb:network", hostname_id)
40 return values[
"scb:network"][hostname_id]
44 """Handle a config flow for Kostal Plenticore Solar Inverter."""
49 self, user_input: dict[str, Any] |
None =
None
50 ) -> ConfigFlowResult:
51 """Handle the initial step."""
54 if user_input
is not None:
59 except AuthenticationException
as ex:
60 errors[CONF_PASSWORD] =
"invalid_auth"
61 _LOGGER.error(
"Error response: %s", ex)
62 except (ClientError, TimeoutError):
63 errors[CONF_HOST] =
"cannot_connect"
65 _LOGGER.exception(
"Unexpected exception")
66 errors[CONF_BASE] =
"unknown"
71 step_id=
"user", data_schema=DATA_SCHEMA, errors=errors
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
str test_connection(HomeAssistant hass, data)
str get_hostname_id(ApiClient client)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)