1 """Config flow for Tasmota."""
3 from __future__
import annotations
7 import voluptuous
as vol
13 from .const
import CONF_DISCOVERY_PREFIX, DEFAULT_PREFIX, DOMAIN
17 """Handle a config flow."""
22 """Initialize flow."""
26 self, discovery_info: MqttServiceInfo
27 ) -> ConfigFlowResult:
28 """Handle a flow initialized by MQTT discovery."""
35 if not discovery_info.topic.endswith(
"/config"):
38 if not discovery_info.payload:
43 assert discovery_info.subscribed_topic ==
"tasmota/discovery/#"
44 self.
_prefix_prefix =
"tasmota/discovery"
49 self, user_input: dict[str, Any] |
None =
None
50 ) -> ConfigFlowResult:
51 """Handle a flow initialized by the user."""
60 self, user_input: dict[str, Any] |
None =
None
61 ) -> ConfigFlowResult:
62 """Confirm the setup."""
64 data = {CONF_DISCOVERY_PREFIX: self.
_prefix_prefix}
66 if user_input
is not None:
68 prefix = user_input[CONF_DISCOVERY_PREFIX]
69 if prefix.endswith(
"/#"):
74 errors[
"base"] =
"invalid_discovery_topic"
77 data[CONF_DISCOVERY_PREFIX] = prefix
82 fields[vol.Optional(CONF_DISCOVERY_PREFIX, default=self.
_prefix_prefix)] = str
85 step_id=
"config", data_schema=vol.Schema(fields), errors=errors
89 self, user_input: dict[str, Any] |
None =
None
90 ) -> ConfigFlowResult:
91 """Confirm the setup."""
93 data = {CONF_DISCOVERY_PREFIX: self.
_prefix_prefix}
95 if user_input
is not None:
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_mqtt(self, MqttServiceInfo discovery_info)
ConfigFlowResult async_step_config(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_confirm(self, dict[str, Any]|None user_input=None)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
list[ConfigFlowResult] _async_in_progress(self, bool include_uninitialized=False, dict[str, Any]|None match_context=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
bool show_advanced_options(self)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
str valid_subscribe_topic(Any topic)