1 """Config flow for azure_event_hub integration."""
3 from __future__
import annotations
5 from copy
import deepcopy
9 from azure.eventhub.exceptions
import EventHubError
10 import voluptuous
as vol
16 SchemaOptionsFlowHandler,
19 from .client
import AzureEventHubClient
21 CONF_EVENT_HUB_CON_STRING,
22 CONF_EVENT_HUB_INSTANCE_NAME,
23 CONF_EVENT_HUB_NAMESPACE,
24 CONF_EVENT_HUB_SAS_KEY,
25 CONF_EVENT_HUB_SAS_POLICY,
36 _LOGGER = logging.getLogger(__name__)
38 BASE_SCHEMA = vol.Schema(
40 vol.Required(CONF_EVENT_HUB_INSTANCE_NAME): str,
41 vol.Optional(CONF_USE_CONN_STRING, default=
False): bool,
45 CONN_STRING_SCHEMA = vol.Schema(
47 vol.Required(CONF_EVENT_HUB_CON_STRING): str,
51 SAS_SCHEMA = vol.Schema(
53 vol.Required(CONF_EVENT_HUB_NAMESPACE): str,
54 vol.Required(CONF_EVENT_HUB_SAS_POLICY): str,
55 vol.Required(CONF_EVENT_HUB_SAS_KEY): str,
59 OPTIONS_SCHEMA = vol.Schema(
61 vol.Required(CONF_SEND_INTERVAL): int,
70 """Validate the input."""
71 client = AzureEventHubClient.from_input(**data)
73 await client.test_connection()
75 return {
"base":
"cannot_connect"}
77 _LOGGER.exception(
"Unknown error")
78 return {
"base":
"unknown"}
83 """Handle a config flow for azure event hub."""
88 """Initialize the config flow."""
89 self.
_data_data: dict[str, Any] = {}
90 self._options: dict[str, Any] = deepcopy(DEFAULT_OPTIONS)
96 config_entry: ConfigEntry,
97 ) -> SchemaOptionsFlowHandler:
98 """Get the options flow for this handler."""
102 self, user_input: dict[str, Any] |
None =
None
103 ) -> ConfigFlowResult:
104 """Handle the initial user step."""
105 if user_input
is None:
116 self, user_input: dict[str, Any] |
None =
None
117 ) -> ConfigFlowResult:
118 """Handle the connection string steps."""
120 if user_input
is None or errors
is not None:
122 step_id=STEP_CONN_STRING,
123 data_schema=CONN_STRING_SCHEMA,
125 description_placeholders={
126 "event_hub_instance_name": self.
_data_data[CONF_EVENT_HUB_INSTANCE_NAME]
132 title=self.
_data_data[CONF_EVENT_HUB_INSTANCE_NAME],
133 data=self.
_data_data,
134 options=self._options,
138 self, user_input: dict[str, Any] |
None =
None
139 ) -> ConfigFlowResult:
140 """Handle the sas steps."""
142 if user_input
is None or errors
is not None:
145 data_schema=SAS_SCHEMA,
147 description_placeholders={
148 "event_hub_instance_name": self.
_data_data[CONF_EVENT_HUB_INSTANCE_NAME]
154 title=self.
_data_data[CONF_EVENT_HUB_INSTANCE_NAME],
155 data=self.
_data_data,
156 options=self._options,
160 """Import config from configuration.yaml."""
161 if CONF_SEND_INTERVAL
in import_data:
162 self._options[CONF_SEND_INTERVAL] = import_data.pop(CONF_SEND_INTERVAL)
163 if CONF_MAX_DELAY
in import_data:
164 self._options[CONF_MAX_DELAY] = import_data.pop(CONF_MAX_DELAY)
165 self.
_data_data = import_data
170 title=self.
_data_data[CONF_EVENT_HUB_INSTANCE_NAME],
171 data=self.
_data_data,
172 options=self._options,
176 self, user_input: dict[str, Any] |
None
177 ) -> dict[str, str] |
None:
178 """Validate the input."""
179 if user_input
is None:
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
dict[str, str]|None async_update_and_validate_data(self, dict[str, Any]|None user_input)
ConfigFlowResult async_step_conn_string(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_sas(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
SchemaOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
dict[str, str]|None validate_data(dict[str, Any] data)
IssData update(pyiss.ISS iss)