1 """Offer Z-Wave JS event listening automation trigger."""
3 from __future__
import annotations
5 from collections.abc
import Callable
8 from pydantic
import ValidationError
9 import voluptuous
as vol
10 from zwave_js_server.client
import Client
11 from zwave_js_server.model.controller
import CONTROLLER_EVENT_MODEL_MAP
12 from zwave_js_server.model.driver
import DRIVER_EVENT_MODEL_MAP, Driver
13 from zwave_js_server.model.node
import NODE_EVENT_MODEL_MAP
28 ATTR_PARTIAL_DICT_MATCH,
32 from ..helpers
import (
33 async_get_nodes_from_targets,
35 get_home_and_node_id_from_device_entry,
37 from .trigger_helpers
import async_bypass_dynamic_config_validation
# Platform identifier for this trigger: DOMAIN, a dot, then the last component
# of this module's dotted name.
PLATFORM_TYPE = f"{DOMAIN}.{__name__.rsplit('.', maxsplit=1)[-1]}"
def validate_non_node_event_source(obj: dict) -> dict:
    """Require a config entry ID on triggers for non-node event sources.

    Returns ``obj`` unchanged when its event source is not ``"node"`` and it
    contains ``ATTR_CONFIG_ENTRY_ID``.

    Raises:
        vol.Invalid: in every other case, i.e. when the event source is
            ``"node"`` or the config entry ID key is absent.
    """
    # Guard-clause form of "non-node source AND config entry present".
    if obj[ATTR_EVENT_SOURCE] == "node" or ATTR_CONFIG_ENTRY_ID not in obj:
        raise vol.Invalid(
            f"Non node event triggers must contain {ATTR_CONFIG_ENTRY_ID}."
        )
    return obj
def validate_event_name(obj: dict) -> dict:
    """Check that the trigger's event name is known for its event source.

    The event name must be a key of the model map that matches the event
    source: the controller map for ``"controller"``, the driver map for
    ``"driver"``, and the node map for anything else.

    Returns ``obj`` unchanged; ``vol.In`` raises when the name is not a key
    of the selected map.
    """
    source = obj[ATTR_EVENT_SOURCE]
    name = obj[ATTR_EVENT]

    # Pick the map first, then run a single membership check against it.
    if source == "controller":
        known_events = CONTROLLER_EVENT_MODEL_MAP
    elif source == "driver":
        known_events = DRIVER_EVENT_MODEL_MAP
    else:
        known_events = NODE_EVENT_MODEL_MAP

    vol.In(known_events)(name)
    return obj
def validate_event_data(obj: dict) -> dict:
    """Check the trigger's event data against the model for its event.

    When ``ATTR_EVENT_DATA`` is absent there is nothing to check and ``obj``
    is returned immediately. Otherwise the event data is passed as keyword
    arguments to the model registered for the event name in the map that
    matches the event source (controller, driver, or node for anything else).

    Returns ``obj`` unchanged.

    Raises:
        vol.MultipleInvalid: when the model reports at least one validation
            error whose type is not ``"value_error.missing"``.
    """
    if ATTR_EVENT_DATA not in obj:
        return obj

    source: str = obj[ATTR_EVENT_SOURCE]
    name: str = obj[ATTR_EVENT]
    data: dict = obj[ATTR_EVENT_DATA]

    if source == "controller":
        model_map = CONTROLLER_EVENT_MODEL_MAP
    elif source == "driver":
        model_map = DRIVER_EVENT_MODEL_MAP
    else:
        model_map = NODE_EVENT_MODEL_MAP

    try:
        # Instantiating the model is what performs the validation; the
        # resulting instance itself is not needed.
        model_map[name](**data)
    except ValidationError as exc:
        # Errors of type "value_error.missing" are ignored; any other kind
        # of error makes the event data invalid.
        if any(error["type"] != "value_error.missing" for error in exc.errors()):
            raise vol.MultipleInvalid from exc

    return obj
88 TRIGGER_SCHEMA = vol.All(
89 cv.TRIGGER_BASE_SCHEMA.extend(
91 vol.Required(CONF_PLATFORM): PLATFORM_TYPE,
92 vol.Optional(ATTR_CONFIG_ENTRY_ID): str,
93 vol.Optional(ATTR_DEVICE_ID): vol.All(cv.ensure_list, [cv.string]),
94 vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
95 vol.Required(ATTR_EVENT_SOURCE): vol.In([
"controller",
"driver",
"node"]),
96 vol.Required(ATTR_EVENT): cv.string,
97 vol.Optional(ATTR_EVENT_DATA): dict,
98 vol.Optional(ATTR_PARTIAL_DICT_MATCH, default=
False): bool,
104 validate_non_node_event_source,
105 cv.has_at_least_one_key(ATTR_DEVICE_ID, ATTR_ENTITY_ID),
111 hass: HomeAssistant, config: ConfigType
113 """Validate config."""
116 if ATTR_CONFIG_ENTRY_ID
in config:
117 entry_id = config[ATTR_CONFIG_ENTRY_ID]
118 if hass.config_entries.async_get_entry(entry_id)
is None:
119 raise vol.Invalid(f
"Config entry '{entry_id}' not found")
128 f
"No nodes found for given {ATTR_DEVICE_ID}s or {ATTR_ENTITY_ID}s."
137 action: TriggerActionType,
138 trigger_info: TriggerInfo,
140 platform_type: str = PLATFORM_TYPE,
142 """Listen for state changes based on configuration."""
143 dev_reg = dr.async_get(hass)
145 hass, config, dev_reg=dev_reg
148 f
"No nodes found for given {ATTR_DEVICE_ID}s or {ATTR_ENTITY_ID}s."
151 event_source = config[ATTR_EVENT_SOURCE]
152 event_name = config[ATTR_EVENT]
153 event_data_filter = config.get(ATTR_EVENT_DATA, {})
155 unsubs: list[Callable] = []
158 trigger_data = trigger_info[
"trigger_data"]
161 def async_on_event(event_data: dict, device: dr.DeviceEntry |
None =
None) ->
None:
163 for key, val
in event_data_filter.items():
164 if key
not in event_data:
167 config[ATTR_PARTIAL_DICT_MATCH]
168 and isinstance(event_data[key], dict)
169 and isinstance(event_data_filter[key], dict)
171 for key2, val2
in event_data_filter[key].items():
172 if key2
not in event_data[key]
or event_data[key][key2] != val2:
175 if event_data[key] != val:
180 CONF_PLATFORM: platform_type,
181 ATTR_EVENT_SOURCE: event_source,
182 ATTR_EVENT: event_name,
183 ATTR_EVENT_DATA: event_data,
186 primary_desc = f
"Z-Wave JS '{event_source}' event '{event_name}' was emitted"
189 device_name = device.name_by_user
or device.name
190 payload[ATTR_DEVICE_ID] = device.id
192 assert home_and_node_id
193 payload[ATTR_NODE_ID] = home_and_node_id[1]
194 payload[
"description"] = f
"{primary_desc} on {device_name}"
196 payload[
"description"] = primary_desc
198 payload[
"description"] = (
199 f
"{payload['description']} with event data: {event_data}"
202 hass.async_run_hass_job(job, {
"trigger": payload})
206 """Remove state listeners async."""
212 def _create_zwave_listeners() -> None:
213 """Create Z-Wave JS listeners."""
217 drivers: set[Driver] = set()
219 entry_id = config[ATTR_CONFIG_ENTRY_ID]
220 entry = hass.config_entries.async_get_entry(entry_id)
222 client: Client = entry.runtime_data[DATA_CLIENT]
223 driver = client.driver
226 if event_source ==
"controller":
227 unsubs.append(driver.controller.on(event_name, async_on_event))
229 unsubs.append(driver.on(event_name, async_on_event))
232 driver = node.client.driver
233 assert driver
is not None
236 device = dev_reg.async_get_device(identifiers={device_identifier})
240 node.on(event_name, functools.partial(async_on_event, device=device))
245 f
"{DOMAIN}_{driver.controller.home_id}_connected_to_server",
246 _create_zwave_listeners,
248 for driver
in drivers
251 _create_zwave_listeners()
str get_device_id(ServerInfoMessage server_info, MatterEndpoint endpoint)
bool async_bypass_dynamic_config_validation(HomeAssistant hass, str device_id)
tuple[str, int]|None get_home_and_node_id_from_device_entry(dr.DeviceEntry device_entry)
set[ZwaveNode] async_get_nodes_from_targets(HomeAssistant hass, dict[str, Any] val, er.EntityRegistry|None ent_reg=None, dr.DeviceRegistry|None dev_reg=None, logging.Logger logger=LOGGER)
dict validate_non_node_event_source(dict obj)
dict validate_event_name(dict obj)
dict validate_event_data(dict obj)
CALLBACK_TYPE async_attach_trigger(HomeAssistant hass, ConfigType config, TriggerActionType action, TriggerInfo trigger_info, *str platform_type=PLATFORM_TYPE)
ConfigType async_validate_trigger_config(HomeAssistant hass, ConfigType config)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None async_remove(HomeAssistant hass, str intent_type)