1 """YoLink DataUpdateCoordinator."""
3 from __future__
import annotations
6 from datetime
import UTC, datetime, timedelta
9 from yolink.device
import YoLinkDevice
10 from yolink.exception
import YoLinkAuthFailError, YoLinkClientError
16 from .const
import ATTR_DEVICE_STATE, DOMAIN, YOLINK_OFFLINE_TIME
18 _LOGGER = logging.getLogger(__name__)
22 """YoLink DataUpdateCoordinator."""
28 paired_device: YoLinkDevice |
None =
None,
30 """Init YoLink DataUpdateCoordinator.
32 fetch state every 30 minutes base on yolink device heartbeat interval
33 data is None before the first successful update, but we need to use
37 hass, _LOGGER, name=DOMAIN, update_interval=
timedelta(minutes=30)
44 """Fetch device state."""
46 async
with asyncio.timeout(10):
47 device_state_resp = await self.
devicedevice.fetch_state()
48 device_state = device_state_resp.data.get(ATTR_DEVICE_STATE)
49 device_reporttime = device_state_resp.data.get(
"reportAt")
50 if device_reporttime
is not None:
52 datetime.now(tz=UTC).replace(tzinfo=
None)
53 - datetime.strptime(device_reporttime,
"%Y-%m-%dT%H:%M:%S.%fZ")
55 self.
dev_onlinedev_online = rpt_time_delta < YOLINK_OFFLINE_TIME
56 if self.
paired_devicepaired_device
is not None and device_state
is not None:
57 paried_device_state_resp = await self.
paired_devicepaired_device.fetch_state()
58 paried_device_state = paried_device_state_resp.data.get(
62 paried_device_state
is not None
63 and ATTR_DEVICE_STATE
in paried_device_state
65 device_state[ATTR_DEVICE_STATE] = paried_device_state[
68 except YoLinkAuthFailError
as yl_auth_err:
69 raise ConfigEntryAuthFailed
from yl_auth_err
70 except YoLinkClientError
as yl_client_err:
71 raise UpdateFailed
from yl_client_err
72 if device_state
is not None:
dict _async_update_data(self)
None __init__(self, HomeAssistant hass, YoLinkDevice device, YoLinkDevice|None paired_device=None)