1 """Config flow for Tuya."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 from tuya_sharing
import LoginControl
9 import voluptuous
as vol
23 TUYA_RESPONSE_QR_CODE,
25 TUYA_RESPONSE_SUCCESS,
31 """Tuya config flow."""
37 """Initialize the config flow."""
41 self, user_input: dict[str, Any] |
None =
None
42 ) -> ConfigFlowResult:
47 if user_input
is not None:
49 user_input[CONF_USER_CODE]
54 errors[
"base"] =
"login_error"
56 TUYA_RESPONSE_MSG: response.get(TUYA_RESPONSE_MSG,
"Unknown error"),
57 TUYA_RESPONSE_CODE: response.get(TUYA_RESPONSE_CODE,
"0"),
64 data_schema=vol.Schema(
67 CONF_USER_CODE, default=user_input.get(CONF_USER_CODE,
"")
72 description_placeholders=placeholders,
76 self, user_input: dict[str, Any] |
None =
None
77 ) -> ConfigFlowResult:
79 if user_input
is None:
82 data_schema=vol.Schema(
84 vol.Optional(
"QR"): selector.QrCodeSelector(
85 config=selector.QrCodeSelectorConfig(
86 data=f
"tuyaSmart--qrLogin?token={self.__qr_code}",
88 error_correction_level=selector.QrErrorCorrectionLevel.QUARTILE,
95 ret, info = await self.hass.async_add_executor_job(
106 errors={
"base":
"login_error"},
107 data_schema=vol.Schema(
109 vol.Optional(
"QR"): selector.QrCodeSelector(
110 config=selector.QrCodeSelectorConfig(
111 data=f
"tuyaSmart--qrLogin?token={self.__qr_code}",
113 error_correction_level=selector.QrErrorCorrectionLevel.QUARTILE,
118 description_placeholders={
119 TUYA_RESPONSE_MSG: info.get(TUYA_RESPONSE_MSG,
"Unknown error"),
120 TUYA_RESPONSE_CODE: info.get(TUYA_RESPONSE_CODE, 0),
129 "expire_time": info[
"expire_time"],
130 "access_token": info[
"access_token"],
131 "refresh_token": info[
"refresh_token"],
133 CONF_TERMINAL_ID: info[CONF_TERMINAL_ID],
134 CONF_ENDPOINT: info[CONF_ENDPOINT],
144 title=info.get(
"username"),
149 self, entry_data: Mapping[str, Any]
150 ) -> ConfigFlowResult:
151 """Handle initiation of re-authentication with Tuya."""
152 if CONF_USER_CODE
in entry_data:
160 self, user_input: dict[str, Any] |
None =
None
161 ) -> ConfigFlowResult:
162 """Handle re-authentication with a Tuya."""
166 if user_input
is not None:
168 user_input[CONF_USER_CODE]
173 errors[
"base"] =
"login_error"
175 TUYA_RESPONSE_MSG: response.get(TUYA_RESPONSE_MSG,
"Unknown error"),
176 TUYA_RESPONSE_CODE: response.get(TUYA_RESPONSE_CODE,
"0"),
182 step_id=
"reauth_user_code",
183 data_schema=vol.Schema(
186 CONF_USER_CODE, default=user_input.get(CONF_USER_CODE,
"")
191 description_placeholders=placeholders,
195 """Get the QR code."""
196 response = await self.hass.async_add_executor_job(
202 if success := response.get(TUYA_RESPONSE_SUCCESS,
False):
204 self.
__qr_code__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
205 return success, response
ConfigFlowResult async_step_reauth_user_code(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_scan(self, dict[str, Any]|None user_input=None)
tuple[bool, dict[str, Any]] __async_get_qr_code(self, str user_code)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigEntry _get_reauth_entry(self)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)