1 """Config flow for Panasonic Viera TV integration."""
3 from functools
import partial
6 from urllib.error
import URLError
8 from panasonic_viera
import TV_TYPE_ENCRYPTED, RemoteControl, SOAPError
9 import voluptuous
as vol
24 ERROR_INVALID_PIN_CODE,
27 _LOGGER = logging.getLogger(__name__)
31 """Config flow for Panasonic Viera."""
36 """Initialize the Panasonic Viera config flow."""
37 self.
_data_data: dict[str, Any] = {
42 ATTR_DEVICE_INFO:
None,
45 self.
_remote_remote: RemoteControl |
None =
None
48 self, user_input: dict[str, Any] |
None =
None
49 ) -> ConfigFlowResult:
50 """Handle the initial step."""
51 errors: dict[str, str] = {}
53 if user_input
is not None:
56 self.
_remote_remote = await self.hass.async_add_executor_job(
57 partial(RemoteControl, self.
_data_data[CONF_HOST], self.
_data_data[CONF_PORT])
59 assert self.
_remote_remote
is not None
60 self.
_data_data[ATTR_DEVICE_INFO] = await self.hass.async_add_executor_job(
61 self.
_remote_remote.get_device_info
63 except (URLError, SOAPError, OSError)
as err:
64 _LOGGER.error(
"Could not establish remote connection: %s", err)
65 errors[
"base"] =
"cannot_connect"
67 _LOGGER.exception(
"An unknown error occurred")
73 if self.
_data_data[CONF_NAME] == DEFAULT_NAME:
74 self.
_data_data[CONF_NAME] = self.
_data_data[ATTR_DEVICE_INFO][
78 if self.
_remote_remote.type == TV_TYPE_ENCRYPTED:
82 title=self.
_data_data[CONF_NAME],
88 data_schema=vol.Schema(
92 default=self.
_data_data[CONF_HOST]
93 if self.
_data_data[CONF_HOST]
is not None
98 default=self.
_data_data[CONF_NAME]
99 if self.
_data_data[CONF_NAME]
is not None
108 self, user_input: dict[str, Any] |
None =
None
109 ) -> ConfigFlowResult:
110 """Handle the pairing step."""
111 errors: dict[str, str] = {}
112 assert self.
_remote_remote
is not None
114 if user_input
is not None:
115 pin = user_input[CONF_PIN]
117 await self.hass.async_add_executor_job(
118 partial(self.
_remote_remote.authorize_pin_code, pincode=pin)
120 except SOAPError
as err:
121 _LOGGER.error(
"Invalid PIN code: %s", err)
122 errors[
"base"] = ERROR_INVALID_PIN_CODE
123 except (URLError, OSError)
as err:
124 _LOGGER.error(
"The remote connection was lost: %s", err)
127 _LOGGER.exception(
"Unknown error")
130 if "base" not in errors:
132 CONF_APP_ID: self.
_remote_remote.app_id,
133 CONF_ENCRYPTION_KEY: self.
_remote_remote.enc_key,
139 title=self.
_data_data[CONF_NAME],
140 data=self.
_data_data,
144 await self.hass.async_add_executor_job(
145 partial(self.
_remote_remote.request_pin_code, name=
"Home Assistant")
147 except (URLError, SOAPError, OSError)
as err:
148 _LOGGER.error(
"The remote connection was lost: %s", err)
151 _LOGGER.exception(
"Unknown error")
156 data_schema=vol.Schema({vol.Required(CONF_PIN): str}),
161 """Import a config entry from configuration.yaml."""
166 self.
_data_data = config
168 self.
_data_data[CONF_PORT] = self.
_data_data.
get(CONF_PORT, DEFAULT_PORT)
169 self.
_data_data[CONF_ON_ACTION] = self.
_data_data.
get(CONF_ON_ACTION)
ConfigFlowResult async_step_pairing(self, dict[str, Any]|None user_input=None)
None async_load_data(self, dict[str, Any] config)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
web.Response get(self, web.Request request, str config_key)