1 """Config flow to configure the LG Soundbar integration."""
4 from queue
import Empty, Full, Queue
7 import voluptuous
as vol
12 from .const
import DEFAULT_PORT, DOMAIN
15 vol.Required(CONF_HOST): str,
18 _LOGGER = logging.getLogger(__name__)
24 """LG Soundbar config flow test_connect."""
25 uuid_q = Queue(maxsize=1)
26 name_q = Queue(maxsize=1)
28 def check_msg_response(response, msgs, attr):
30 if msg == msgs
or msg
in msgs:
31 if "data" in response
and attr
in response[
"data"]:
34 "[%s] msg did not contain expected attr [%s]: %s", msg, attr, response
38 def queue_add(attr_q, data):
40 attr_q.put_nowait(data)
42 _LOGGER.debug(
"attempted to add [%s] to full queue", data)
def msg_callback(response):
    """Route a soundbar response into the uuid and name queues.

    Each route names the message type(s) to accept, the field expected
    under ``response["data"]``, and the queue that receives that field.
    Every route is evaluated for every response, in order.
    """
    routes = (
        (["MAC_INFO_DEV", "PRODUCT_INFO"], "s_uuid", uuid_q),
        ("SPK_LIST_VIEW_INFO", "s_user_name", name_q),
    )
    for expected_msgs, field, target_q in routes:
        # check_msg_response validates both the message type and that the
        # field is present, so the lookup below is only made when it passes.
        if check_msg_response(response, expected_msgs, field):
            queue_add(target_q, response["data"][field])
53 connection = temescal.temescal(host, port=port, callback=msg_callback)
55 connection.get_mac_info()
57 connection.get_product_info()
58 details[
"name"] = name_q.get(timeout=QUEUE_TIMEOUT)
59 details[
"uuid"] = uuid_q.get(timeout=QUEUE_TIMEOUT)
62 except TimeoutError
as err:
63 raise ConnectionError(f
"Connection timeout with server: {host}:{port}")
from err
64 except OSError
as err:
65 raise ConnectionError(f
"Cannot resolve hostname: {host}")
from err
71 """LG Soundbar config flow."""
76 """Handle a flow initiated by the user."""
77 if user_input
is None:
82 details = await self.hass.async_add_executor_job(
83 test_connect, user_input[CONF_HOST], DEFAULT_PORT
85 except ConnectionError:
86 errors[
"base"] =
"cannot_connect"
90 CONF_HOST: user_input[CONF_HOST],
91 CONF_PORT: DEFAULT_PORT,
94 unique_id = details[
"uuid"]
100 errors[
"base"] =
"no_data"
105 """Show the form to the user."""
108 data_schema=vol.Schema(DATA_SCHEMA),
109 errors=errors
if errors
else {},
def _show_form(self, errors=None)
ConfigFlowResult async_step_user(self, user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
def test_connect(host, port)