1 """DataUpdateCoordinator for Nice G.O."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from dataclasses
import dataclass
8 from datetime
import datetime
11 from typing
import TYPE_CHECKING, Any
32 CONF_REFRESH_TOKEN_CREATION_TIME,
34 REFRESH_TOKEN_EXPIRY_TIME,
37 _LOGGER = logging.getLogger(__name__)
39 RECONNECT_ATTEMPTS = 3
45 """Nice G.O. device dataclass."""
51 light_status: bool |
None
54 vacation_mode: bool |
None
58 """DataUpdateCoordinator for Nice G.O."""
60 config_entry: ConfigEntry
63 def __init__(self, hass: HomeAssistant) ->
None:
64 """Initialize DataUpdateCoordinator for Nice G.O."""
73 CONF_REFRESH_TOKEN_CREATION_TIME
77 self.
apiapi = NiceGOApi()
79 self.
_unsub_data_unsub_data: Callable[[],
None] |
None =
None
86 """Stop reconnecting if hass is stopping."""
90 self, device_type: str, barrier_state: BarrierState
91 ) -> NiceGODevice |
None:
92 """Parse barrier data."""
94 device_id = barrier_state.deviceId
95 name = barrier_state.reported[
"displayName"]
96 if barrier_state.reported[
"migrationStatus"] ==
"NOT_STARTED":
97 ir.async_create_issue(
100 f
"firmware_update_required_{device_id}",
102 severity=ir.IssueSeverity.ERROR,
103 translation_key=
"firmware_update_required",
104 translation_placeholders={
"device_name": name},
107 ir.async_delete_issue(
108 self.
hasshass, DOMAIN, f
"firmware_update_required_{device_id}"
110 barrier_status_raw = [
111 int(x)
for x
in barrier_state.reported[
"barrierStatus"].split(
",")
114 if BARRIER_STATUS[
int(barrier_status_raw[2])] ==
"STATIONARY":
115 barrier_status =
"open" if barrier_status_raw[0] == 1
else "closed"
117 barrier_status = BARRIER_STATUS[
int(barrier_status_raw[2])].lower()
120 barrier_state.reported[
"lightStatus"].split(
",")[0] ==
"1"
121 if barrier_state.reported.get(
"lightStatus")
124 fw_version = barrier_state.reported[
"deviceFwVersion"]
125 if barrier_state.connectionState:
126 connected = barrier_state.connectionState.connected
127 elif device_type ==
"Mms100":
128 connected = barrier_state.reported.get(
"radioConnected", 0) == 1
132 vacation_mode = barrier_state.reported.get(
"vcnMode",
None)
138 barrier_status=barrier_status,
139 light_status=light_status,
140 fw_version=fw_version,
142 vacation_mode=vacation_mode,
149 """Set up the coordinator."""
150 async
with asyncio.timeout(10):
153 + REFRESH_TOKEN_EXPIRY_TIME.total_seconds()
156 if datetime.now().timestamp() >= expiry_time:
159 await self.
apiapi.authenticate_refresh(
162 _LOGGER.debug(
"Authenticated with Nice G.O. API")
164 barriers = await self.
apiapi.get_all_barriers()
166 await self.
_parse_barrier_parse_barrier(barrier.type, barrier.state)
167 for barrier
in barriers
172 barrier.id: barrier
for barrier
in parsed_barriers
if barrier
175 except AuthFailedError
as e:
176 raise ConfigEntryAuthFailed
from e
177 except ApiError
as e:
178 raise UpdateFailed
from e
183 """Update the refresh token with Nice G.O. API."""
184 _LOGGER.debug(
"Updating the refresh token with Nice G.O. API")
189 except AuthFailedError
as e:
190 _LOGGER.exception(
"Authentication failed")
191 raise ConfigEntryAuthFailed
from e
192 except ApiError
as e:
193 _LOGGER.exception(
"API error")
194 raise UpdateFailed
from e
199 CONF_REFRESH_TOKEN: refresh_token,
200 CONF_REFRESH_TOKEN_CREATION_TIME: datetime.now().timestamp(),
202 self.
hasshass.config_entries.async_update_entry(self.
config_entryconfig_entry, data=data)
205 """Listen to the websocket for updates."""
212 for _
in range(RECONNECT_ATTEMPTS):
217 await self.
apiapi.connect(reconnect=
True)
219 _LOGGER.exception(
"API error")
223 await asyncio.sleep(RECONNECT_DELAY)
227 "Failed to connect to the websocket, reconnect attempts exhausted"
231 async
def on_data(self, data: dict[str, Any]) ->
None:
232 """Handle incoming data from the websocket."""
233 _LOGGER.debug(
"Received data from the websocket")
235 raw_data = data[
"data"][
"devicesStatesUpdateFeed"][
"item"]
241 deviceId=raw_data[
"deviceId"],
242 desired=json.loads(raw_data[
"desired"]),
243 reported=json.loads(raw_data[
"reported"]),
244 connectionState=ConnectionState(
245 connected=raw_data[
"connectionState"][
"connected"],
246 updatedTimestamp=raw_data[
"connectionState"][
"updatedTimestamp"],
248 if raw_data[
"connectionState"]
250 version=raw_data[
"version"],
251 timestamp=raw_data[
"timestamp"],
254 if parsed_data
is None:
257 data_copy = self.
datadata.copy()
258 data_copy[parsed_data.id] = parsed_data
263 """Handle the websocket connection."""
264 _LOGGER.debug(
"Connected to the websocket")
273 """Handle the websocket connection loss. Don't need to do much since the library will automatically reconnect."""
274 _LOGGER.debug(
"Connection lost to the websocket")
278 await asyncio.sleep(RECONNECT_DELAY)
280 _LOGGER.debug(
"Reconnected, not setting error")
287 """Unsubscribe from the websocket."""
299 _LOGGER.debug(
"Unsubscribed from the websocket")
refresh_token_creation_time
None __init__(self, HomeAssistant hass)
None async_ha_stop(self, Event event)
None _update_refresh_token(self)
None on_connection_lost(self, dict[str, Exception] data)
NiceGODevice|None _parse_barrier(self, str device_type, BarrierState barrier_state)
dict[str, NiceGODevice] _async_update_data(self)
None on_data(self, dict[str, Any] data)
None async_set_updated_data(self, _DataT data)
None async_set_update_error(self, Exception err)
Callable[[], None] subscribe(HomeAssistant hass, str topic, MessageCallbackType msg_callback, int qos=DEFAULT_QOS, str encoding="utf-8")
def authenticate(HomeAssistant hass, host, port, servers)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)