Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Soma."""
2 
3 import logging
4 from typing import Any
5 
6 from api.soma_api import SomaApi
7 from requests import RequestException
8 import voluptuous as vol
9 
10 from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
11 from homeassistant.const import CONF_HOST, CONF_PORT
12 
13 from .const import DOMAIN
14 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Port pre-filled in the user form when the user does not supply one.
DEFAULT_PORT = 3000
18 
19 
class SomaFlowHandler(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for Soma.

    The flow has two entry points, ``async_step_user`` (interactive form) and
    ``async_step_import`` (existing configuration section). Both end in
    ``async_step_creation``, which checks that the SOMA Connect endpoint
    answers before a config entry is created.
    """

    VERSION = 1

    def __init__(self) -> None:
        """Instantiate config flow."""

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow start.

        With no ``user_input`` the host/port form is shown; otherwise the
        submitted values are passed on to ``async_step_creation``.
        """
        if user_input is None:
            data = {
                vol.Required(CONF_HOST): str,
                vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
            }

            return self.async_show_form(step_id="user", data_schema=vol.Schema(data))

        return await self.async_step_creation(user_input)

    async def async_step_creation(self, user_input: dict[str, Any]) -> ConfigFlowResult:
        """Finish config flow.

        Builds a ``SomaApi`` client from ``user_input["host"]`` and
        ``user_input["port"]``, calls ``list_devices`` on it and creates the
        config entry only when the reply's ``"result"`` is ``"success"``.

        Abort reasons:
            ``connection_error``: a ``RequestException`` was raised, or the
                reply has no ``"result"`` key.
            ``result_error``: the reply's ``"result"`` is not ``"success"``.
        """
        # Both calls go through the executor, so they do not run on the event loop.
        try:
            api = await self.hass.async_add_executor_job(
                SomaApi, user_input["host"], user_input["port"]
            )
        except RequestException:
            _LOGGER.error("Connection to SOMA Connect failed with RequestException")
            return self.async_abort(reason="connection_error")

        try:
            result = await self.hass.async_add_executor_job(api.list_devices)
            status = result["result"]
        except RequestException:
            _LOGGER.error("Connection to SOMA Connect failed with RequestException")
            return self.async_abort(reason="connection_error")
        except KeyError:
            # A reply without a "result" key is treated as a failed connection.
            _LOGGER.error("Connection to SOMA Connect failed with KeyError")
            return self.async_abort(reason="connection_error")

        if status != "success":
            _LOGGER.error("Connection to SOMA Connect failed (result:%s)", status)
            return self.async_abort(reason="result_error")

        # Only report success once the reply has actually been checked.
        _LOGGER.debug("Successfully set up Soma Connect")
        return self.async_create_entry(
            title="Soma Connect",
            data={"host": user_input["host"], "port": user_input["port"]},
        )

    async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
        """Handle flow start from existing config section.

        Aborts with ``already_setup`` when an entry for this handler already
        exists; otherwise validates ``import_data`` via ``async_step_creation``.
        """
        if self._async_current_entries():
            return self.async_abort(reason="already_setup")
        return await self.async_step_creation(import_data)
ConfigFlowResult async_step_creation(self, dict[str, Any] user_input)
Definition: config_flow.py:42
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
Definition: config_flow.py:70
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:30
ConfigFlowResult async_create_entry(self, *, str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *, str reason, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_create_entry(self, *, str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *, str reason, Mapping[str, str]|None description_placeholders=None)