1 """Support for Xiaomi Gateways."""
3 from datetime
import timedelta
15 from .const
import DOMAIN
17 _LOGGER = logging.getLogger(__name__)
23 """Representation a base Xiaomi device."""
25 _attr_should_poll =
False
27 def __init__(self, device, device_type, xiaomi_hub, config_entry):
28 """Initialize the Xiaomi device."""
31 self.
_sid_sid = device[
"sid"]
34 self.
_name_name = f
"{device_type}_{self._sid}"
42 self.
parse_dataparse_data(device[
"data"], device[
"raw_data"])
45 if hasattr(self,
"_data_key")
and self._data_key:
46 self.
_unique_id_unique_id = f
"{self._data_key}{self._sid}"
48 self.
_unique_id_unique_id = f
"{self._type}{self._sid}"
61 """Start unavailability tracking."""
67 """Return the name of the device."""
68 return self.
_name_name
72 """Return a unique ID."""
77 """Return the device id of the Xiaomi Aqara device."""
82 """Return the device info of the Xiaomi Aqara device."""
85 identifiers={(DOMAIN, self.
_device_id_device_id)},
86 connections={(dr.CONNECTION_NETWORK_MAC, self.
_device_id_device_id)},
91 connections={(dr.CONNECTION_ZIGBEE, self.
_device_id_device_id)},
92 identifiers={(DOMAIN, self.
_device_id_device_id)},
93 manufacturer=
"Xiaomi Aqara",
104 """Return True if entity is available."""
109 """Return the state attributes."""
114 """Set state to UNAVAILABLE."""
131 def push_data(self, data: dict[str, Any], raw_data: dict[Any, Any]) ->
None:
132 """Push from Hub running in another thread."""
133 self.
hasshass.loop.call_soon_threadsafe(self.
async_push_dataasync_push_data, data, raw_data)
137 """Push from Hub handled in the event loop."""
138 _LOGGER.debug(
"PUSH >> %s: %s", self, data)
140 is_data = self.
parse_dataparse_data(data, raw_data)
142 if is_data
or is_voltage
or was_unavailable:
146 """Parse battery level data sent by gateway."""
147 if "voltage" in data:
148 voltage_key =
"voltage"
149 elif "battery_voltage" in data:
150 voltage_key =
"battery_voltage"
156 voltage = data[voltage_key]
158 voltage =
min(voltage, max_volt)
159 voltage =
max(voltage, min_volt)
160 percent = ((voltage - min_volt) / (max_volt - min_volt)) * 100
165 """Parse data sent by gateway."""
166 raise NotImplementedError
def __init__(self, device, device_type, xiaomi_hub, config_entry)
DeviceInfo device_info(self)
_remove_unavailability_tracker
def async_added_to_hass(self)
None push_data(self, dict[str, Any] data, dict[Any, Any] raw_data)
def _async_set_unavailable(self, now)
def parse_data(self, data, raw_data)
def _async_track_unavailable(self)
def parse_voltage(self, data)
def extra_state_attributes(self)
None async_push_data(self, dict[str, Any] data, dict[Any, Any] raw_data)
None async_write_ha_state(self)
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)