1 """Ruckus DataUpdateCoordinator."""
3 from datetime
import timedelta
6 from aioruckus
import AjaxSession
7 from aioruckus.exceptions
import AuthenticationError, SchemaError
13 from .const
import API_CLIENT_MAC, DOMAIN, KEY_SYS_CLIENTS, SCAN_INTERVAL
15 _LOGGER = logging.getLogger(__package__)
19 """Coordinator to manage data from Ruckus client."""
21 def __init__(self, hass: HomeAssistant, *, ruckus: AjaxSession) ->
None:
22 """Initialize global Ruckus data updater."""
25 update_interval =
timedelta(seconds=SCAN_INTERVAL)
31 update_interval=update_interval,
35 """Fetch clients from the API and format them."""
36 clients = await self.
ruckusruckus.api.get_active_clients()
37 _LOGGER.debug(
"fetched %d active clients", len(clients))
38 return {client[API_CLIENT_MAC]: client
for client
in clients}
41 """Fetch Ruckus data."""
43 return {KEY_SYS_CLIENTS: await self.
_fetch_clients_fetch_clients()}
44 except AuthenticationError
as autherror:
46 except (ConnectionError, SchemaError)
as conerr:
dict _fetch_clients(self)
None __init__(self, HomeAssistant hass, *AjaxSession ruckus)
dict _async_update_data(self)