Home Assistant Unofficial Reference 2024.12.1
binary_sensor.py
Go to the documentation of this file.
1 """Support for ISY binary sensors."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 from typing import Any
7 
8 from pyisy.constants import (
9  CMD_OFF,
10  CMD_ON,
11  ISY_VALUE_UNKNOWN,
12  PROTO_INSTEON,
13  PROTO_ZWAVE,
14 )
15 from pyisy.helpers import NodeProperty
16 from pyisy.nodes import Group, Node
17 
19  BinarySensorDeviceClass,
20  BinarySensorEntity,
21 )
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.const import STATE_ON, Platform
24 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
25 from homeassistant.helpers.device_registry import DeviceInfo
26 from homeassistant.helpers.entity_platform import AddEntitiesCallback
27 from homeassistant.helpers.event import async_call_later
28 from homeassistant.helpers.restore_state import RestoreEntity
29 
30 from .const import (
31  _LOGGER,
32  BINARY_SENSOR_DEVICE_TYPES_ISY,
33  BINARY_SENSOR_DEVICE_TYPES_ZWAVE,
34  DOMAIN,
35  SUBNODE_CLIMATE_COOL,
36  SUBNODE_CLIMATE_HEAT,
37  SUBNODE_DUSK_DAWN,
38  SUBNODE_HEARTBEAT,
39  SUBNODE_LOW_BATTERY,
40  SUBNODE_MOTION_DISABLED,
41  SUBNODE_NEGATIVE,
42  SUBNODE_TAMPER,
43  TYPE_CATEGORY_CLIMATE,
44  TYPE_INSTEON_MOTION,
45 )
46 from .entity import ISYNodeEntity, ISYProgramEntity
47 from .models import IsyData
48 
# Device classes whose Insteon child nodes need an entity for their parent
# node: during setup, a child node of one of these classes is skipped (and an
# error is logged) when no entity was created for its parent.
DEVICE_PARENT_REQUIRED = [
    BinarySensorDeviceClass.OPENING,
    BinarySensorDeviceClass.MOISTURE,
    BinarySensorDeviceClass.MOTION,
]
54 
55 
async def async_setup_entry(
    hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
    """Set up the ISY binary sensor platform.

    Entities are built in three passes:

    1. Every binary sensor node that is not an Insteon child node gets its
       own entity, indexed by node address so child nodes can look up their
       parent entity later.
    2. Insteon child nodes are processed afterwards. Depending on the device
       type/class and the subnode id they are either attached to the parent
       entity (negative node, heartbeat) or exposed as separate entities.
    3. One entity is added for each binary sensor program.
    """
    entities: list[
        ISYInsteonBinarySensorEntity
        | ISYBinarySensorEntity
        | ISYBinarySensorHeartbeat
        | ISYBinarySensorProgramEntity
    ] = []
    entities_by_address: dict[
        str,
        ISYInsteonBinarySensorEntity
        | ISYBinarySensorEntity
        | ISYBinarySensorHeartbeat
        | ISYBinarySensorProgramEntity,
    ] = {}
    child_nodes: list[
        tuple[Node, BinarySensorDeviceClass | None, str | None, DeviceInfo | None]
    ] = []
    entity: (
        ISYInsteonBinarySensorEntity
        | ISYBinarySensorEntity
        | ISYBinarySensorHeartbeat
        | ISYBinarySensorProgramEntity
    )

    isy_data: IsyData = hass.data[DOMAIN][entry.entry_id]
    devices: dict[str, DeviceInfo] = isy_data.devices
    for node in isy_data.nodes[Platform.BINARY_SENSOR]:
        assert isinstance(node, Node)
        device_info = devices.get(node.primary_node)
        device_class, device_type = _detect_device_type_and_class(node)
        if node.protocol == PROTO_INSTEON:
            if node.parent_node is not None:
                # We'll process the Insteon child nodes last, to ensure all parent
                # nodes have been processed
                child_nodes.append((node, device_class, device_type, device_info))
                continue
            entity = ISYInsteonBinarySensorEntity(
                node, device_class, device_info=device_info
            )
        else:
            entity = ISYBinarySensorEntity(node, device_class, device_info=device_info)
        entities.append(entity)
        entities_by_address[node.address] = entity

    # Handle some special child node cases for Insteon Devices
    for node, device_class, device_type, device_info in child_nodes:
        # The last space-separated token of the address is the subnode id,
        # written in hexadecimal.
        subnode_id = int(node.address.split(" ")[-1], 16)
        # Handle Insteon Thermostats
        if device_type is not None and device_type.startswith(TYPE_CATEGORY_CLIMATE):
            if subnode_id == SUBNODE_CLIMATE_COOL:
                # Subnode 2 is the "Cool Control" sensor
                # It never reports its state until first use is
                # detected after an ISY Restart, so we assume it's off.
                # As soon as the ISY Event Stream connects if it has a
                # valid state, it will be set.
                entity = ISYInsteonBinarySensorEntity(
                    node, BinarySensorDeviceClass.COLD, False, device_info=device_info
                )
                entities.append(entity)
            elif subnode_id == SUBNODE_CLIMATE_HEAT:
                # Subnode 3 is the "Heat Control" sensor
                entity = ISYInsteonBinarySensorEntity(
                    node, BinarySensorDeviceClass.HEAT, False, device_info=device_info
                )
                entities.append(entity)
            # Other thermostat subnodes get no binary sensor entity.
            continue

        if device_class in DEVICE_PARENT_REQUIRED:
            parent_entity = entities_by_address.get(node.parent_node.address)
            if not parent_entity:
                _LOGGER.error(
                    (
                        "Node %s has a parent node %s, but no device "
                        "was created for the parent. Skipping"
                    ),
                    node.address,
                    node.parent_node,
                )
                continue

        if device_class in (
            BinarySensorDeviceClass.OPENING,
            BinarySensorDeviceClass.MOISTURE,
        ):
            # These sensors use an optional "negative" subnode 2 to
            # snag all state changes
            if subnode_id == SUBNODE_NEGATIVE:
                assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
                parent_entity.add_negative_node(node)
            elif subnode_id == SUBNODE_HEARTBEAT:
                assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
                # Subnode 4 is the heartbeat node, which we will
                # represent as a separate binary_sensor
                entity = ISYBinarySensorHeartbeat(
                    node, parent_entity, device_info=device_info
                )
                parent_entity.add_heartbeat_device(entity)
                entities.append(entity)
            continue
        if (
            device_class == BinarySensorDeviceClass.MOTION
            and device_type is not None
            and any(device_type.startswith(t) for t in TYPE_INSTEON_MOTION)
        ):
            # Special cases for Insteon Motion Sensors I & II:
            # Some subnodes never report status until activated, so
            # the initial state is forced "OFF"/"NORMAL" if the
            # parent device has a valid state. This is corrected
            # upon connection to the ISY event stream if subnode has a valid state.
            assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
            initial_state = None if parent_entity.state is None else False
            if subnode_id == SUBNODE_DUSK_DAWN:
                # Subnode 2 is the Dusk/Dawn sensor
                entity = ISYInsteonBinarySensorEntity(
                    node, BinarySensorDeviceClass.LIGHT, device_info=device_info
                )
                entities.append(entity)
                continue
            if subnode_id == SUBNODE_LOW_BATTERY:
                # Subnode 3 is the low battery node
                entity = ISYInsteonBinarySensorEntity(
                    node,
                    BinarySensorDeviceClass.BATTERY,
                    initial_state,
                    device_info=device_info,
                )
                entities.append(entity)
                continue
            if subnode_id in SUBNODE_TAMPER:
                # Tamper Sub-node for MS II. Sometimes reported as "A" sometimes
                # reported as "10", which translate from Hex to 10 and 16 resp.
                entity = ISYInsteonBinarySensorEntity(
                    node,
                    BinarySensorDeviceClass.PROBLEM,
                    initial_state,
                    device_info=device_info,
                )
                entities.append(entity)
                continue
            if subnode_id in SUBNODE_MOTION_DISABLED:
                # Motion Disabled Sub-node for MS II ("D" or "13")
                entity = ISYInsteonBinarySensorEntity(node, device_info=device_info)
                entities.append(entity)
                continue

        # We don't yet have any special logic for other sensor
        # types, so add the nodes as individual devices
        entity = ISYBinarySensorEntity(
            node, force_device_class=device_class, device_info=device_info
        )
        entities.append(entity)

    for name, status, _ in isy_data.programs[Platform.BINARY_SENSOR]:
        entities.append(ISYBinarySensorProgramEntity(name, status))

    async_add_entities(entities)
215 
216 
def _detect_device_type_and_class(
    node: Group | Node,
) -> tuple[BinarySensorDeviceClass | None, str | None]:
    """Work out the binary sensor device class and type string of a node.

    Returns a ``(device_class, device_type)`` tuple:

    * ``(None, None)`` when the node has no ``type`` attribute.
    * For Z-Wave nodes the type is ``"Z"`` followed by the Z-Wave category,
      and the class is looked up by that category.
    * For every other node the type is ``node.type`` and the class is the
      first one whose known type prefixes match it.

    ``device_class`` is ``None`` whenever no lookup table entry matches.
    """
    try:
        device_type = node.type
    except AttributeError:
        # The type attribute didn't exist in the ISY's API response
        return (None, None)

    # Z-Wave Devices:
    if node.protocol == PROTO_ZWAVE:
        category = node.zwave_props.category
        device_type = f"Z{category}"
        for device_class, values in BINARY_SENSOR_DEVICE_TYPES_ZWAVE.items():
            if category in values:
                return device_class, device_type
        return (None, device_type)

    # Other devices (incl Insteon.)
    for device_class, values in BINARY_SENSOR_DEVICE_TYPES_ISY.items():
        if any(device_type.startswith(t) for t in values):
            return device_class, device_type
    return (None, device_type)
239 
240 
242  """Representation of a basic ISY binary sensor device."""
243 
def __init__(
    self,
    node: Node,
    force_device_class: BinarySensorDeviceClass | None = None,
    unknown_state: bool | None = None,
    device_info: DeviceInfo | None = None,
) -> None:
    """Initialize the ISY binary sensor device.

    ``unknown_state`` is accepted for signature compatibility but is not
    used by this initializer.
    """
    super().__init__(node, device_info=device_info)
    # This was discovered by parsing the device type code during init
    self._attr_device_class = force_device_class
255 
@property
def is_on(self) -> bool | None:
    """Get whether the ISY binary sensor device is on.

    Returns ``None`` while the node status is unknown, otherwise the
    truthiness of the node status.
    """
    status = self._node.status
    if status == ISY_VALUE_UNKNOWN:
        return None
    return bool(status)
262 
263 
265  """Representation of an ISY Insteon binary sensor device.
266 
267  Often times, a single device is represented by multiple nodes in the ISY,
268  allowing for different nuances in how those devices report their on and
269  off events. This class turns those multiple nodes into a single Home
270  Assistant entity and handles both ways that ISY binary sensors can work.
271  """
272 
def __init__(
    self,
    node: Node,
    force_device_class: BinarySensorDeviceClass | None = None,
    unknown_state: bool | None = None,
    device_info: DeviceInfo | None = None,
) -> None:
    """Initialize the ISY binary sensor device.

    ``unknown_state`` is the computed state to start from when the node
    status is unknown at creation time.
    """
    super().__init__(node, force_device_class, device_info=device_info)
    self._negative_node: Node | None = None
    self._heartbeat_device: ISYBinarySensorHeartbeat | None = None
    if self._node.status == ISY_VALUE_UNKNOWN:
        self._computed_state = unknown_state
        # Remembered so a later status update can fill in the real state.
        self._status_was_unknown = True
    else:
        self._computed_state = bool(self._node.status)
        self._status_was_unknown = False
290 
async def async_added_to_hass(self) -> None:
    """Subscribe to the node and subnode event emitters."""
    await super().async_added_to_hass()

    self._node.control_events.subscribe(self._async_positive_node_control_handler)

    # The negative node is optional; only listen to it when one was added.
    if self._negative_node is not None:
        self._negative_node.control_events.subscribe(
            self._async_negative_node_control_handler
        )
301 
def add_heartbeat_device(self, entity: ISYBinarySensorHeartbeat | None) -> None:
    """Register a heartbeat device for this sensor.

    The heartbeat node beats on its own, but we can gain a little
    reliability by considering any node activity for this sensor
    to be a heartbeat as well.
    """
    self._heartbeat_device = entity
310 
def _async_heartbeat(self) -> None:
    """Send a heartbeat to our heartbeat device, if we have one."""
    if self._heartbeat_device is not None:
        self._heartbeat_device.async_heartbeat()
315 
def add_negative_node(self, child: Node) -> None:
    """Add a negative node to this binary sensor device.

    The negative node is a node that can receive the 'off' events
    for the sensor, depending on device configuration and type.
    """
    self._negative_node = child

    # If the negative node has a value, it means the negative node is
    # in use for this device. Next we need to check to see if the
    # negative and positive nodes disagree on the state (both ON or
    # both OFF).
    if (
        self._negative_node.status != ISY_VALUE_UNKNOWN
        and self._negative_node.status == self._node.status
    ):
        # The states disagree, therefore we cannot determine the state
        # of the sensor until we receive our first ON event.
        self._computed_state = None
335 
@callback
def _async_negative_node_control_handler(self, event: NodeProperty) -> None:
    """Handle an "On" control event from the "negative" node.

    An ON command on the negative node means the sensor turned off. Any
    other control event from this node is ignored.
    """
    if event.control == CMD_ON:
        _LOGGER.debug(
            "Sensor %s turning Off via the Negative node sending a DON command",
            self.name,
        )
        self._computed_state = False
        self.async_write_ha_state()
        self._async_heartbeat()
347 
@callback
def _async_positive_node_control_handler(self, event: NodeProperty) -> None:
    """Handle On and Off control event coming from the primary node.

    Depending on device configuration, sometimes only On events
    will come to this node, with the negative node representing Off
    events
    """
    if event.control == CMD_ON:
        _LOGGER.debug(
            "Sensor %s turning On via the Primary node sending a DON command",
            self.name,
        )
        self._computed_state = True
        self.async_write_ha_state()
        self._async_heartbeat()
    elif event.control == CMD_OFF:
        _LOGGER.debug(
            "Sensor %s turning Off via the Primary node sending a DOF command",
            self.name,
        )
        self._computed_state = False
        self.async_write_ha_state()
        self._async_heartbeat()
372 
@callback
def async_on_update(self, event: NodeProperty) -> None:
    """Primary node status updates.

    We MOSTLY ignore these updates, as we listen directly to the Control
    events on all nodes for this device. However, there is one edge case:
    If a leak sensor is unknown, due to a recent reboot of the ISY, the
    status will get updated to dry upon the first heartbeat. This status
    update is the only way that a leak sensor's status changes without
    an accompanying Control event, so we need to watch for it.
    """
    # Only act while the state is still undetermined from an unknown start.
    if self._status_was_unknown and self._computed_state is None:
        self._computed_state = bool(self._node.status)
        self._status_was_unknown = False
        self.async_write_ha_state()
        self._async_heartbeat()
389 
@property
def is_on(self) -> bool | None:
    """Get whether the ISY binary sensor device is on.

    Insteon leak sensors set their primary node to On when the state is
    DRY, not WET, so we invert the binary state if the user indicates
    that it is a moisture sensor.

    Dusk/Dawn sensors set their node to On when DUSK, not light detected,
    so this is inverted as well.
    """
    if self._computed_state is None:
        # Do this first so we don't invert None on moisture or light sensors
        return None

    if self.device_class in (
        BinarySensorDeviceClass.LIGHT,
        BinarySensorDeviceClass.MOISTURE,
    ):
        return not self._computed_state

    return self._computed_state
412 
413 
415  """Representation of the battery state of an ISY sensor."""
416 
417  _attr_device_class = BinarySensorDeviceClass.BATTERY
418 
def __init__(
    self,
    node: Node,
    parent_device: ISYInsteonBinarySensorEntity
    | ISYBinarySensorEntity
    | ISYBinarySensorHeartbeat
    | ISYBinarySensorProgramEntity,
    device_info: DeviceInfo | None = None,
) -> None:
    """Initialize the ISY binary sensor device.

    Computed state is set to UNKNOWN unless the ISY provided a valid
    state. See notes above regarding ISY Sensor status on ISY restart.
    If a valid state is provided (either on or off), the computed state in
    HA is restored to the previous value or defaulted to OFF (Normal).
    If the heartbeat is not received in 25 hours then the computed state is
    set to ON (Low Battery).
    """
    super().__init__(node, device_info=device_info)
    self._parent_device = parent_device
    # Cancel handle of the pending heartbeat timeout, if one is scheduled.
    self._heartbeat_timer: CALLBACK_TYPE | None = None
    self._computed_state: bool | None = None
    if self.state is None:
        self._computed_state = False
443 
async def async_added_to_hass(self) -> None:
    """Subscribe to the node and subnode event emitters.

    Also starts the heartbeat timer and restores a previously stored
    "on" (Low Battery) state.
    """
    await super().async_added_to_hass()

    self._node.control_events.subscribe(self._heartbeat_node_control_handler)

    # Start the timer on boot-up, so we can change from UNKNOWN to OFF
    self._restart_timer()

    if (last_state := await self.async_get_last_state()) is not None:
        # Only restore the state if it was previously ON (Low Battery)
        if last_state.state == STATE_ON:
            self._computed_state = True
457 
@callback
def _heartbeat_node_control_handler(self, event: NodeProperty) -> None:
    """Update the heartbeat timestamp when any ON/OFF event is sent.

    The ISY uses both DON and DOF commands (alternating) for a heartbeat.
    """
    if event.control in (CMD_ON, CMD_OFF):
        self.async_heartbeat()
465 
@callback
def async_heartbeat(self) -> None:
    """Mark the device as online, and restart the 25 hour timer.

    This gets called when the heartbeat node beats, but also when the
    parent sensor sends any events, as we can trust that to mean the device
    is online. This mitigates the risk of false positives due to a single
    missed heartbeat event.
    """
    self._computed_state = False
    self._restart_timer()
    self.async_write_ha_state()
478 
def _restart_timer(self) -> None:
    """Restart the 25 hour timer.

    Any pending timer is dropped first, then a new one is scheduled that
    switches the computed state to ON when it elapses.
    """
    if self._heartbeat_timer is not None:
        # The stored value is the handle returned by async_call_later;
        # calling it is expected to cancel the pending timer.
        self._heartbeat_timer()
        self._heartbeat_timer = None

    @callback
    def timer_elapsed(now: datetime) -> None:
        """Heartbeat missed; set state to ON to indicate dead battery."""
        self._computed_state = True
        self._heartbeat_timer = None
        self.async_write_ha_state()

    self._heartbeat_timer = async_call_later(
        self.hass, timedelta(hours=25), timer_elapsed
    )
495 
@callback
def async_on_update(self, event: object) -> None:
    """Ignore node status updates.

    We listen directly to the Control events for this device.
    """
502 
@property
def is_on(self) -> bool:
    """Get whether the ISY binary sensor device is on.

    Note: This method will return false if the current state is UNKNOWN
    which occurs after a restart until the first heartbeat or control
    parent control event is received.
    """
    return bool(self._computed_state)
512 
@property
def extra_state_attributes(self) -> dict[str, Any]:
    """Get the state attributes for the device.

    Extends the attributes of the parent class with the entity id of the
    sensor this heartbeat belongs to.
    """
    attr = super().extra_state_attributes
    attr["parent_entity_id"] = self._parent_device.entity_id
    return attr
519 
520 
522  """Representation of an ISY binary sensor program.
523 
524  This does not need all of the subnode logic in the device version of binary
525  sensors.
526  """
527 
@property
def is_on(self) -> bool:
    """Get whether the ISY binary sensor device is on."""
    return bool(self._node.status)
None __init__(self, Node node, BinarySensorDeviceClass|None force_device_class=None, bool|None unknown_state=None, DeviceInfo|None device_info=None)
None __init__(self, Node node, ISYInsteonBinarySensorEntity|ISYBinarySensorEntity|ISYBinarySensorHeartbeat|ISYBinarySensorProgramEntity parent_device, DeviceInfo|None device_info=None)
None add_heartbeat_device(self, ISYBinarySensorHeartbeat|None entity)
None __init__(self, Node node, BinarySensorDeviceClass|None force_device_class=None, bool|None unknown_state=None, DeviceInfo|None device_info=None)
str|UndefinedType|None name(self)
Definition: entity.py:738
tuple[BinarySensorDeviceClass|None, str|None] _detect_device_type_and_class(Group|Node node)
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597