1 """Config flow for Azure Data Explorer integration."""
3 from __future__
import annotations
8 from azure.kusto.data.exceptions
import KustoAuthenticationError, KustoServiceError
9 import voluptuous
as vol
11 from homeassistant
import config_entries
15 from .
import AzureDataExplorerClient
17 CONF_ADX_CLUSTER_INGEST_URI,
18 CONF_ADX_DATABASE_NAME,
23 CONF_USE_QUEUED_CLIENT,
28 _LOGGER = logging.getLogger(__name__)
30 STEP_USER_DATA_SCHEMA = vol.Schema(
32 vol.Required(CONF_ADX_CLUSTER_INGEST_URI): str,
33 vol.Required(CONF_ADX_DATABASE_NAME): str,
34 vol.Required(CONF_ADX_TABLE_NAME): str,
35 vol.Required(CONF_APP_REG_ID): str,
36 vol.Required(CONF_APP_REG_SECRET): str,
37 vol.Required(CONF_AUTHORITY_ID): str,
44 """Handle a config flow for Azure Data Explorer."""
48 async
def validate_input(self, data: dict[str, Any]) -> dict[str, Any] |
None:
49 """Validate the user input allows us to connect.
51 Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user.
53 client = AzureDataExplorerClient(data)
56 await self.hass.async_add_executor_job(client.test_connection)
58 except KustoAuthenticationError
as exp:
60 return {
"base":
"invalid_auth"}
62 except KustoServiceError
as exp:
64 return {
"base":
"cannot_connect"}
69 self, user_input: dict[str, Any] |
None =
None
70 ) -> ConfigFlowResult:
71 """Handle the initial step."""
79 title=user_input[CONF_ADX_CLUSTER_INGEST_URI].replace(
82 options=DEFAULT_OPTIONS,
86 data_schema=STEP_USER_DATA_SCHEMA,
dict[str, Any]|None validate_input(self, dict[str, Any] data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)