Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for Nice G.O."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 from datetime import datetime
9 import json
10 import logging
11 from typing import TYPE_CHECKING, Any
12 
13 from nice_go import (
14  BARRIER_STATUS,
15  ApiError,
16  AuthFailedError,
17  BarrierState,
18  ConnectionState,
19  NiceGOApi,
20 )
21 
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.const import CONF_EMAIL, CONF_PASSWORD
24 from homeassistant.core import Event, HomeAssistant, callback
25 from homeassistant.exceptions import ConfigEntryAuthFailed
26 from homeassistant.helpers import issue_registry as ir
27 from homeassistant.helpers.aiohttp_client import async_get_clientsession
28 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
29 
30 from .const import (
31  CONF_REFRESH_TOKEN,
32  CONF_REFRESH_TOKEN_CREATION_TIME,
33  DOMAIN,
34  REFRESH_TOKEN_EXPIRY_TIME,
35 )
36 
37 _LOGGER = logging.getLogger(__name__)
38 
39 RECONNECT_ATTEMPTS = 3
40 RECONNECT_DELAY = 5
41 
42 
43 @dataclass
45  """Nice G.O. device dataclass."""
46 
47  type: str
48  id: str
49  name: str
50  barrier_status: str
51  light_status: bool | None
52  fw_version: str
53  connected: bool
54  vacation_mode: bool | None
55 
56 
57 class NiceGOUpdateCoordinator(DataUpdateCoordinator[dict[str, NiceGODevice]]):
58  """DataUpdateCoordinator for Nice G.O."""
59 
60  config_entry: ConfigEntry
61  organization_id: str
62 
63  def __init__(self, hass: HomeAssistant) -> None:
64  """Initialize DataUpdateCoordinator for Nice G.O."""
65  super().__init__(
66  hass,
67  _LOGGER,
68  name="Nice G.O.",
69  )
70 
71  self.refresh_tokenrefresh_token = self.config_entryconfig_entry.data[CONF_REFRESH_TOKEN]
72  self.refresh_token_creation_timerefresh_token_creation_time = self.config_entryconfig_entry.data[
73  CONF_REFRESH_TOKEN_CREATION_TIME
74  ]
75  self.emailemail = self.config_entryconfig_entry.data[CONF_EMAIL]
76  self.passwordpassword = self.config_entryconfig_entry.data[CONF_PASSWORD]
77  self.apiapi = NiceGOApi()
78  self._unsub_connected_unsub_connected: Callable[[], None] | None = None
79  self._unsub_data_unsub_data: Callable[[], None] | None = None
80  self._unsub_connection_lost_unsub_connection_lost: Callable[[], None] | None = None
81  self.connectedconnected = False
82  self._hass_stopping_hass_stopping: bool = hass.is_stopping
83 
84  @callback
85  def async_ha_stop(self, event: Event) -> None:
86  """Stop reconnecting if hass is stopping."""
87  self._hass_stopping_hass_stopping = True
88 
89  async def _parse_barrier(
90  self, device_type: str, barrier_state: BarrierState
91  ) -> NiceGODevice | None:
92  """Parse barrier data."""
93 
94  device_id = barrier_state.deviceId
95  name = barrier_state.reported["displayName"]
96  if barrier_state.reported["migrationStatus"] == "NOT_STARTED":
97  ir.async_create_issue(
98  self.hasshass,
99  DOMAIN,
100  f"firmware_update_required_{device_id}",
101  is_fixable=False,
102  severity=ir.IssueSeverity.ERROR,
103  translation_key="firmware_update_required",
104  translation_placeholders={"device_name": name},
105  )
106  return None
107  ir.async_delete_issue(
108  self.hasshass, DOMAIN, f"firmware_update_required_{device_id}"
109  )
110  barrier_status_raw = [
111  int(x) for x in barrier_state.reported["barrierStatus"].split(",")
112  ]
113 
114  if BARRIER_STATUS[int(barrier_status_raw[2])] == "STATIONARY":
115  barrier_status = "open" if barrier_status_raw[0] == 1 else "closed"
116  else:
117  barrier_status = BARRIER_STATUS[int(barrier_status_raw[2])].lower()
118 
119  light_status = (
120  barrier_state.reported["lightStatus"].split(",")[0] == "1"
121  if barrier_state.reported.get("lightStatus")
122  else None
123  )
124  fw_version = barrier_state.reported["deviceFwVersion"]
125  if barrier_state.connectionState:
126  connected = barrier_state.connectionState.connected
127  elif device_type == "Mms100":
128  connected = barrier_state.reported.get("radioConnected", 0) == 1
129  else:
130  # Assume connected
131  connected = True
132  vacation_mode = barrier_state.reported.get("vcnMode", None)
133 
134  return NiceGODevice(
135  type=device_type,
136  id=device_id,
137  name=name,
138  barrier_status=barrier_status,
139  light_status=light_status,
140  fw_version=fw_version,
141  connected=connected,
142  vacation_mode=vacation_mode,
143  )
144 
145  async def _async_update_data(self) -> dict[str, NiceGODevice]:
146  return self.datadata
147 
148  async def _async_setup(self) -> None:
149  """Set up the coordinator."""
150  async with asyncio.timeout(10):
151  expiry_time = (
152  self.refresh_token_creation_timerefresh_token_creation_time
153  + REFRESH_TOKEN_EXPIRY_TIME.total_seconds()
154  )
155  try:
156  if datetime.now().timestamp() >= expiry_time:
157  await self._update_refresh_token_update_refresh_token()
158  else:
159  await self.apiapi.authenticate_refresh(
160  self.refresh_tokenrefresh_token, async_get_clientsession(self.hasshass)
161  )
162  _LOGGER.debug("Authenticated with Nice G.O. API")
163 
164  barriers = await self.apiapi.get_all_barriers()
165  parsed_barriers = [
166  await self._parse_barrier_parse_barrier(barrier.type, barrier.state)
167  for barrier in barriers
168  ]
169 
170  # Parse the barriers and save them in a dictionary
171  devices = {
172  barrier.id: barrier for barrier in parsed_barriers if barrier
173  }
174  self.organization_idorganization_id = await barriers[0].get_attr("organization")
175  except AuthFailedError as e:
176  raise ConfigEntryAuthFailed from e
177  except ApiError as e:
178  raise UpdateFailed from e
179  else:
180  self.async_set_updated_dataasync_set_updated_data(devices)
181 
182  async def _update_refresh_token(self) -> None:
183  """Update the refresh token with Nice G.O. API."""
184  _LOGGER.debug("Updating the refresh token with Nice G.O. API")
185  try:
186  refresh_token = await self.apiapi.authenticate(
187  self.emailemail, self.passwordpassword, async_get_clientsession(self.hasshass)
188  )
189  except AuthFailedError as e:
190  _LOGGER.exception("Authentication failed")
191  raise ConfigEntryAuthFailed from e
192  except ApiError as e:
193  _LOGGER.exception("API error")
194  raise UpdateFailed from e
195 
196  self.refresh_tokenrefresh_token = refresh_token
197  data = {
198  **self.config_entryconfig_entry.data,
199  CONF_REFRESH_TOKEN: refresh_token,
200  CONF_REFRESH_TOKEN_CREATION_TIME: datetime.now().timestamp(),
201  }
202  self.hasshass.config_entries.async_update_entry(self.config_entryconfig_entry, data=data)
203 
204  async def client_listen(self) -> None:
205  """Listen to the websocket for updates."""
206  self._unsub_connected_unsub_connected = self.apiapi.listen("on_connected", self.on_connectedon_connected)
207  self._unsub_data_unsub_data = self.apiapi.listen("on_data", self.on_dataon_data)
208  self._unsub_connection_lost_unsub_connection_lost = self.apiapi.listen(
209  "on_connection_lost", self.on_connection_loston_connection_lost
210  )
211 
212  for _ in range(RECONNECT_ATTEMPTS):
213  if self._hass_stopping_hass_stopping:
214  return
215 
216  try:
217  await self.apiapi.connect(reconnect=True)
218  except ApiError:
219  _LOGGER.exception("API error")
220  else:
221  return
222 
223  await asyncio.sleep(RECONNECT_DELAY)
224 
225  self.async_set_update_errorasync_set_update_error(
226  TimeoutError(
227  "Failed to connect to the websocket, reconnect attempts exhausted"
228  )
229  )
230 
231  async def on_data(self, data: dict[str, Any]) -> None:
232  """Handle incoming data from the websocket."""
233  _LOGGER.debug("Received data from the websocket")
234  _LOGGER.debug(data)
235  raw_data = data["data"]["devicesStatesUpdateFeed"]["item"]
236  parsed_data = await self._parse_barrier_parse_barrier(
237  self.datadata[
238  raw_data["deviceId"]
239  ].type, # Device type is not sent in device state update, and it can't change, so we just reuse the existing one
240  BarrierState(
241  deviceId=raw_data["deviceId"],
242  desired=json.loads(raw_data["desired"]),
243  reported=json.loads(raw_data["reported"]),
244  connectionState=ConnectionState(
245  connected=raw_data["connectionState"]["connected"],
246  updatedTimestamp=raw_data["connectionState"]["updatedTimestamp"],
247  )
248  if raw_data["connectionState"]
249  else None,
250  version=raw_data["version"],
251  timestamp=raw_data["timestamp"],
252  ),
253  )
254  if parsed_data is None:
255  return
256 
257  data_copy = self.datadata.copy()
258  data_copy[parsed_data.id] = parsed_data
259 
260  self.async_set_updated_dataasync_set_updated_data(data_copy)
261 
262  async def on_connected(self) -> None:
263  """Handle the websocket connection."""
264  _LOGGER.debug("Connected to the websocket")
265  self.connectedconnected = True
266 
267  await self.apiapi.subscribe(self.organization_idorganization_id)
268 
269  if not self.last_update_successlast_update_success:
270  self.async_set_updated_dataasync_set_updated_data(self.datadata)
271 
272  async def on_connection_lost(self, data: dict[str, Exception]) -> None:
273  """Handle the websocket connection loss. Don't need to do much since the library will automatically reconnect."""
274  _LOGGER.debug("Connection lost to the websocket")
275  self.connectedconnected = False
276 
277  # Give some time for reconnection
278  await asyncio.sleep(RECONNECT_DELAY)
279  if self.connectedconnected:
280  _LOGGER.debug("Reconnected, not setting error")
281  return
282 
283  # There's likely a problem with the connection, and not the server being flaky
284  self.async_set_update_errorasync_set_update_error(data["exception"])
285 
286  def unsubscribe(self) -> None:
287  """Unsubscribe from the websocket."""
288  if TYPE_CHECKING:
289  assert self._unsub_connected_unsub_connected is not None
290  assert self._unsub_data_unsub_data is not None
291  assert self._unsub_connection_lost_unsub_connection_lost is not None
292 
293  self._unsub_connection_lost_unsub_connection_lost()
294  self._unsub_connected_unsub_connected()
295  self._unsub_data_unsub_data()
296  self._unsub_connected_unsub_connected = None
297  self._unsub_data_unsub_data = None
298  self._unsub_connection_lost_unsub_connection_lost = None
299  _LOGGER.debug("Unsubscribed from the websocket")
NiceGODevice|None _parse_barrier(self, str device_type, BarrierState barrier_state)
Definition: coordinator.py:91
Callable[[], None] subscribe(HomeAssistant hass, str topic, MessageCallbackType msg_callback, int qos=DEFAULT_QOS, str encoding="utf-8")
Definition: client.py:247
def authenticate(HomeAssistant hass, host, port, servers)
Definition: config_flow.py:104
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)