1 """Coordinator for myUplink."""
3 import asyncio.timeouts
4 from dataclasses
import dataclass
5 from datetime
import datetime, timedelta
8 from myuplink
import Device, DevicePoint, MyUplinkAPI, System
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
18 """Represent coordinator data."""
21 devices: dict[str, Device]
22 points: dict[str, dict[str, DevicePoint]]
27 """Coordinator for myUplink data."""
29 def __init__(self, hass: HomeAssistant, api: MyUplinkAPI) ->
None:
30 """Initialize myUplink coordinator."""
async def _async_update_data(self) -> CoordinatorData:
    """Fetch data from the myUplink API.

    Retrieves all systems, then, for every device listed under those
    systems, fetches the device info and the device's data points.

    Returns:
        CoordinatorData holding the fetched systems, a mapping of device id
        to device info, a mapping of device id to ``{parameter_id: point}``,
        and a naive ``datetime.now()`` timestamp taken when the snapshot was
        assembled.

    Raises:
        TimeoutError: If the systems request does not complete within
            10 seconds.
    """
    # NOTE(review): assumes only the systems request is bounded by the
    # 10-second timeout; confirm the intended scope of this block.
    async with asyncio.timeout(10):
        # NOTE(review): assumes the client passed to __init__ is stored as
        # ``self.api``; confirm the attribute name.
        systems = await self.api.async_get_systems()

    devices: dict[str, Device] = {}
    points: dict[str, dict[str, DevicePoint]] = {}

    # Flatten the system -> devices hierarchy into a list of device ids.
    device_ids = [
        device.deviceId for system in systems for device in system.devices
    ]

    for device_id in device_ids:
        devices[device_id] = await self.api.async_get_device(device_id)

        # Index this device's points by parameter id for direct lookup.
        api_device_points = await self.api.async_get_device_points(device_id)
        points[device_id] = {
            point.parameter_id: point for point in api_device_points
        }

    return CoordinatorData(
        systems=systems, devices=devices, points=points, time=datetime.now()
    )
None __init__(self, HomeAssistant hass, MyUplinkAPI api)