1 """Support for exposing NX584 elements as sensors."""
3 from __future__
import annotations
9 from nx584
import client
as nx584_client
11 import voluptuous
as vol
14 DEVICE_CLASSES_SCHEMA
as BINARY_SENSOR_DEVICE_CLASSES_SCHEMA,
15 PLATFORM_SCHEMA
as BINARY_SENSOR_PLATFORM_SCHEMA,
16 BinarySensorDeviceClass,
25 _LOGGER = logging.getLogger(__name__)
27 CONF_EXCLUDE_ZONES =
"exclude_zones"
28 CONF_ZONE_TYPES =
"zone_types"
30 DEFAULT_HOST =
"localhost"
34 ZONE_TYPES_SCHEMA = vol.Schema({cv.positive_int: BINARY_SENSOR_DEVICE_CLASSES_SCHEMA})
36 PLATFORM_SCHEMA = BINARY_SENSOR_PLATFORM_SCHEMA.extend(
38 vol.Optional(CONF_EXCLUDE_ZONES, default=[]): vol.All(
39 cv.ensure_list, [cv.positive_int]
41 vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
42 vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
43 vol.Optional(CONF_ZONE_TYPES, default={}): ZONE_TYPES_SCHEMA,
51 add_entities: AddEntitiesCallback,
52 discovery_info: DiscoveryInfoType |
None =
None,
54 """Set up the NX584 binary sensor platform."""
56 host = config[CONF_HOST]
57 port = config[CONF_PORT]
58 exclude = config[CONF_EXCLUDE_ZONES]
59 zone_types = config[CONF_ZONE_TYPES]
62 client = nx584_client.Client(f
"http://{host}:{port}")
63 zones = client.list_zones()
64 except requests.exceptions.ConnectionError
as ex:
65 _LOGGER.error(
"Unable to connect to NX584: %s",
str(ex))
68 version = [
int(v)
for v
in client.get_version().split(
".")]
70 _LOGGER.error(
"NX584 is too old to use for sensors (>=0.2 required)")
75 zone, zone_types.get(zone[
"number"], BinarySensorDeviceClass.OPENING)
78 if zone[
"number"]
not in exclude
85 _LOGGER.warning(
"No zones found on NX584")
89 """Representation of a NX584 zone as a sensor."""
91 _attr_should_poll =
False
94 """Initialize the nx594 binary sensor."""
100 """Return the class of this sensor, from DEVICE_CLASSES."""
105 """Return the name of the binary sensor."""
106 return self.
_zone_zone[
"name"]
110 """Return true if the binary sensor is on."""
112 return self.
_zone_zone[
"state"]
116 """Return the state attributes."""
118 "zone_number": self.
_zone_zone[
"number"],
119 "bypassed": self.
_zone_zone.
get(
"bypassed",
False),
124 """Event listener thread to process NX584 events."""
127 """Initialize NX584 watcher thread."""
137 zone_sensor._zone[
"state"] = event[
"zone_state"]
138 zone_sensor.schedule_update_ha_state()
142 if event.get(
"type") ==
"zone_status":
146 """Throw away any existing events so we don't replay history."""
147 self.
_client_client.get_events()
149 if events := self.
_client_client.get_events():
153 """Run the watcher."""
157 except requests.exceptions.ConnectionError:
158 _LOGGER.error(
"Failed to reach NX584 server")
def _process_events(self, events)
def __init__(self, client, zone_sensors)
def _process_zone_event(self, event)
def extra_state_attributes(self)
def __init__(self, zone, zone_type)
None add_entities(AsusWrtRouter router, AddEntitiesCallback async_add_entities, set[str] tracked)
web.Response get(self, web.Request request, str config_key)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)