1 """DataUpdateCoordinator for WLED."""
3 from __future__
import annotations
9 WLEDConnectionClosedError,
22 DEFAULT_KEEP_MAIN_LIGHT,
25 RELEASES_SCAN_INTERVAL,
31 """Class to manage fetching WLED data from single endpoint."""
34 config_entry: ConfigEntry
42 """Initialize global WLED data updater."""
44 CONF_KEEP_MAIN_LIGHT, DEFAULT_KEEP_MAIN_LIGHT
47 self.
unsubunsub: CALLBACK_TYPE |
None =
None
54 update_interval=SCAN_INTERVAL,
59 """Return if the coordinated device has a main light."""
61 self.
datadata
is not None and len(self.
datadata.state.segments) > 1
66 """Use WebSocket for updates, instead of polling."""
68 async
def listen() -> None:
69 """Listen for state changes via WebSocket."""
71 await self.
wledwled.connect()
72 except WLEDError
as err:
73 self.
loggerlogger.info(err)
81 except WLEDConnectionClosedError
as err:
83 self.
loggerlogger.info(err)
84 except WLEDError
as err:
87 self.
loggerlogger.error(err)
90 await self.
wledwled.disconnect()
93 self.
unsubunsub =
None
95 async
def close_websocket(_: Event) ->
None:
96 """Close WebSocket connection."""
97 self.
unsubunsub =
None
98 await self.
wledwled.disconnect()
101 self.
unsubunsub = self.
hasshass.bus.async_listen_once(
102 EVENT_HOMEASSISTANT_STOP, close_websocket
106 self.
config_entryconfig_entry.async_create_background_task(
107 self.
hasshass, listen(),
"wled-listen"
111 """Fetch data from WLED."""
114 except WLEDError
as error:
115 raise UpdateFailed(f
"Invalid response from API: {error}")
from error
119 device.info.websocket
is not None
120 and not self.
wledwled.connected
121 and not self.
unsubunsub
129 """Class to manage fetching WLED releases."""
132 """Initialize global WLED releases updater."""
139 update_interval=RELEASES_SCAN_INTERVAL,
143 """Fetch release data from WLED."""
145 return await self.
wledwled.releases()
146 except WLEDError
as error:
147 raise UpdateFailed(f
"Invalid response from GitHub API: {error}")
from error
None _use_websocket(self)
bool has_main_light(self)
WLEDDevice _async_update_data(self)
None __init__(self, HomeAssistant hass, *ConfigEntry entry)
Releases _async_update_data(self)
None __init__(self, HomeAssistant hass)
None async_set_updated_data(self, _DataT data)
None async_update_listeners(self)
IssData update(pyiss.ISS iss)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)