1 """Support for LG ThinQ Connect API."""
3 from __future__
import annotations
6 from datetime
import datetime
11 from thinqconnect
import (
21 from .const
import DEVICE_PUSH_MESSAGE, DEVICE_STATUS_MESSAGE
22 from .coordinator
import DeviceDataUpdateCoordinator
24 _LOGGER = logging.getLogger(__name__)
28 """A class that implements MQTT connection."""
35 coordinators: dict[str, DeviceDataUpdateCoordinator],
37 """Initialize a mqtt."""
42 self.
clientclient: ThinQMQTTClient |
None =
None
45 """Create a mqtt client and then try to connect."""
47 self.
clientclient = await ThinQMQTTClient(
50 if self.
clientclient
is None:
54 return await self.
clientclient.async_prepare_mqtt()
55 except (ThinQAPIException, TypeError, ValueError):
56 _LOGGER.exception(
"Failed to connect")
60 """Unregister client and disconnects handlers."""
63 if self.
clientclient
is not None:
66 except (ThinQAPIException, TypeError, ValueError):
67 _LOGGER.exception(
"Failed to disconnect")
70 self, results: list[dict | BaseException |
None]
72 """Check if there exists errors while performing tasks and then return count."""
76 isinstance(result, (TypeError, ValueError))
78 isinstance(result, ThinQAPIException)
79 and result.code != ThinQAPIErrorCodes.ALREADY_SUBSCRIBED_PUSH
85 """Update event subscribes."""
86 _LOGGER.debug(
"async_refresh_subscribe: now=%s", now)
89 self.
hasshass.async_create_task(
90 self.
thinq_apithinq_api.async_post_event_subscribe(coordinator.device_id)
92 for coordinator
in self.
coordinatorscoordinators.values()
95 results = await asyncio.gather(*tasks, return_exceptions=
True)
97 _LOGGER.error(
"Failed to refresh subscription on %s devices", count)
100 """Start push/event subscribes."""
101 _LOGGER.debug(
"async_start_subscribes")
103 if self.
clientclient
is None:
104 _LOGGER.error(
"Failed to start subscription: No client")
108 self.
hasshass.async_create_task(
109 self.
thinq_apithinq_api.async_post_push_subscribe(coordinator.device_id)
111 for coordinator
in self.
coordinatorscoordinators.values()
114 self.
hasshass.async_create_task(
115 self.
thinq_apithinq_api.async_post_event_subscribe(coordinator.device_id)
117 for coordinator
in self.
coordinatorscoordinators.values()
120 results = await asyncio.gather(*tasks, return_exceptions=
True)
122 _LOGGER.error(
"Failed to start subscription on %s devices", count)
127 """Start push/event unsubscribes."""
128 _LOGGER.debug(
"async_end_subscribes")
131 self.
hasshass.async_create_task(
132 self.
thinq_apithinq_api.async_delete_push_subscribe(coordinator.device_id)
134 for coordinator
in self.
coordinatorscoordinators.values()
137 self.
hasshass.async_create_task(
138 self.
thinq_apithinq_api.async_delete_event_subscribe(coordinator.device_id)
140 for coordinator
in self.
coordinatorscoordinators.values()
143 results = await asyncio.gather(*tasks, return_exceptions=
True)
145 _LOGGER.error(
"Failed to end subscription on %s devices", count)
156 """Handle the received message that matching the topic."""
157 decoded = payload.decode()
159 message = json.loads(decoded)
161 _LOGGER.error(
"Failed to parse message: payload=%s", decoded)
164 asyncio.run_coroutine_threadsafe(
169 """Handle received mqtt message."""
171 f
"{message["deviceId
"]}_{list(message["report
"].keys())[0]}"
172 if message[
"deviceType"] == DeviceType.WASHTOWER
173 else message[
"deviceId"]
176 if coordinator
is None:
177 _LOGGER.error(
"Failed to handle device event: No device")
181 "async_handle_device_event: %s, model:%s, message=%s",
182 coordinator.device_name,
183 coordinator.api.device.model_name,
186 push_type = message.get(
"pushType")
188 if push_type == DEVICE_STATUS_MESSAGE:
189 coordinator.handle_update_status(message.get(
"report", {}))
190 elif push_type == DEVICE_PUSH_MESSAGE:
191 coordinator.handle_notification_message(message.get(
"pushCode"))
None async_disconnect(self, Event|None event=None)
None on_message_received(self, str topic, bytes payload, bool dup, Any qos, bool retain, **dict kwargs)
None __init__(self, HomeAssistant hass, ThinQApi thinq_api, str client_id, dict[str, DeviceDataUpdateCoordinator] coordinators)
None async_end_subscribes(self)
None async_refresh_subscribe(self, datetime|None now=None)
None async_handle_device_event(self, dict message)
None async_start_subscribes(self)
int _get_failed_device_count(self, list[dict|BaseException|None] results)
web.Response get(self, web.Request request, str config_key)
def async_connect_mqtt(hass, component)