Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Omnilogic integration."""
2 
3 from __future__ import annotations
4 
5 import logging
6 from typing import Any
7 
8 from omnilogic import LoginException, OmniLogic, OmniLogicException
9 import voluptuous as vol
10 
11 from homeassistant.config_entries import (
12  ConfigEntry,
13  ConfigFlow,
14  ConfigFlowResult,
15  OptionsFlow,
16 )
17 from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
18 from homeassistant.core import callback
19 from homeassistant.helpers import aiohttp_client
20 
21 from .const import CONF_SCAN_INTERVAL, DEFAULT_PH_OFFSET, DEFAULT_SCAN_INTERVAL, DOMAIN
22 
23 _LOGGER = logging.getLogger(__name__)
24 
25 
26 class OmniLogicConfigFlow(ConfigFlow, domain=DOMAIN):
27  """Handle a config flow for Omnilogic."""
28 
29  VERSION = 1
30 
31  @staticmethod
32  @callback
34  config_entry: ConfigEntry,
35  ) -> OptionsFlowHandler:
36  """Get the options flow for this handler."""
37  return OptionsFlowHandler()
38 
39  async def async_step_user(
40  self, user_input: dict[str, Any] | None = None
41  ) -> ConfigFlowResult:
42  """Handle the initial step."""
43  errors: dict[str, str] = {}
44 
45  if user_input is not None:
46  username = user_input[CONF_USERNAME]
47  password = user_input[CONF_PASSWORD]
48 
49  session = aiohttp_client.async_get_clientsession(self.hass)
50  omni = OmniLogic(username, password, session)
51 
52  try:
53  await omni.connect()
54  except LoginException:
55  errors["base"] = "invalid_auth"
56  except OmniLogicException:
57  errors["base"] = "cannot_connect"
58  except Exception:
59  _LOGGER.exception("Unexpected exception")
60  errors["base"] = "unknown"
61  else:
62  await self.async_set_unique_idasync_set_unique_id(user_input["username"])
63  self._abort_if_unique_id_configured_abort_if_unique_id_configured()
64  return self.async_create_entryasync_create_entryasync_create_entry(title="Omnilogic", data=user_input)
65 
66  return self.async_show_formasync_show_formasync_show_form(
67  step_id="user",
68  data_schema=vol.Schema(
69  {
70  vol.Required(CONF_USERNAME): str,
71  vol.Required(CONF_PASSWORD): str,
72  }
73  ),
74  errors=errors,
75  )
76 
77 
79  """Handle Omnilogic client options."""
80 
81  async def async_step_init(
82  self, user_input: dict[str, Any] | None = None
83  ) -> ConfigFlowResult:
84  """Manage options."""
85 
86  if user_input is not None:
87  return self.async_create_entryasync_create_entry(title="", data=user_input)
88 
89  return self.async_show_formasync_show_form(
90  step_id="init",
91  data_schema=vol.Schema(
92  {
93  vol.Optional(
94  CONF_SCAN_INTERVAL,
95  default=self.config_entryconfig_entryconfig_entry.options.get(
96  CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
97  ),
98  ): int,
99  vol.Optional(
100  "ph_offset",
101  default=self.config_entryconfig_entryconfig_entry.options.get(
102  "ph_offset", DEFAULT_PH_OFFSET
103  ),
104  ): vol.All(vol.Coerce(float)),
105  }
106  ),
107  )
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:41
OptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:35
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:83
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)