Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Rollease Acmeda Automate Pulse Hub."""
2 
3 from __future__ import annotations
4 
5 from asyncio import timeout
6 from contextlib import suppress
7 from typing import Any
8 
9 import aiopulse
10 import voluptuous as vol
11 
12 from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
13 from homeassistant.const import CONF_HOST, CONF_ID
14 
15 from .const import DOMAIN
16 
17 
class AcmedaFlowHandler(ConfigFlow, domain=DOMAIN):
    """Handle an Acmeda config flow.

    The flow discovers Pulse hubs via ``aiopulse.Hub.discover()``, skips
    hubs whose id matches the unique id of an existing config entry, and
    either creates an entry directly (exactly one candidate) or asks the
    user to pick one (several candidates).
    """

    VERSION = 1

    def __init__(self) -> None:
        """Initialize the config flow."""
        # Hubs offered in the selection form, keyed by hub id. Stays None
        # until a discovery pass has produced more than one candidate.
        self.discovered_hubs: dict[str, aiopulse.Hub] | None = None

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initialized by the user.

        Args:
            user_input: Submitted form data containing ``CONF_ID`` when the
                user has picked a hub from the selection form, otherwise
                ``None``.

        Returns:
            An abort result when no unconfigured hub is found, a created
            entry when a hub is selected or only one is found, or the
            selection form when several hubs are available.
        """
        if (
            user_input is not None
            and self.discovered_hubs is not None
            and user_input[CONF_ID] in self.discovered_hubs
        ):
            return await self.async_create(self.discovered_hubs[user_input[CONF_ID]])

        # Already configured hosts
        already_configured = {
            entry.unique_id for entry in self._async_current_entries()
        }

        # Bind the list *before* the timed section and fill it incrementally:
        # if the 5 second timeout fires mid-discovery, the hubs found so far
        # are kept and ``hubs`` is always defined. (A comprehension assigned
        # inside the timeout would leave the name unbound on timeout.)
        hubs: list[aiopulse.Hub] = []
        with suppress(TimeoutError):
            async with timeout(5):
                async for hub in aiopulse.Hub.discover():
                    if hub.id not in already_configured:
                        hubs.append(hub)

        if not hubs:
            return self.async_abort(reason="no_devices_found")

        if len(hubs) == 1:
            return await self.async_create(hubs[0])

        # Remember the candidates so the follow-up submission can be resolved
        # back to a hub object by id.
        self.discovered_hubs = {hub.id: hub for hub in hubs}

        return self.async_show_form(
            step_id="user",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_ID): vol.In(
                        {hub.id: f"{hub.id} {hub.host}" for hub in hubs}
                    )
                }
            ),
        )

    async def async_create(self, hub: aiopulse.Hub) -> ConfigFlowResult:
        """Create the Acmeda Hub entry.

        Args:
            hub: The hub to register; its id becomes the flow's unique id
                and the entry title, and its host is stored under
                ``CONF_HOST``.

        Returns:
            The result of ``async_create_entry`` for this hub.
        """
        await self.async_set_unique_id(hub.id, raise_on_progress=False)
        return self.async_create_entry(title=hub.id, data={CONF_HOST: hub.host})
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:29
ConfigFlowResult async_create(self, aiopulse.Hub hub)
Definition: config_flow.py:70
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)