1 """Config flow for Smappee."""
6 from pysmappee
import helper, mqtt
7 import voluptuous
as vol
20 SUPPORTED_LOCAL_DEVICES,
25 config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
27 """Config Smappee config flow."""
35 """Create an entry for the flow."""
37 await self.async_set_unique_id(unique_id=f
"{DOMAIN}Cloud")
38 return self.async_create_entry(title=f
"{DOMAIN}Cloud", data=data)
43 return logging.getLogger(__name__)
46 self, discovery_info: zeroconf.ZeroconfServiceInfo
47 ) -> ConfigFlowResult:
48 """Handle zeroconf discovery."""
50 if not discovery_info.hostname.startswith(SUPPORTED_LOCAL_DEVICES):
51 return self.async_abort(reason=
"invalid_mdns")
53 serial_number = discovery_info.hostname.replace(
".local.",
"").replace(
58 await self.async_set_unique_id(serial_number)
59 self._abort_if_unique_id_configured()
63 return self.async_abort(reason=
"already_configured_device")
65 self.context[
"title_placeholders"] = {
"name": serial_number}
72 self, user_input: dict[str, Any] |
None =
None
73 ) -> ConfigFlowResult:
74 """Confirm zeroconf flow."""
75 errors: dict[str, str] = {}
79 return self.async_abort(reason=
"already_configured_device")
81 if user_input
is None:
82 return self.async_show_form(
83 step_id=
"zeroconf_confirm",
84 description_placeholders={
"serialnumber": self.
serial_numberserial_number},
91 smappee_mqtt = mqtt.SmappeeLocalMqtt(serial_number=self.
serial_numberserial_number)
92 connect = await self.hass.async_add_executor_job(smappee_mqtt.start_attempt)
94 return self.async_abort(reason=
"cannot_connect")
97 smappee_api = api.api.SmappeeLocalApi(ip=self.
ip_addressip_address)
98 logon = await self.hass.async_add_executor_job(smappee_api.logon)
100 return self.async_abort(reason=
"cannot_connect")
102 return self.async_create_entry(
103 title=f
"{DOMAIN}{self.serial_number}",
111 self, user_input: dict[str, Any] |
None =
None
112 ) -> ConfigFlowResult:
113 """Handle a flow initiated by the user."""
117 return self.async_abort(reason=
"already_configured_device")
122 self, user_input: dict[str, str] |
None =
None
123 ) -> ConfigFlowResult:
124 """Decide environment, cloud or local."""
125 if user_input
is None:
126 return self.async_show_form(
127 step_id=
"environment",
128 data_schema=vol.Schema(
130 vol.Required(
"environment", default=ENV_CLOUD): vol.In(
131 [ENV_CLOUD, ENV_LOCAL]
140 if user_input[
"environment"] == ENV_LOCAL:
144 if user_input[
"environment"] == ENV_CLOUD
and self._async_current_entries():
145 return self.async_abort(reason=
"already_configured_local_device")
147 return await self.async_step_pick_implementation()
150 self, user_input: dict[str, str] |
None =
None
151 ) -> ConfigFlowResult:
152 """Handle local flow."""
153 if user_input
is None:
154 return self.async_show_form(
156 data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
160 ip_address = user_input[
"host"]
164 smappee_api = api.api.SmappeeLocalApi(ip=ip_address)
165 logon = await self.hass.async_add_executor_job(smappee_api.logon)
166 if logon
is not None:
167 advanced_config = await self.hass.async_add_executor_job(
168 smappee_api.load_advanced_config
170 for config_item
in advanced_config:
171 if config_item[
"key"] ==
"mdnsHostName":
172 serial_number = config_item[
"value"]
175 smappee_mqtt = mqtt.SmappeeLocalMqtt()
176 connect = await self.hass.async_add_executor_job(smappee_mqtt.start_attempt)
178 return self.async_abort(reason=
"cannot_connect")
180 serial_number = await self.hass.async_add_executor_job(
181 smappee_mqtt.start_and_wait_for_config
183 await self.hass.async_add_executor_job(smappee_mqtt.stop)
184 if serial_number
is None:
185 return self.async_abort(reason=
"cannot_connect")
187 if serial_number
is None or not serial_number.startswith(
188 SUPPORTED_LOCAL_DEVICES
190 return self.async_abort(reason=
"invalid_mdns")
192 serial_number = serial_number.replace(
"Smappee",
"")
195 await self.async_set_unique_id(serial_number, raise_on_progress=
False)
196 self._abort_if_unique_id_configured()
198 return self.async_create_entry(
199 title=f
"{DOMAIN}{serial_number}",
200 data={CONF_IP_ADDRESS: ip_address, CONF_SERIALNUMBER: serial_number},
204 """Check if a CLOUD device has already been added."""
205 for entry
in self._async_current_entries():
206 if entry.unique_id
is not None and entry.unique_id == f
"{DOMAIN}Cloud":
logging.Logger logger(self)
ConfigFlowResult async_step_environment(self, dict[str, str]|None user_input=None)
def is_cloud_device_already_added(self)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
def async_oauth_create_entry(self, data)
ConfigFlowResult async_step_zeroconf(self, zeroconf.ZeroconfServiceInfo discovery_info)
ConfigFlowResult async_step_zeroconf_confirm(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_local(self, dict[str, str]|None user_input=None)