1 """Config flow for MySensors."""
3 from __future__
import annotations
8 from awesomeversion
import (
10 AwesomeVersionStrategy,
11 AwesomeVersionStrategyException,
13 import voluptuous
as vol
16 DOMAIN
as MQTT_DOMAIN,
18 valid_subscribe_topic,
30 CONF_GATEWAY_TYPE_MQTT,
31 CONF_GATEWAY_TYPE_SERIAL,
32 CONF_GATEWAY_TYPE_TCP,
33 CONF_PERSISTENCE_FILE,
37 CONF_TOPIC_OUT_PREFIX,
42 from .gateway
import MQTT_COMPONENT, is_serial_port, is_socket_address, try_connect
44 DEFAULT_BAUD_RATE = 115200
45 DEFAULT_TCP_PORT = 5003
46 DEFAULT_VERSION =
"1.4"
48 _PORT_SELECTOR = vol.All(
49 selector.NumberSelector(
50 selector.NumberSelectorConfig(
51 min=1, max=65535, mode=selector.NumberSelectorMode.BOX
59 """Validate that persistence file path ends in either .pickle or .json."""
60 if value.endswith((
".json",
".pickle")):
62 raise vol.Invalid(f
"{value} does not end in either `.json` or `.pickle`")
66 """Create a schema with options common to all gateway types."""
71 "suggested_value": user_input.get(CONF_VERSION, DEFAULT_VERSION)
74 vol.Optional(CONF_PERSISTENCE_FILE): str,
79 """Validate a version string from the user."""
85 AwesomeVersionStrategy.SIMPLEVER,
86 AwesomeVersionStrategy.SEMVER,
89 except AwesomeVersionStrategyException:
94 return {CONF_VERSION:
"invalid_version"}
98 gw_type: ConfGatewayType, user_input: dict[str, Any], entry: ConfigEntry
100 """Check if another ConfigDevice is actually the same as user_input.
102 This function only compares addresses and tcp ports, so it is possible to fool it with tricks like port forwarding.
104 if entry.data[CONF_DEVICE] != user_input[CONF_DEVICE]:
106 if gw_type == CONF_GATEWAY_TYPE_TCP:
107 entry_tcp_port: int = entry.data[CONF_TCP_PORT]
108 input_tcp_port: int = user_input[CONF_TCP_PORT]
109 return entry_tcp_port == input_tcp_port
110 if gw_type == CONF_GATEWAY_TYPE_MQTT:
112 entry.data[CONF_TOPIC_IN_PREFIX],
113 entry.data[CONF_TOPIC_OUT_PREFIX],
116 user_input.get(CONF_TOPIC_IN_PREFIX)
in entry_topics
117 or user_input.get(CONF_TOPIC_OUT_PREFIX)
in entry_topics
123 """Handle a config flow."""
126 """Set up config flow."""
127 self.
_gw_type_gw_type: str |
None =
None
130 self, user_input: dict[str, str] |
None =
None
131 ) -> ConfigFlowResult:
132 """Create a config entry from frontend user input."""
136 self, user_input: dict[str, str] |
None =
None
137 ) -> ConfigFlowResult:
138 """Show the select gateway type menu."""
140 step_id=
"select_gateway_type",
141 menu_options=[
"gw_serial",
"gw_tcp",
"gw_mqtt"],
145 self, user_input: dict[str, Any] |
None =
None
146 ) -> ConfigFlowResult:
147 """Create config entry for a serial gateway."""
148 gw_type = self.
_gw_type_gw_type = CONF_GATEWAY_TYPE_SERIAL
149 errors: dict[str, str] = {}
151 if user_input
is not None:
152 errors.update(await self.
validate_commonvalidate_common(gw_type, errors, user_input))
156 user_input = user_input
or {}
157 schema: VolDictType = {
159 CONF_DEVICE, default=user_input.get(CONF_DEVICE,
"/dev/ttyACM0")
163 default=user_input.get(CONF_BAUD_RATE, DEFAULT_BAUD_RATE),
169 step_id=
"gw_serial", data_schema=vol.Schema(schema), errors=errors
173 self, user_input: dict[str, Any] |
None =
None
174 ) -> ConfigFlowResult:
175 """Create a config entry for a tcp gateway."""
176 gw_type = self.
_gw_type_gw_type = CONF_GATEWAY_TYPE_TCP
177 errors: dict[str, str] = {}
179 if user_input
is not None:
180 errors.update(await self.
validate_commonvalidate_common(gw_type, errors, user_input))
184 user_input = user_input
or {}
185 schema: VolDictType = {
187 CONF_DEVICE, default=user_input.get(CONF_DEVICE,
"127.0.0.1")
190 CONF_TCP_PORT, default=user_input.get(CONF_TCP_PORT, DEFAULT_TCP_PORT)
196 step_id=
"gw_tcp", data_schema=vol.Schema(schema), errors=errors
201 if topic == other_config.data.get(
203 )
or topic == other_config.data.get(CONF_TOPIC_OUT_PREFIX):
208 self, user_input: dict[str, Any] |
None =
None
209 ) -> ConfigFlowResult:
210 """Create a config entry for a mqtt gateway."""
212 if MQTT_DOMAIN
not in self.hass.config.components:
215 gw_type = self.
_gw_type_gw_type = CONF_GATEWAY_TYPE_MQTT
216 errors: dict[str, str] = {}
218 if user_input
is not None:
219 user_input[CONF_DEVICE] = MQTT_COMPONENT
224 errors[CONF_TOPIC_IN_PREFIX] =
"invalid_subscribe_topic"
227 errors[CONF_TOPIC_IN_PREFIX] =
"duplicate_topic"
232 errors[CONF_TOPIC_OUT_PREFIX] =
"invalid_publish_topic"
235 user_input[CONF_TOPIC_IN_PREFIX]
236 == user_input[CONF_TOPIC_OUT_PREFIX]
238 errors[CONF_TOPIC_OUT_PREFIX] =
"same_topic"
240 errors[CONF_TOPIC_OUT_PREFIX] =
"duplicate_topic"
242 errors.update(await self.
validate_commonvalidate_common(gw_type, errors, user_input))
246 user_input = user_input
or {}
247 schema: VolDictType = {
249 CONF_TOPIC_IN_PREFIX, default=user_input.get(CONF_TOPIC_IN_PREFIX,
"")
252 CONF_TOPIC_OUT_PREFIX, default=user_input.get(CONF_TOPIC_OUT_PREFIX,
"")
254 vol.Required(CONF_RETAIN, default=user_input.get(CONF_RETAIN,
True)): bool,
259 step_id=
"gw_mqtt", data_schema=vol.Schema(schema), errors=errors
264 """Create the config entry."""
266 title=f
"{user_input[CONF_DEVICE]}",
267 data={**user_input, CONF_GATEWAY_TYPE: self.
_gw_type_gw_type},
271 return os.path.realpath(os.path.normcase(self.hass.config.path(path)))
275 gw_type: ConfGatewayType,
276 errors: dict[str, str],
277 user_input: dict[str, Any],
279 """Validate parameters common to all gateway types."""
282 if gw_type != CONF_GATEWAY_TYPE_MQTT:
283 if gw_type == CONF_GATEWAY_TYPE_TCP:
284 verification_func = is_socket_address
286 verification_func = is_serial_port
289 await self.hass.async_add_executor_job(
290 verification_func, user_input[CONF_DEVICE]
293 errors[CONF_DEVICE] = (
295 if gw_type == CONF_GATEWAY_TYPE_TCP
296 else "invalid_serial"
298 if CONF_PERSISTENCE_FILE
in user_input:
302 errors[CONF_PERSISTENCE_FILE] =
"invalid_persistence_file"
304 real_persistence_path = user_input[CONF_PERSISTENCE_FILE] = (
308 if CONF_PERSISTENCE_FILE
not in other_entry.data:
311 other_entry.data[CONF_PERSISTENCE_FILE]
313 errors[CONF_PERSISTENCE_FILE] =
"duplicate_persistence_file"
319 errors[
"base"] =
"already_configured"
323 if not errors
and not await
try_connect(self.hass, gw_type, user_input):
324 errors[
"base"] =
"cannot_connect"
ConfigFlowResult async_step_gw_tcp(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_select_gateway_type(self, dict[str, str]|None user_input=None)
ConfigFlowResult _async_create_entry(self, dict[str, Any] user_input)
str _normalize_persistence_file(self, str path)
ConfigFlowResult async_step_gw_serial(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, str]|None user_input=None)
ConfigFlowResult async_step_gw_mqtt(self, dict[str, Any]|None user_input=None)
bool _check_topic_exists(self, str topic)
dict[str, str] validate_common(self, ConfGatewayType gw_type, dict[str, str] errors, dict[str, Any] user_input)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_menu(self, *str|None step_id=None, Container[str] menu_options, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
str valid_subscribe_topic(Any topic)
str valid_publish_topic(Any topic)
dict[str, str] _validate_version(str version)
str is_persistence_file(str value)
dict _get_schema_common(dict[str, str] user_input)
bool _is_same_device(ConfGatewayType gw_type, dict[str, Any] user_input, ConfigEntry entry)
bool try_connect(HomeAssistant hass, ConfGatewayType gateway_type, dict[str, Any] user_input)