1 """Code to manage fetching LIVISI data API."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from aiohttp
import ClientConnectorError
9 from aiolivisi
import AioLivisi, LivisiEvent, Websocket
10 from aiolivisi.errors
import TokenExpiredException
23 LIVISI_REACHABILITY_CHANGE,
30 """Class to manage fetching LIVISI data API."""
32 config_entry: ConfigEntry
35 self, hass: HomeAssistant, config_entry: ConfigEntry, aiolivisi: AioLivisi
37 """Initialize my coordinator."""
41 name=
"Livisi devices",
42 update_interval=
timedelta(seconds=DEVICE_POLLING_DELAY),
48 self.devices: set[str] = set()
49 self.rooms: dict[str, Any] = {}
53 self.
portport: int = 0
56 """Get device configuration from LIVISI."""
59 except TokenExpiredException:
60 await self.
aiolivisiaiolivisi.async_set_token(self.
aiolivisiaiolivisi.livisi_connection_data)
62 except ClientConnectorError
as exc:
63 raise UpdateFailed(
"Failed to get livisi devices from controller")
from exc
70 """Set up the Livisi Smart Home Controller."""
71 if not self.
aiolivisiaiolivisi.livisi_connection_data:
72 livisi_connection_data = {
77 await self.
aiolivisiaiolivisi.async_set_token(
78 livisi_connection_data=livisi_connection_data
81 if (controller_type := controller_data[
"controllerType"]) == AVATAR:
82 self.
portport = AVATAR_PORT
85 self.
portport = CLASSIC_PORT
91 """Set the discovered devices list."""
95 """Get state from livisi devices."""
101 return response.get(key, {}).
get(
"value")
104 """Set the room list."""
105 response: list[dict[str, Any]] = await self.
aiolivisiaiolivisi.async_get_all_rooms()
107 for available_room
in response:
108 available_room_config: dict[str, Any] = available_room[
"config"]
109 self.rooms[available_room[
"id"]] = available_room_config[
"name"]
111 def on_data(self, event_data: LivisiEvent) ->
None:
112 """Define a handler to fire when the data is received."""
114 LIVISI_STATE_CHANGE, event_data.source, event_data.onState
117 LIVISI_STATE_CHANGE, event_data.source, event_data.vrccData
120 LIVISI_REACHABILITY_CHANGE, event_data.source, event_data.isReachable
123 LIVISI_STATE_CHANGE, event_data.source, event_data.isOpen
127 """Define a handler to fire when the websocket is closed."""
128 for device_id
in self.devices:
134 """Connect the websocket."""
None _async_dispatcher_send(self, str event, str source, Any data)
None on_data(self, LivisiEvent event_data)
list[dict[str, Any]] _async_update_data(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, AioLivisi aiolivisi)
Any|None async_get_device_state(self, str capability, str key)
list[dict[str, Any]] async_get_devices(self)
None async_set_all_rooms(self)
web.Response get(self, web.Request request, str config_key)
Controller async_get_controller(HomeAssistant hass)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)