Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for myUplink."""
2 
3 import asyncio.timeouts
4 from dataclasses import dataclass
5 from datetime import datetime, timedelta
6 import logging
7 
8 from myuplink import Device, DevicePoint, MyUplinkAPI, System
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
12 
13 _LOGGER = logging.getLogger(__name__)
14 
15 
16 @dataclass
18  """Represent coordinator data."""
19 
20  systems: list[System]
21  devices: dict[str, Device]
22  points: dict[str, dict[str, DevicePoint]]
23  time: datetime
24 
25 
27  """Coordinator for myUplink data."""
28 
29  def __init__(self, hass: HomeAssistant, api: MyUplinkAPI) -> None:
30  """Initialize myUplink coordinator."""
31  super().__init__(
32  hass,
33  _LOGGER,
34  name="myuplink",
35  update_interval=timedelta(seconds=60),
36  )
37  self.apiapi = api
38 
39  async def _async_update_data(self) -> CoordinatorData:
40  """Fetch data from the myUplink API."""
41  async with asyncio.timeout(10):
42  # Get systems
43  systems = await self.apiapi.async_get_systems()
44 
45  devices: dict[str, Device] = {}
46  points: dict[str, dict[str, DevicePoint]] = {}
47  device_ids = [
48  device.deviceId for system in systems for device in system.devices
49  ]
50  for device_id in device_ids:
51  # Get device info
52  api_device_info = await self.apiapi.async_get_device(device_id)
53  devices[device_id] = api_device_info
54 
55  # Get device points (data)
56  api_device_points = await self.apiapi.async_get_device_points(device_id)
57  point_info: dict[str, DevicePoint] = {}
58  for point in api_device_points:
59  point_info[point.parameter_id] = point
60 
61  points[device_id] = point_info
62 
63  return CoordinatorData(
64  systems=systems, devices=devices, points=points, time=datetime.now()
65  )