1 """Config flow for Rollease Acmeda Automate Pulse Hub."""
3 from __future__
import annotations
5 from asyncio
import timeout
6 from contextlib
import suppress
10 import voluptuous
as vol
15 from .const
import DOMAIN
19 """Handle a Acmeda config flow."""
24 """Initialize the config flow."""
25 self.
discovered_hubsdiscovered_hubs: dict[str, aiopulse.Hub] |
None =
None
28 self, user_input: dict[str, Any] |
None =
None
29 ) -> ConfigFlowResult:
30 """Handle a flow initialized by the user."""
32 user_input
is not None
39 already_configured = {
43 with suppress(TimeoutError):
44 async
with timeout(5):
45 hubs: list[aiopulse.Hub] = [
47 async
for hub
in aiopulse.Hub.discover()
48 if hub.id
not in already_configured
61 data_schema=vol.Schema(
63 vol.Required(CONF_ID): vol.In(
64 {hub.id: f
"{hub.id} {hub.host}" for hub
in hubs}
70 async
def async_create(self, hub: aiopulse.Hub) -> ConfigFlowResult:
71 """Create the Acmeda Hub entry."""
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create(self, aiopulse.Hub hub)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)