1 """Generic Omada API coordinator."""
4 from datetime
import timedelta
7 from tplink_omada_client
import OmadaSiteClient, OmadaSwitchPortDetails
8 from tplink_omada_client.clients
import OmadaWirelessClient
9 from tplink_omada_client.devices
import OmadaGateway, OmadaListDevice, OmadaSwitch
10 from tplink_omada_client.exceptions
import OmadaClientException
15 _LOGGER = logging.getLogger(__name__)
17 POLL_SWITCH_PORT = 300
24 """Coordinator for synchronizing bulk Omada data."""
29 omada_client: OmadaSiteClient,
31 poll_delay: int |
None = 300,
33 """Initialize my coordinator."""
37 name=f
"Omada API Data - {name}",
38 update_interval=
timedelta(seconds=poll_delay)
if poll_delay
else None,
43 """Fetch data from API endpoint."""
45 async
with asyncio.timeout(10):
47 except OmadaClientException
as err:
48 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
51 """Poll the current data from the controller."""
52 raise NotImplementedError(
"Update method not implemented")
56 """Coordinator for getting details about ports on a switch."""
61 omada_client: OmadaSiteClient,
62 network_switch: OmadaSwitch,
64 """Initialize my coordinator."""
66 hass, omada_client, f
"{network_switch.name} Ports", POLL_SWITCH_PORT
70 async
def poll_update(self) -> dict[str, OmadaSwitchPortDetails]:
71 """Poll a switch's current state."""
73 return {p.port_id: p
for p
in ports}
77 """Coordinator for getting details about the site's gateway."""
82 omada_client: OmadaSiteClient,
85 """Initialize my coordinator."""
86 super().
__init__(hass, omada_client,
"Gateway", POLL_GATEWAY)
90 """Poll a the gateway's current state."""
92 return {self.
macmac: gateway}
96 """Coordinator for generic device lists from the controller."""
101 omada_client: OmadaSiteClient,
103 """Initialize my coordinator."""
104 super().
__init__(hass, omada_client,
"DeviceList", POLL_CLIENTS)
107 """Poll the site's current registered Omada devices."""
108 return {d.mac: d
for d
in await self.
omada_clientomada_client.get_devices()}
112 """Coordinator for getting details about the site's connected clients."""
114 def __init__(self, hass: HomeAssistant, omada_client: OmadaSiteClient) ->
None:
115 """Initialize my coordinator."""
116 super().
__init__(hass, omada_client,
"ClientsList", POLL_CLIENTS)
119 """Poll the site's current active wi-fi clients."""
122 async
for c
in self.
omada_clientomada_client.get_connected_clients()
123 if isinstance(c, OmadaWirelessClient)
None __init__(self, HomeAssistant hass, OmadaSiteClient omada_client)
dict[str, OmadaWirelessClient] poll_update(self)
dict[str, _T] _async_update_data(self)
dict[str, _T] poll_update(self)
None __init__(self, HomeAssistant hass, OmadaSiteClient omada_client, str name, int|None poll_delay=300)
None __init__(self, HomeAssistant hass, OmadaSiteClient omada_client)
dict[str, OmadaListDevice] poll_update(self)
dict[str, OmadaGateway] poll_update(self)
None __init__(self, HomeAssistant hass, OmadaSiteClient omada_client, str mac)
dict[str, OmadaSwitchPortDetails] poll_update(self)
None __init__(self, HomeAssistant hass, OmadaSiteClient omada_client, OmadaSwitch network_switch)