Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Lutron Caseta."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import logging
7 import os
8 import ssl
9 from typing import Any
10 
11 from pylutron_caseta.pairing import PAIR_CA, PAIR_CERT, PAIR_KEY, async_pair
12 from pylutron_caseta.smartbridge import Smartbridge
13 import voluptuous as vol
14 
15 from homeassistant.components import zeroconf
16 from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
17 from homeassistant.const import CONF_HOST, CONF_NAME
18 from homeassistant.core import callback
19 
20 from .const import (
21  ABORT_REASON_CANNOT_CONNECT,
22  BRIDGE_DEVICE_ID,
23  BRIDGE_TIMEOUT,
24  CONF_CA_CERTS,
25  CONF_CERTFILE,
26  CONF_KEYFILE,
27  DOMAIN,
28  ERROR_CANNOT_CONNECT,
29  STEP_IMPORT_FAILED,
30 )
31 
# String key "hostname"; not referenced by the code visible in this module.
HOSTNAME = "hostname"


# Maps each key of the pairing result (PAIR_*) to the config-entry data key
# (CONF_*) under which the matching TLS asset's file path is stored.
FILE_MAPPING = {
    PAIR_KEY: CONF_KEYFILE,
    PAIR_CERT: CONF_CERTFILE,
    PAIR_CA: CONF_CA_CERTS,
}

_LOGGER = logging.getLogger(__name__)

# Title used for config entries created through the import step.
ENTRY_DEFAULT_TITLE = "Caséta bridge"

# Form schema for the user step: a single required host string.
DATA_SCHEMA_USER = vol.Schema({vol.Required(CONF_HOST): str})
# File name pattern for TLS assets; formatted with (bridge id, asset key).
TLS_ASSET_TEMPLATE = "lutron_caseta-{}-{}.pem"
47 
48 
class LutronCasetaFlowHandler(ConfigFlow, domain=DOMAIN):
    """Handle Lutron Caseta config flow."""

    VERSION = 1

    def __init__(self) -> None:
        """Initialize a Lutron Caseta flow."""
        # Data accumulated across steps; becomes the config entry data.
        self.data: dict[str, Any] = {}
        # Identifier parsed from the zeroconf hostname; None when the flow
        # was not started by discovery.
        self.lutron_id: str | None = None
        # True once existing TLS assets were found on disk and a connection
        # made with them returned a bridge id.
        self.tls_assets_validated = False
        # Ensures the (slow) validation attempt happens only once per flow.
        self.attempted_tls_validation = False

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initialized by the user.

        Shows the host form until input is provided, then stores the host
        and continues with the link (pairing) step.
        """
        if user_input is not None:
            self.data[CONF_HOST] = user_input[CONF_HOST]
            return await self.async_step_link()

        return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA_USER)

    async def async_step_zeroconf(
        self, discovery_info: zeroconf.ZeroconfServiceInfo
    ) -> ConfigFlowResult:
        """Handle a flow initialized by zeroconf discovery.

        Aborts with ``not_lutron_device`` unless the discovered hostname
        starts with ``lutron-`` (case-insensitive). Otherwise the id taken
        from the hostname becomes the flow's unique id and the flow
        continues with the link step.
        """
        hostname = discovery_info.hostname
        if hostname is None or not hostname.lower().startswith("lutron-"):
            return self.async_abort(reason="not_lutron_device")

        # The id is the segment between the first and second hyphen of the
        # hostname, with any ".local." removed.
        self.lutron_id = hostname.split("-")[1].replace(".local.", "")

        await self.async_set_unique_id(self.lutron_id)
        host = discovery_info.host
        # Passes the newly discovered host as the update for an entry that
        # is already configured with this unique id.
        self._abort_if_unique_id_configured({CONF_HOST: host})

        self.data[CONF_HOST] = host
        self.context["title_placeholders"] = {
            CONF_NAME: self.bridge_id,
            CONF_HOST: host,
        }
        return await self.async_step_link()

    async def async_step_homekit(
        self, discovery_info: zeroconf.ZeroconfServiceInfo
    ) -> ConfigFlowResult:
        """Handle a flow initialized by homekit discovery.

        Delegates to the zeroconf step unchanged.
        """
        return await self.async_step_zeroconf(discovery_info)

    async def async_step_link(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle pairing with the hub.

        On first entry, checks whether TLS assets from an earlier pairing
        are on disk and still usable. Once the user submits the form, the
        entry is created directly if those assets were validated; otherwise
        the bridge is paired, the returned assets are written to disk and
        the entry is created. A pairing failure re-shows the form with a
        ``cannot_connect`` error.
        """
        errors: dict[str, str] = {}
        # Abort if existing entry with matching host exists.
        self._async_abort_entries_match({CONF_HOST: self.data[CONF_HOST]})

        self._configure_tls_assets()

        # Short-circuit order matters: the disk check and the connection
        # attempt are skipped entirely after the first pass.
        if (
            not self.attempted_tls_validation
            and await self.hass.async_add_executor_job(self._tls_assets_exist)
            and await self.async_get_lutron_id()
        ):
            self.tls_assets_validated = True
        self.attempted_tls_validation = True

        if user_input is not None:
            if self.tls_assets_validated:
                # If we previously paired and the tls assets already exist,
                # we do not need to go through pairing again.
                return self.async_create_entry(title=self.bridge_id, data=self.data)

            assets = None
            try:
                assets = await async_pair(self.data[CONF_HOST])
            except (TimeoutError, OSError):
                errors["base"] = "cannot_connect"

            if not errors:
                # File writes are blocking, so run them in the executor.
                await self.hass.async_add_executor_job(self._write_tls_assets, assets)
                return self.async_create_entry(title=self.bridge_id, data=self.data)

        return self.async_show_form(
            step_id="link",
            errors=errors,
            description_placeholders={
                CONF_NAME: self.bridge_id,
                CONF_HOST: self.data[CONF_HOST],
            },
        )

    @property
    def bridge_id(self) -> str:
        """Return the best identifier for the bridge.

        If the bridge was not discovered via zeroconf,
        we fallback to using the host.
        """
        return self.lutron_id or self.data[CONF_HOST]

    def _write_tls_assets(self, assets: dict[str, str]) -> None:
        """Write the tls assets to disk.

        Each asset in FILE_MAPPING is written to the path stored in
        ``self.data`` for it, resolved against the config directory.
        Does blocking file I/O; the link step runs it in the executor.
        """
        for asset_key, conf_key in FILE_MAPPING.items():
            with open(
                self.hass.config.path(self.data[conf_key]), "w", encoding="utf8"
            ) as file_handle:
                file_handle.write(assets[asset_key])

    def _tls_assets_exist(self) -> bool:
        """Check to see if tls assets are already on disk.

        Returns True only when every file referenced by FILE_MAPPING exists.
        Does blocking file I/O; the link step runs it in the executor.
        """
        return all(
            os.path.exists(self.hass.config.path(self.data[conf_key]))
            for conf_key in FILE_MAPPING.values()
        )

    @callback
    def _configure_tls_assets(self) -> None:
        """Fill the tls asset locations in self.data."""
        for asset_key, conf_key in FILE_MAPPING.items():
            self.data[conf_key] = TLS_ASSET_TEMPLATE.format(self.bridge_id, asset_key)

    async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
        """Import a new Caseta bridge as a config entry.

        This flow is triggered by `async_setup`.
        """
        host = import_data[CONF_HOST]
        # Store the imported config for other steps in this flow to access.
        self.data[CONF_HOST] = host

        # Abort if existing entry with matching host exists.
        self._async_abort_entries_match({CONF_HOST: self.data[CONF_HOST]})

        self.data[CONF_KEYFILE] = import_data[CONF_KEYFILE]
        self.data[CONF_CERTFILE] = import_data[CONF_CERTFILE]
        self.data[CONF_CA_CERTS] = import_data[CONF_CA_CERTS]

        if not (lutron_id := await self.async_get_lutron_id()):
            # Ultimately we won't have a dedicated step for import failure, but
            # in order to keep configuration.yaml-based configs transparently
            # working without requiring further actions from the user, we don't
            # display a form at all before creating a config entry in the
            # default case, so we're only going to show a form in case the
            # import fails.
            # This will change in an upcoming release where UI-based config flow
            # will become the default for the Lutron Caseta integration (which
            # will require users to go through a confirmation flow for imports).
            return await self.async_step_import_failed()

        await self.async_set_unique_id(lutron_id, raise_on_progress=False)
        self._abort_if_unique_id_configured()
        return self.async_create_entry(title=ENTRY_DEFAULT_TITLE, data=self.data)

    async def async_step_import_failed(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Make failed import surfaced to user.

        Shows a form carrying the connection error first; once the user
        acknowledges it, the flow is aborted.
        """
        self.context["title_placeholders"] = {CONF_NAME: self.data[CONF_HOST]}

        if user_input is None:
            return self.async_show_form(
                step_id=STEP_IMPORT_FAILED,
                description_placeholders={"host": self.data[CONF_HOST]},
                errors={"base": ERROR_CANNOT_CONNECT},
            )

        return self.async_abort(reason=ABORT_REASON_CANNOT_CONNECT)

    async def async_get_lutron_id(self) -> str | None:
        """Check if we can connect to the bridge with the current config.

        Returns the bridge device's serial as lowercase hex without the
        ``0x`` prefix, zero-padded to at least 8 characters, or None when
        the certificates are rejected, the connection times out, or the
        bridge does not report as connected.
        """
        try:
            bridge = Smartbridge.create_tls(
                hostname=self.data[CONF_HOST],
                keyfile=self.hass.config.path(self.data[CONF_KEYFILE]),
                certfile=self.hass.config.path(self.data[CONF_CERTFILE]),
                ca_certs=self.hass.config.path(self.data[CONF_CA_CERTS]),
            )
        except ssl.SSLError:
            _LOGGER.error(
                "Invalid certificate used to connect to bridge at %s",
                self.data[CONF_HOST],
            )
            return None

        try:
            async with asyncio.timeout(BRIDGE_TIMEOUT):
                await bridge.connect()
        except TimeoutError:
            _LOGGER.error(
                "Timeout while trying to connect to bridge at %s",
                self.data[CONF_HOST],
            )
        else:
            if not bridge.is_connected():
                return None
            devices = bridge.get_devices()
            bridge_device = devices[BRIDGE_DEVICE_ID]
            return hex(bridge_device["serial"])[2:].zfill(8)
        finally:
            # Runs on every path above, including the early returns.
            await bridge.close()

        return None
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:63
ConfigFlowResult async_step_zeroconf(self, zeroconf.ZeroconfServiceInfo discovery_info)
Definition: config_flow.py:73
ConfigFlowResult async_step_import_failed(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:205
ConfigFlowResult async_step_link(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:100
ConfigFlowResult async_step_homekit(self, zeroconf.ZeroconfServiceInfo discovery_info)
Definition: config_flow.py:94
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
Definition: config_flow.py:171
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_step_zeroconf(self, ZeroconfServiceInfo discovery_info)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None open(self, **Any kwargs)
Definition: lock.py:86