Home Assistant Unofficial Reference 2024.12.1
mqtt.py
Go to the documentation of this file.
1 """Support for LG ThinQ Connect API."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import datetime
7 import json
8 import logging
9 from typing import Any
10 
11 from thinqconnect import (
12  DeviceType,
13  ThinQApi,
14  ThinQAPIErrorCodes,
15  ThinQAPIException,
16  ThinQMQTTClient,
17 )
18 
19 from homeassistant.core import Event, HomeAssistant
20 
21 from .const import DEVICE_PUSH_MESSAGE, DEVICE_STATUS_MESSAGE
22 from .coordinator import DeviceDataUpdateCoordinator
23 
24 _LOGGER = logging.getLogger(__name__)
25 
26 
27 class ThinQMQTT:
28  """A class that implements MQTT connection."""
29 
30  def __init__(
31  self,
32  hass: HomeAssistant,
33  thinq_api: ThinQApi,
34  client_id: str,
35  coordinators: dict[str, DeviceDataUpdateCoordinator],
36  ) -> None:
37  """Initialize a mqtt."""
38  self.hasshass = hass
39  self.thinq_apithinq_api = thinq_api
40  self.client_idclient_id = client_id
41  self.coordinatorscoordinators = coordinators
42  self.clientclient: ThinQMQTTClient | None = None
43 
44  async def async_connect(self) -> bool:
45  """Create a mqtt client and then try to connect."""
46  try:
47  self.clientclient = await ThinQMQTTClient(
48  self.thinq_apithinq_api, self.client_idclient_id, self.on_message_receivedon_message_received
49  )
50  if self.clientclient is None:
51  return False
52 
53  # Connect to server and create certificate.
54  return await self.clientclient.async_prepare_mqtt()
55  except (ThinQAPIException, TypeError, ValueError):
56  _LOGGER.exception("Failed to connect")
57  return False
58 
59  async def async_disconnect(self, event: Event | None = None) -> None:
60  """Unregister client and disconnects handlers."""
61  await self.async_end_subscribesasync_end_subscribes()
62 
63  if self.clientclient is not None:
64  try:
65  await self.clientclient.async_disconnect()
66  except (ThinQAPIException, TypeError, ValueError):
67  _LOGGER.exception("Failed to disconnect")
68 
70  self, results: list[dict | BaseException | None]
71  ) -> int:
72  """Check if there exists errors while performing tasks and then return count."""
73  # Note that result code '1207' means 'Already subscribed push'
74  # and is not actually fail.
75  return sum(
76  isinstance(result, (TypeError, ValueError))
77  or (
78  isinstance(result, ThinQAPIException)
79  and result.code != ThinQAPIErrorCodes.ALREADY_SUBSCRIBED_PUSH
80  )
81  for result in results
82  )
83 
84  async def async_refresh_subscribe(self, now: datetime | None = None) -> None:
85  """Update event subscribes."""
86  _LOGGER.debug("async_refresh_subscribe: now=%s", now)
87 
88  tasks = [
89  self.hasshass.async_create_task(
90  self.thinq_apithinq_api.async_post_event_subscribe(coordinator.device_id)
91  )
92  for coordinator in self.coordinatorscoordinators.values()
93  ]
94  if tasks:
95  results = await asyncio.gather(*tasks, return_exceptions=True)
96  if (count := self._get_failed_device_count_get_failed_device_count(results)) > 0:
97  _LOGGER.error("Failed to refresh subscription on %s devices", count)
98 
99  async def async_start_subscribes(self) -> None:
100  """Start push/event subscribes."""
101  _LOGGER.debug("async_start_subscribes")
102 
103  if self.clientclient is None:
104  _LOGGER.error("Failed to start subscription: No client")
105  return
106 
107  tasks = [
108  self.hasshass.async_create_task(
109  self.thinq_apithinq_api.async_post_push_subscribe(coordinator.device_id)
110  )
111  for coordinator in self.coordinatorscoordinators.values()
112  ]
113  tasks.extend(
114  self.hasshass.async_create_task(
115  self.thinq_apithinq_api.async_post_event_subscribe(coordinator.device_id)
116  )
117  for coordinator in self.coordinatorscoordinators.values()
118  )
119  if tasks:
120  results = await asyncio.gather(*tasks, return_exceptions=True)
121  if (count := self._get_failed_device_count_get_failed_device_count(results)) > 0:
122  _LOGGER.error("Failed to start subscription on %s devices", count)
123 
124  await self.clientclient.async_connect_mqtt()
125 
126  async def async_end_subscribes(self) -> None:
127  """Start push/event unsubscribes."""
128  _LOGGER.debug("async_end_subscribes")
129 
130  tasks = [
131  self.hasshass.async_create_task(
132  self.thinq_apithinq_api.async_delete_push_subscribe(coordinator.device_id)
133  )
134  for coordinator in self.coordinatorscoordinators.values()
135  ]
136  tasks.extend(
137  self.hasshass.async_create_task(
138  self.thinq_apithinq_api.async_delete_event_subscribe(coordinator.device_id)
139  )
140  for coordinator in self.coordinatorscoordinators.values()
141  )
142  if tasks:
143  results = await asyncio.gather(*tasks, return_exceptions=True)
144  if (count := self._get_failed_device_count_get_failed_device_count(results)) > 0:
145  _LOGGER.error("Failed to end subscription on %s devices", count)
146 
148  self,
149  topic: str,
150  payload: bytes,
151  dup: bool,
152  qos: Any,
153  retain: bool,
154  **kwargs: dict,
155  ) -> None:
156  """Handle the received message that matching the topic."""
157  decoded = payload.decode()
158  try:
159  message = json.loads(decoded)
160  except ValueError:
161  _LOGGER.error("Failed to parse message: payload=%s", decoded)
162  return
163 
164  asyncio.run_coroutine_threadsafe(
165  self.async_handle_device_eventasync_handle_device_event(message), self.hasshass.loop
166  ).result()
167 
168  async def async_handle_device_event(self, message: dict) -> None:
169  """Handle received mqtt message."""
170  unique_id = (
171  f"{message["deviceId"]}_{list(message["report"].keys())[0]}"
172  if message["deviceType"] == DeviceType.WASHTOWER
173  else message["deviceId"]
174  )
175  coordinator = self.coordinatorscoordinators.get(unique_id)
176  if coordinator is None:
177  _LOGGER.error("Failed to handle device event: No device")
178  return
179 
180  _LOGGER.debug(
181  "async_handle_device_event: %s, model:%s, message=%s",
182  coordinator.device_name,
183  coordinator.api.device.model_name,
184  message,
185  )
186  push_type = message.get("pushType")
187 
188  if push_type == DEVICE_STATUS_MESSAGE:
189  coordinator.handle_update_status(message.get("report", {}))
190  elif push_type == DEVICE_PUSH_MESSAGE:
191  coordinator.handle_notification_message(message.get("pushCode"))
None async_disconnect(self, Event|None event=None)
Definition: mqtt.py:59
None on_message_received(self, str topic, bytes payload, bool dup, Any qos, bool retain, **dict kwargs)
Definition: mqtt.py:155
None __init__(self, HomeAssistant hass, ThinQApi thinq_api, str client_id, dict[str, DeviceDataUpdateCoordinator] coordinators)
Definition: mqtt.py:36
None async_refresh_subscribe(self, datetime|None now=None)
Definition: mqtt.py:84
None async_handle_device_event(self, dict message)
Definition: mqtt.py:168
int _get_failed_device_count(self, list[dict|BaseException|None] results)
Definition: mqtt.py:71
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
def async_connect_mqtt(hass, component)
Definition: __init__.py:134