Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for Smappee."""
2 
3 import logging
4 from typing import Any
5 
6 from pysmappee import helper, mqtt
7 import voluptuous as vol
8 
9 from homeassistant.components import zeroconf
10 from homeassistant.config_entries import ConfigFlowResult
11 from homeassistant.const import CONF_HOST, CONF_IP_ADDRESS
12 from homeassistant.helpers import config_entry_oauth2_flow
13 
14 from . import api
15 from .const import (
16  CONF_SERIALNUMBER,
17  DOMAIN,
18  ENV_CLOUD,
19  ENV_LOCAL,
20  SUPPORTED_LOCAL_DEVICES,
21 )
22 
23 
25  config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
26 ):
27  """Config Smappee config flow."""
28 
29  DOMAIN = DOMAIN
30 
31  ip_address: str # Set by zeroconf step, used by zeroconf_confirm step
32  serial_number: str # Set by zeroconf step, used by zeroconf_confirm step
33 
34  async def async_oauth_create_entry(self, data):
35  """Create an entry for the flow."""
36 
37  await self.async_set_unique_id(unique_id=f"{DOMAIN}Cloud")
38  return self.async_create_entry(title=f"{DOMAIN}Cloud", data=data)
39 
40  @property
41  def logger(self) -> logging.Logger:
42  """Return logger."""
43  return logging.getLogger(__name__)
44 
46  self, discovery_info: zeroconf.ZeroconfServiceInfo
47  ) -> ConfigFlowResult:
48  """Handle zeroconf discovery."""
49 
50  if not discovery_info.hostname.startswith(SUPPORTED_LOCAL_DEVICES):
51  return self.async_abort(reason="invalid_mdns")
52 
53  serial_number = discovery_info.hostname.replace(".local.", "").replace(
54  "Smappee", ""
55  )
56 
57  # Check if already configured (local)
58  await self.async_set_unique_id(serial_number)
59  self._abort_if_unique_id_configured()
60 
61  # Check if already configured (cloud)
62  if self.is_cloud_device_already_addedis_cloud_device_already_added():
63  return self.async_abort(reason="already_configured_device")
64 
65  self.context["title_placeholders"] = {"name": serial_number}
66  self.ip_addressip_address = discovery_info.host
67  self.serial_numberserial_number = serial_number
68 
69  return await self.async_step_zeroconf_confirmasync_step_zeroconf_confirm()
70 
72  self, user_input: dict[str, Any] | None = None
73  ) -> ConfigFlowResult:
74  """Confirm zeroconf flow."""
75  errors: dict[str, str] = {}
76 
77  # Check if already configured (cloud)
78  if self.is_cloud_device_already_addedis_cloud_device_already_added():
79  return self.async_abort(reason="already_configured_device")
80 
81  if user_input is None:
82  return self.async_show_form(
83  step_id="zeroconf_confirm",
84  description_placeholders={"serialnumber": self.serial_numberserial_number},
85  errors=errors,
86  )
87 
88  # Attempt to make a connection to the local device
89  if helper.is_smappee_genius(self.serial_numberserial_number):
90  # next generation device, attempt connect to the local mqtt broker
91  smappee_mqtt = mqtt.SmappeeLocalMqtt(serial_number=self.serial_numberserial_number)
92  connect = await self.hass.async_add_executor_job(smappee_mqtt.start_attempt)
93  if not connect:
94  return self.async_abort(reason="cannot_connect")
95  else:
96  # legacy devices, without local mqtt broker, try api access
97  smappee_api = api.api.SmappeeLocalApi(ip=self.ip_addressip_address)
98  logon = await self.hass.async_add_executor_job(smappee_api.logon)
99  if logon is None:
100  return self.async_abort(reason="cannot_connect")
101 
102  return self.async_create_entry(
103  title=f"{DOMAIN}{self.serial_number}",
104  data={
105  CONF_IP_ADDRESS: self.ip_addressip_address,
106  CONF_SERIALNUMBER: self.serial_numberserial_number,
107  },
108  )
109 
110  async def async_step_user(
111  self, user_input: dict[str, Any] | None = None
112  ) -> ConfigFlowResult:
113  """Handle a flow initiated by the user."""
114 
115  # If there is a CLOUD entry already, abort a new LOCAL entry
116  if self.is_cloud_device_already_addedis_cloud_device_already_added():
117  return self.async_abort(reason="already_configured_device")
118 
119  return await self.async_step_environmentasync_step_environment()
120 
122  self, user_input: dict[str, str] | None = None
123  ) -> ConfigFlowResult:
124  """Decide environment, cloud or local."""
125  if user_input is None:
126  return self.async_show_form(
127  step_id="environment",
128  data_schema=vol.Schema(
129  {
130  vol.Required("environment", default=ENV_CLOUD): vol.In(
131  [ENV_CLOUD, ENV_LOCAL]
132  )
133  }
134  ),
135  errors={},
136  )
137 
138  # Environment chosen, request additional host information for LOCAL or OAuth2 flow for CLOUD
139  # Ask for host detail
140  if user_input["environment"] == ENV_LOCAL:
141  return await self.async_step_localasync_step_local()
142 
143  # Abort cloud option if a LOCAL entry has already been added
144  if user_input["environment"] == ENV_CLOUD and self._async_current_entries():
145  return self.async_abort(reason="already_configured_local_device")
146 
147  return await self.async_step_pick_implementation()
148 
149  async def async_step_local(
150  self, user_input: dict[str, str] | None = None
151  ) -> ConfigFlowResult:
152  """Handle local flow."""
153  if user_input is None:
154  return self.async_show_form(
155  step_id="local",
156  data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
157  errors={},
158  )
159  # In a LOCAL setup we still need to resolve the host to serial number
160  ip_address = user_input["host"]
161  serial_number = None
162 
163  # Attempt 1: try to use the local api (older generation) to resolve host to serialnumber
164  smappee_api = api.api.SmappeeLocalApi(ip=ip_address)
165  logon = await self.hass.async_add_executor_job(smappee_api.logon)
166  if logon is not None:
167  advanced_config = await self.hass.async_add_executor_job(
168  smappee_api.load_advanced_config
169  )
170  for config_item in advanced_config:
171  if config_item["key"] == "mdnsHostName":
172  serial_number = config_item["value"]
173  else:
174  # Attempt 2: try to use the local mqtt broker (newer generation) to resolve host to serialnumber
175  smappee_mqtt = mqtt.SmappeeLocalMqtt()
176  connect = await self.hass.async_add_executor_job(smappee_mqtt.start_attempt)
177  if not connect:
178  return self.async_abort(reason="cannot_connect")
179 
180  serial_number = await self.hass.async_add_executor_job(
181  smappee_mqtt.start_and_wait_for_config
182  )
183  await self.hass.async_add_executor_job(smappee_mqtt.stop)
184  if serial_number is None:
185  return self.async_abort(reason="cannot_connect")
186 
187  if serial_number is None or not serial_number.startswith(
188  SUPPORTED_LOCAL_DEVICES
189  ):
190  return self.async_abort(reason="invalid_mdns")
191 
192  serial_number = serial_number.replace("Smappee", "")
193 
194  # Check if already configured (local)
195  await self.async_set_unique_id(serial_number, raise_on_progress=False)
196  self._abort_if_unique_id_configured()
197 
198  return self.async_create_entry(
199  title=f"{DOMAIN}{serial_number}",
200  data={CONF_IP_ADDRESS: ip_address, CONF_SERIALNUMBER: serial_number},
201  )
202 
204  """Check if a CLOUD device has already been added."""
205  for entry in self._async_current_entries():
206  if entry.unique_id is not None and entry.unique_id == f"{DOMAIN}Cloud":
207  return True
208  return False
ConfigFlowResult async_step_environment(self, dict[str, str]|None user_input=None)
Definition: config_flow.py:123
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:112
ConfigFlowResult async_step_zeroconf(self, zeroconf.ZeroconfServiceInfo discovery_info)
Definition: config_flow.py:47
ConfigFlowResult async_step_zeroconf_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:73
ConfigFlowResult async_step_local(self, dict[str, str]|None user_input=None)
Definition: config_flow.py:151