1 """DataUpdateCoordinator for iotty."""
3 from __future__
import annotations
5 from dataclasses
import dataclass
6 from datetime
import timedelta
9 from iottycloud.device
import Device
10 from iottycloud.shutter
import Shutter
11 from iottycloud.verbs
import OPEN_PERCENTAGE, RESULT, STATUS
21 from .const
import DOMAIN
23 _LOGGER = logging.getLogger(__name__)
30 """iotty data stored in the DataUpdateCoordinator."""
36 """Class to manage fetching Iotty data."""
38 config_entry: ConfigEntry
39 _entities: dict[str, Entity]
40 _devices: list[Device]
41 _device_registry: dr.DeviceRegistry
44 self, hass: HomeAssistant, entry: ConfigEntry, session: OAuth2Session
46 """Initialize the coordinator."""
47 _LOGGER.debug(
"Initializing iotty data update coordinator")
52 name=f
"{DOMAIN}_coordinator",
53 update_interval=UPDATE_INTERVAL,
60 hass, aiohttp_client.async_get_clientsession(hass), session
66 _LOGGER.debug(
"Fetching devices list from iottyCloud")
68 _LOGGER.debug(
"There are %d devices", len(self.
_devices))
71 """Fetch data from iottyCloud device."""
72 _LOGGER.debug(
"Fetching devices status from iottyCloud")
74 current_devices = await self.
iotty.get_devices()
79 if not any(x.device_id == d.device_id
for x
in current_devices)
82 for removed_device
in removed_devices:
84 {(DOMAIN, removed_device.device_id)}
86 if device_to_remove
is not None:
87 self.
_device_registry.async_remove_device(device_to_remove.id)
89 self.
_devices = current_devices
93 json = res.get(RESULT, {})
95 not isinstance(res, dict)
97 or not isinstance(json := res[RESULT], dict)
98 or not (status := json.get(STATUS))
100 _LOGGER.warning(
"Unable to read status for device %s", device.device_id)
103 "Retrieved status: '%s' for device %s", status, device.device_id
105 device.update_status(status)
106 if isinstance(device, Shutter)
and isinstance(
107 percentage := json.get(OPEN_PERCENTAGE), int
109 device.update_percentage(percentage)
None __init__(self, HomeAssistant hass, ConfigEntry entry, OAuth2Session session)
IottyData _async_update_data(self)
def get_status(hass, host, port)