1 """DataUpdateCoordinator for the Yale integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
6 from typing
import TYPE_CHECKING, Any
8 from yalesmartalarmclient
import YaleLock
9 from yalesmartalarmclient.client
import YaleSmartAlarmClient
10 from yalesmartalarmclient.exceptions
import AuthenticationError
18 from .const
import DEFAULT_SCAN_INTERVAL, DOMAIN, LOGGER, YALE_BASE_ERRORS
22 """A Yale Data Update Coordinator."""
24 yale: YaleSmartAlarmClient
26 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) ->
None:
27 """Initialize the Yale hub."""
33 update_interval=
timedelta(seconds=DEFAULT_SCAN_INTERVAL),
36 self.
lockslocks: list[YaleLock] = []
39 """Set up connection to Yale."""
41 self.
yaleyale = await self.
hasshass.async_add_executor_job(
43 self.
entryentry.data[CONF_USERNAME],
44 self.
entryentry.data[CONF_PASSWORD],
46 self.
lockslocks = await self.
hasshass.async_add_executor_job(self.
yaleyale.get_locks)
47 except AuthenticationError
as error:
48 raise ConfigEntryAuthFailed
from error
49 except YALE_BASE_ERRORS
as error:
50 raise UpdateFailed
from error
53 """Fetch data from Yale."""
55 updates = await self.
hasshass.async_add_executor_job(self.
get_updatesget_updates)
60 for device
in updates[
"cycle"][
"device_status"]:
61 state = device[
"status1"]
62 if device[
"type"] ==
"device_type.door_contact":
63 device[
"_battery"] =
False
64 if "device_status.low_battery" in state:
65 device[
"_battery"] =
True
66 if "device_status.dc_close" in state:
67 device[
"_state"] =
"closed"
68 door_windows.append(device)
70 if "device_status.dc_open" in state:
71 device[
"_state"] =
"open"
72 door_windows.append(device)
74 device[
"_state"] =
"unavailable"
75 door_windows.append(device)
77 if device[
"type"] ==
"device_type.temperature_sensor":
78 temp_sensors.append(device)
81 contact[
"address"]: contact[
"_state"]
for contact
in door_windows
83 _sensor_battery_map = {
84 f
"{contact["address
"]}-battery": contact[
"_battery"]
85 for contact
in door_windows
87 _temp_map = {temp[
"address"]: temp[
"status_temp"]
for temp
in temp_sensors}
90 "alarm": updates[
"arm_status"],
91 "door_windows": door_windows,
92 "temp_sensors": temp_sensors,
93 "status": updates[
"status"],
94 "online": updates[
"online"],
95 "sensor_map": _sensor_map,
96 "sensor_battery_map": _sensor_battery_map,
97 "temp_map": _temp_map,
98 "panel_info": updates[
"panel_info"],
102 """Fetch data from Yale."""
104 arm_status = self.
yaleyale.get_armed_status()
105 data = self.
yaleyale.get_information()
108 for device
in data.cycle[
"data"][
"device_status"]:
109 if device[
"type"] == YaleLock.DEVICE_TYPE:
110 for lock
in self.
lockslocks:
111 if lock.name == device[
"name"]:
113 except AuthenticationError
as error:
114 raise ConfigEntryAuthFailed
from error
115 except YALE_BASE_ERRORS
as error:
116 raise UpdateFailed
from error
118 cycle = data.cycle[
"data"]
if data.cycle
else None
119 status = data.status[
"data"]
if data.status
else None
120 online = data.online[
"data"]
if data.online
else None
121 panel_info = data.panel_info[
"data"]
if data.panel_info
else None
124 "arm_status": arm_status,
128 "panel_info": panel_info,
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
dict[str, Any] get_updates(self)