1 """Config flow for Samsung TV."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
6 from functools
import partial
8 from typing
import Any, Self
9 from urllib.parse
import urlparse
12 from samsungtvws.encrypted.authenticator
import SamsungTVEncryptedWSAsyncAuthenticator
13 import voluptuous
as vol
36 from .bridge
import SamsungTVBridge, async_get_device_info, mac_from_device_info
40 CONF_SSDP_MAIN_TV_AGENT_LOCATION,
41 CONF_SSDP_RENDERING_CONTROL_LOCATION,
45 METHOD_ENCRYPTED_WEBSOCKET,
48 RESULT_CANNOT_CONNECT,
54 UPNP_SVC_MAIN_TV_AGENT,
55 UPNP_SVC_RENDERING_CONTROL,
58 DATA_SCHEMA = vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_NAME): str})
62 return udn[5:]
if udn.startswith(
"uuid:")
else udn
67 ssdp_rendering_control_location: str |
None,
68 ssdp_main_tv_agent_location: str |
None,
70 """Return True if the config entry information is complete.
72 If we do not have an ssdp location we consider it complete
73 as some TVs will not support SSDP/UPNP
77 and entry.data.get(CONF_MAC)
79 not ssdp_rendering_control_location
80 or entry.data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION)
83 not ssdp_main_tv_agent_location
84 or entry.data.get(CONF_SSDP_MAIN_TV_AGENT_LOCATION)
90 current_unformatted_mac: str, formatted_mac: str
92 """Check if two macs are the same but formatted incorrectly."""
93 current_formatted_mac =
format_mac(current_unformatted_mac)
95 current_formatted_mac == formatted_mac
96 and current_unformatted_mac != current_formatted_mac
101 """Handle a Samsung TV config flow."""
107 """Initialize flow."""
108 self.
_host_host: str =
""
109 self.
_mac_mac: str |
None =
None
110 self.
_udn_udn: str |
None =
None
111 self.
_upnp_udn_upnp_udn: str |
None =
None
115 self.
_model_model: str |
None =
None
117 self.
_method_method: str |
None =
None
118 self.
_name_name: str |
None =
None
119 self.
_title_title: str =
""
120 self._id: int |
None =
None
121 self.
_bridge_bridge: SamsungTVBridge |
None =
None
122 self.
_device_info_device_info: dict[str, Any] |
None =
None
123 self.
_authenticator_authenticator: SamsungTVEncryptedWSAsyncAuthenticator |
None =
None
126 """Generate the base config entry without the method."""
127 assert self.
_bridge_bridge
is not None
129 CONF_HOST: self.
_host_host,
130 CONF_MAC: self.
_mac_mac,
131 CONF_MANUFACTURER: self.
_manufacturer_manufacturer
or DEFAULT_MANUFACTURER,
132 CONF_METHOD: self.
_bridge_bridge.method,
133 CONF_MODEL: self.
_model_model,
134 CONF_NAME: self.
_name_name,
135 CONF_PORT: self.
_bridge_bridge.port,
141 """Get device entry."""
145 data[CONF_TOKEN] = self.
_bridge_bridge.token
152 """Set device unique_id."""
159 self, raise_on_progress: bool =
True
161 """Set the unique id from the udn."""
162 assert self.
_host_host
is not None
176 if raise_on_progress:
180 """Abort and update host and mac if we have it."""
181 updates = {CONF_HOST: self.
_host_host}
183 updates[CONF_MAC] = self.
_mac_mac
185 updates[CONF_MODEL] = self.
_model_model
187 updates[CONF_SSDP_RENDERING_CONTROL_LOCATION] = (
191 updates[CONF_SSDP_MAIN_TV_AGENT_LOCATION] = (
197 """Create the bridge."""
199 if result
not in SUCCESSFUL_RESULTS:
200 LOGGER.debug(
"No working config found for %s", self.
_host_host)
202 assert method
is not None
203 self.
_bridge_bridge = SamsungTVBridge.get_bridge(self.hass, method, self.
_host_host)
207 ) -> tuple[str, str | None, dict[str, Any] | None]:
208 """Get device info and method only once."""
215 LOGGER.debug(
"Host:%s did not return device info", self.
_host_host)
216 return result,
None,
None
220 """Try to get the device info."""
222 if result
not in SUCCESSFUL_RESULTS:
226 dev_info = info.get(
"device", {})
227 assert dev_info
is not None
228 if (device_type := dev_info.get(
"type")) !=
"Samsung SmartTV":
230 "Host:%s has type: %s which is not supported", self.
_host_host, device_type
233 self.
_model_model = dev_info.get(
"modelName")
234 name = dev_info.get(
"name")
235 self.
_name_name = name.replace(
"[TV] ",
"")
if name
else device_type
236 self.
_title_title = f
"{self._name} ({self._model})"
243 elif mac := await self.hass.async_add_executor_job(
244 partial(getmac.get_mac_address, ip=self.
_host_host)
251 self.
_host_host = await self.hass.async_add_executor_job(
252 socket.gethostbyname, user_input[CONF_HOST]
254 except socket.gaierror
as err:
255 raise AbortFlow(RESULT_UNKNOWN_HOST)
from err
256 self.
_name_name = user_input.get(CONF_NAME, self.
_host_host)
or ""
260 self, user_input: dict[str, Any] |
None =
None
261 ) -> ConfigFlowResult:
262 """Handle a flow initialized by the user."""
263 if user_input
is not None:
268 if self.
_bridge_bridge.method != METHOD_LEGACY:
271 if self.
_bridge_bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
278 self, user_input: dict[str, Any] |
None =
None
279 ) -> ConfigFlowResult:
280 """Handle a pairing by accepting the message on the TV."""
281 assert self.
_bridge_bridge
is not None
282 errors: dict[str, str] = {}
283 if user_input
is not None:
285 if result == RESULT_SUCCESS:
287 if result != RESULT_AUTH_MISSING:
289 errors = {
"base": RESULT_AUTH_MISSING}
291 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
295 description_placeholders={
"device": self.
_title_title},
296 data_schema=vol.Schema({}),
300 self, user_input: dict[str, Any] |
None =
None
301 ) -> ConfigFlowResult:
302 """Handle a encrypted pairing."""
303 assert self.
_host_host
is not None
306 errors: dict[str, str] = {}
308 if user_input
is not None:
310 (pin := user_input.get(
"pin"))
311 and (token := await self.
_authenticator_authenticator.try_pin(pin))
312 and (session_id := await self.
_authenticator_authenticator.get_session_id_and_close())
318 CONF_SESSION_ID: session_id,
322 errors = {
"base": RESULT_INVALID_PIN}
324 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
326 step_id=
"encrypted_pairing",
328 description_placeholders={
"device": self.
_title_title},
329 data_schema=vol.Schema({vol.Required(
"pin"): str}),
335 ) -> tuple[ConfigEntry | None, bool]:
336 """Get first existing matching entry (prefer unique id)."""
337 matching_host_entry: ConfigEntry |
None =
None
339 if (self.
_mac_mac
and self.
_mac_mac == entry.data.get(CONF_MAC))
or (
342 LOGGER.debug(
"Found entry matching unique_id for %s", self.
_host_host)
345 if entry.data[CONF_HOST] == self.
_host_host:
346 LOGGER.debug(
"Found entry matching host for %s", self.
_host_host)
347 matching_host_entry = entry
349 return matching_host_entry,
False
354 ) -> ConfigEntry | None:
355 """Check existing entries and update them.
357 Returns the existing entry if it was updated.
362 entry_kw_args: dict = {}
364 entry.unique_id
is None
365 or (is_unique_match
and self.
unique_idunique_id != entry.unique_id)
367 entry_kw_args[
"unique_id"] = self.
unique_idunique_id
368 data: dict[str, Any] =
dict(entry.data)
369 update_ssdp_rendering_control_location = (
371 and data.get(CONF_SSDP_RENDERING_CONTROL_LOCATION)
374 update_ssdp_main_tv_agent_location = (
376 and data.get(CONF_SSDP_MAIN_TV_AGENT_LOCATION)
379 update_mac = self.
_mac_mac
and (
380 not (data_mac := data.get(CONF_MAC))
383 update_model = self.
_model_model
and not data.get(CONF_MODEL)
385 update_ssdp_rendering_control_location
386 or update_ssdp_main_tv_agent_location
390 if update_ssdp_rendering_control_location:
391 data[CONF_SSDP_RENDERING_CONTROL_LOCATION] = (
394 if update_ssdp_main_tv_agent_location:
395 data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] = (
399 data[CONF_MAC] = self.
_mac_mac
401 data[CONF_MODEL] = self.
_model_model
402 entry_kw_args[
"data"] = data
403 if not entry_kw_args:
405 LOGGER.debug(
"Updating existing config entry with %s", entry_kw_args)
406 self.hass.config_entries.async_update_entry(entry, **entry_kw_args)
407 if entry.state != ConfigEntryState.LOADED:
410 self.hass.async_create_task(
411 self.hass.config_entries.async_reload(entry.entry_id)
417 """Start discovery."""
418 assert self.
_host_host
is not None
427 if self.hass.config_entries.flow.async_has_matching_flow(self):
431 """Return True if other_flow is matching this flow."""
432 return other_flow._host == self.
_host_host
442 self, discovery_info: ssdp.SsdpServiceInfo
443 ) -> ConfigFlowResult:
444 """Handle a flow initialized by ssdp discovery."""
445 LOGGER.debug(
"Samsung device found via SSDP: %s", discovery_info)
446 model_name: str = discovery_info.upnp.get(ssdp.ATTR_UPNP_MODEL_NAME)
or ""
447 if discovery_info.ssdp_st == UPNP_SVC_RENDERING_CONTROL:
450 "Set SSDP RenderingControl location to: %s",
453 elif discovery_info.ssdp_st == UPNP_SVC_MAIN_TV_AGENT:
456 "Set SSDP MainTvAgent location to: %s",
460 discovery_info.upnp[ssdp.ATTR_UPNP_UDN]
462 if hostname := urlparse(discovery_info.ssdp_location
or "").hostname:
463 self.
_host_host = hostname
464 self.
_manufacturer_manufacturer = discovery_info.upnp.get(ssdp.ATTR_UPNP_MANUFACTURER)
478 if self.
_method_method == METHOD_LEGACY
and discovery_info.ssdp_st
in (
479 UPNP_SVC_RENDERING_CONTROL,
480 UPNP_SVC_MAIN_TV_AGENT,
485 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
489 self, discovery_info: dhcp.DhcpServiceInfo
490 ) -> ConfigFlowResult:
491 """Handle a flow initialized by dhcp discovery."""
492 LOGGER.debug(
"Samsung device found via DHCP: %s", discovery_info)
494 self.
_host_host = discovery_info.ip
497 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
501 self, discovery_info: zeroconf.ZeroconfServiceInfo
502 ) -> ConfigFlowResult:
503 """Handle a flow initialized by zeroconf discovery."""
504 LOGGER.debug(
"Samsung device found via ZEROCONF: %s", discovery_info)
506 self.
_host_host = discovery_info.host
509 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
513 self, user_input: dict[str, Any] |
None =
None
514 ) -> ConfigFlowResult:
515 """Handle user-confirmation of discovered node."""
516 if user_input
is not None:
519 if self.
_bridge_bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
524 step_id=
"confirm", description_placeholders={
"device": self.
_title_title}
528 self, entry_data: Mapping[str, Any]
529 ) -> ConfigFlowResult:
530 """Handle configuration by re-auth."""
531 if entry_data.get(CONF_MODEL)
and entry_data.get(CONF_NAME):
532 self.
_title_title = f
"{entry_data[CONF_NAME]} ({entry_data[CONF_MODEL]})"
534 self.
_title_title = entry_data.get(CONF_NAME)
or entry_data[CONF_HOST]
538 self, user_input: dict[str, Any] |
None =
None
539 ) -> ConfigFlowResult:
540 """Confirm reauth."""
544 method = reauth_entry.data[CONF_METHOD]
545 if user_input
is not None:
546 if method == METHOD_ENCRYPTED_WEBSOCKET:
548 bridge = SamsungTVBridge.get_bridge(
551 reauth_entry.data[CONF_HOST],
553 result = await bridge.async_try_connect()
554 if result == RESULT_SUCCESS:
555 new_data =
dict(reauth_entry.data)
556 new_data[CONF_TOKEN] = bridge.token
561 if result
not in (RESULT_AUTH_MISSING, RESULT_CANNOT_CONNECT):
565 errors = {
"base": RESULT_AUTH_MISSING}
567 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
569 step_id=
"reauth_confirm",
571 description_placeholders={
"device": self.
_title_title},
583 self, user_input: dict[str, Any] |
None =
None
584 ) -> ConfigFlowResult:
585 """Confirm reauth (encrypted method)."""
592 if user_input
is not None:
594 (pin := user_input.get(
"pin"))
595 and (token := await self.
_authenticator_authenticator.try_pin(pin))
596 and (session_id := await self.
_authenticator_authenticator.get_session_id_and_close())
602 CONF_SESSION_ID: session_id,
606 errors = {
"base": RESULT_INVALID_PIN}
608 self.context[
"title_placeholders"] = {
"device": self.
_title_title}
610 step_id=
"reauth_confirm_encrypted",
612 description_placeholders={
"device": self.
_title_title},
613 data_schema=vol.Schema({vol.Required(
"pin"): str}),
None _async_start_encrypted_pairing(self, str host)
None _async_set_device_unique_id(self, bool raise_on_progress=True)
None _async_set_name_host_from_input(self, dict[str, Any] user_input)
bool _async_get_and_check_device_info(self)
ConfigFlowResult async_step_pairing(self, dict[str, Any]|None user_input=None)
None _async_create_bridge(self)
ConfigEntry|None _async_update_existing_matching_entry(self)
ConfigFlowResult async_step_reauth_confirm_encrypted(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_dhcp(self, dhcp.DhcpServiceInfo discovery_info)
None _abort_if_manufacturer_is_not_samsung(self)
ConfigFlowResult async_step_ssdp(self, ssdp.SsdpServiceInfo discovery_info)
ConfigFlowResult async_step_zeroconf(self, zeroconf.ZeroconfServiceInfo discovery_info)
ConfigFlowResult async_step_encrypted_pairing(self, dict[str, Any]|None user_input=None)
None _async_update_and_abort_for_matching_unique_id(self)
tuple[str, str|None, dict[str, Any]|None] _async_get_device_info_and_method(self)
tuple[ConfigEntry|None, bool] _async_get_existing_matching_entry(self)
None _async_start_discovery_with_mac_address(self)
bool is_matching(self, Self other_flow)
ConfigFlowResult _get_entry_from_bridge(self)
_ssdp_main_tv_agent_location
None _async_set_unique_id_from_udn(self, bool raise_on_progress=True)
_ssdp_rendering_control_location
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_confirm(self, dict[str, Any]|None user_input=None)
None _async_abort_if_host_already_in_progress(self)
dict[str, Any] _base_config_entry(self)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry _get_reauth_entry(self)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
Device async_try_connect(str ip_address)
tuple[str, int|None, str|None, dict[str, Any]|None] async_get_device_info(HomeAssistant hass, str host)
str|None mac_from_device_info(dict[str, Any] info)
bool _mac_is_same_with_incorrect_formatting(str current_unformatted_mac, str formatted_mac)
bool _entry_is_complete(ConfigEntry entry, str|None ssdp_rendering_control_location, str|None ssdp_main_tv_agent_location)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)