Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Ruckus DataUpdateCoordinator."""
2 
3 from datetime import timedelta
4 import logging
5 
6 from aioruckus import AjaxSession
7 from aioruckus.exceptions import AuthenticationError, SchemaError
8 
9 from homeassistant.core import HomeAssistant
10 from homeassistant.exceptions import ConfigEntryAuthFailed
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import API_CLIENT_MAC, DOMAIN, KEY_SYS_CLIENTS, SCAN_INTERVAL
14 
15 _LOGGER = logging.getLogger(__package__)
16 
17 
19  """Coordinator to manage data from Ruckus client."""
20 
21  def __init__(self, hass: HomeAssistant, *, ruckus: AjaxSession) -> None:
22  """Initialize global Ruckus data updater."""
23  self.ruckusruckus = ruckus
24 
25  update_interval = timedelta(seconds=SCAN_INTERVAL)
26 
27  super().__init__(
28  hass,
29  _LOGGER,
30  name=DOMAIN,
31  update_interval=update_interval,
32  )
33 
34  async def _fetch_clients(self) -> dict:
35  """Fetch clients from the API and format them."""
36  clients = await self.ruckusruckus.api.get_active_clients()
37  _LOGGER.debug("fetched %d active clients", len(clients))
38  return {client[API_CLIENT_MAC]: client for client in clients}
39 
40  async def _async_update_data(self) -> dict:
41  """Fetch Ruckus data."""
42  try:
43  return {KEY_SYS_CLIENTS: await self._fetch_clients_fetch_clients()}
44  except AuthenticationError as autherror:
45  raise ConfigEntryAuthFailed(autherror) from autherror
46  except (ConnectionError, SchemaError) as conerr:
47  raise UpdateFailed(conerr) from conerr
None __init__(self, HomeAssistant hass, *, AjaxSession ruckus)
Definition: coordinator.py:21