Home Assistant Unofficial Reference 2024.12.1
event.py
Go to the documentation of this file.
1 """Offer Z-Wave JS event listening automation trigger."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 import functools
7 
8 from pydantic import ValidationError
9 import voluptuous as vol
10 from zwave_js_server.client import Client
11 from zwave_js_server.model.controller import CONTROLLER_EVENT_MODEL_MAP
12 from zwave_js_server.model.driver import DRIVER_EVENT_MODEL_MAP, Driver
13 from zwave_js_server.model.node import NODE_EVENT_MODEL_MAP
14 
15 from homeassistant.const import ATTR_DEVICE_ID, ATTR_ENTITY_ID, CONF_PLATFORM
16 from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
17 from homeassistant.helpers import config_validation as cv, device_registry as dr
18 from homeassistant.helpers.dispatcher import async_dispatcher_connect
19 from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
20 from homeassistant.helpers.typing import ConfigType
21 
22 from ..const import (
23  ATTR_CONFIG_ENTRY_ID,
24  ATTR_EVENT,
25  ATTR_EVENT_DATA,
26  ATTR_EVENT_SOURCE,
27  ATTR_NODE_ID,
28  ATTR_PARTIAL_DICT_MATCH,
29  DATA_CLIENT,
30  DOMAIN,
31 )
32 from ..helpers import (
33  async_get_nodes_from_targets,
34  get_device_id,
35  get_home_and_node_id_from_device_entry,
36 )
37 from .trigger_helpers import async_bypass_dynamic_config_validation
38 
39 # Platform type should be <DOMAIN>.<SUBMODULE_NAME>
40 PLATFORM_TYPE = f"{DOMAIN}.{__name__.rsplit('.', maxsplit=1)[-1]}"
41 
42 
43 def validate_non_node_event_source(obj: dict) -> dict:
44  """Validate that a trigger for a non node event source has a config entry."""
45  if obj[ATTR_EVENT_SOURCE] != "node" and ATTR_CONFIG_ENTRY_ID in obj:
46  return obj
47  raise vol.Invalid(f"Non node event triggers must contain {ATTR_CONFIG_ENTRY_ID}.")
48 
49 
50 def validate_event_name(obj: dict) -> dict:
51  """Validate that a trigger has a valid event name."""
52  event_source = obj[ATTR_EVENT_SOURCE]
53  event_name = obj[ATTR_EVENT]
54  # the keys to the event source's model map are the event names
55  if event_source == "controller":
56  vol.In(CONTROLLER_EVENT_MODEL_MAP)(event_name)
57  elif event_source == "driver":
58  vol.In(DRIVER_EVENT_MODEL_MAP)(event_name)
59  else:
60  vol.In(NODE_EVENT_MODEL_MAP)(event_name)
61  return obj
62 
63 
64 def validate_event_data(obj: dict) -> dict:
65  """Validate that a trigger has a valid event data."""
66  # Return if there's no event data to validate
67  if ATTR_EVENT_DATA not in obj:
68  return obj
69 
70  event_source: str = obj[ATTR_EVENT_SOURCE]
71  event_name: str = obj[ATTR_EVENT]
72  event_data: dict = obj[ATTR_EVENT_DATA]
73  try:
74  if event_source == "controller":
75  CONTROLLER_EVENT_MODEL_MAP[event_name](**event_data)
76  elif event_source == "driver":
77  DRIVER_EVENT_MODEL_MAP[event_name](**event_data)
78  else:
79  NODE_EVENT_MODEL_MAP[event_name](**event_data)
80  except ValidationError as exc:
81  # Filter out required field errors if keys can be missing, and if there are
82  # still errors, raise an exception
83  if [error for error in exc.errors() if error["type"] != "value_error.missing"]:
84  raise vol.MultipleInvalid from exc
85  return obj
86 
87 
88 TRIGGER_SCHEMA = vol.All(
89  cv.TRIGGER_BASE_SCHEMA.extend(
90  {
91  vol.Required(CONF_PLATFORM): PLATFORM_TYPE,
92  vol.Optional(ATTR_CONFIG_ENTRY_ID): str,
93  vol.Optional(ATTR_DEVICE_ID): vol.All(cv.ensure_list, [cv.string]),
94  vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
95  vol.Required(ATTR_EVENT_SOURCE): vol.In(["controller", "driver", "node"]),
96  vol.Required(ATTR_EVENT): cv.string,
97  vol.Optional(ATTR_EVENT_DATA): dict,
98  vol.Optional(ATTR_PARTIAL_DICT_MATCH, default=False): bool,
99  },
100  ),
101  validate_event_name,
102  validate_event_data,
103  vol.Any(
104  validate_non_node_event_source,
105  cv.has_at_least_one_key(ATTR_DEVICE_ID, ATTR_ENTITY_ID),
106  ),
107 )
108 
109 
111  hass: HomeAssistant, config: ConfigType
112 ) -> ConfigType:
113  """Validate config."""
114  config = TRIGGER_SCHEMA(config)
115 
116  if ATTR_CONFIG_ENTRY_ID in config:
117  entry_id = config[ATTR_CONFIG_ENTRY_ID]
118  if hass.config_entries.async_get_entry(entry_id) is None:
119  raise vol.Invalid(f"Config entry '{entry_id}' not found")
120 
122  return config
123 
124  if config[ATTR_EVENT_SOURCE] == "node" and not async_get_nodes_from_targets(
125  hass, config
126  ):
127  raise vol.Invalid(
128  f"No nodes found for given {ATTR_DEVICE_ID}s or {ATTR_ENTITY_ID}s."
129  )
130 
131  return config
132 
133 
135  hass: HomeAssistant,
136  config: ConfigType,
137  action: TriggerActionType,
138  trigger_info: TriggerInfo,
139  *,
140  platform_type: str = PLATFORM_TYPE,
141 ) -> CALLBACK_TYPE:
142  """Listen for state changes based on configuration."""
143  dev_reg = dr.async_get(hass)
144  if config[ATTR_EVENT_SOURCE] == "node" and not async_get_nodes_from_targets(
145  hass, config, dev_reg=dev_reg
146  ):
147  raise ValueError(
148  f"No nodes found for given {ATTR_DEVICE_ID}s or {ATTR_ENTITY_ID}s."
149  )
150 
151  event_source = config[ATTR_EVENT_SOURCE]
152  event_name = config[ATTR_EVENT]
153  event_data_filter = config.get(ATTR_EVENT_DATA, {})
154 
155  unsubs: list[Callable] = []
156  job = HassJob(action)
157 
158  trigger_data = trigger_info["trigger_data"]
159 
160  @callback
161  def async_on_event(event_data: dict, device: dr.DeviceEntry | None = None) -> None:
162  """Handle event."""
163  for key, val in event_data_filter.items():
164  if key not in event_data:
165  return
166  if (
167  config[ATTR_PARTIAL_DICT_MATCH]
168  and isinstance(event_data[key], dict)
169  and isinstance(event_data_filter[key], dict)
170  ):
171  for key2, val2 in event_data_filter[key].items():
172  if key2 not in event_data[key] or event_data[key][key2] != val2:
173  return
174  continue
175  if event_data[key] != val:
176  return
177 
178  payload = {
179  **trigger_data,
180  CONF_PLATFORM: platform_type,
181  ATTR_EVENT_SOURCE: event_source,
182  ATTR_EVENT: event_name,
183  ATTR_EVENT_DATA: event_data,
184  }
185 
186  primary_desc = f"Z-Wave JS '{event_source}' event '{event_name}' was emitted"
187 
188  if device:
189  device_name = device.name_by_user or device.name
190  payload[ATTR_DEVICE_ID] = device.id
191  home_and_node_id = get_home_and_node_id_from_device_entry(device)
192  assert home_and_node_id
193  payload[ATTR_NODE_ID] = home_and_node_id[1]
194  payload["description"] = f"{primary_desc} on {device_name}"
195  else:
196  payload["description"] = primary_desc
197 
198  payload["description"] = (
199  f"{payload['description']} with event data: {event_data}"
200  )
201 
202  hass.async_run_hass_job(job, {"trigger": payload})
203 
204  @callback
205  def async_remove() -> None:
206  """Remove state listeners async."""
207  for unsub in unsubs:
208  unsub()
209  unsubs.clear()
210 
211  @callback
212  def _create_zwave_listeners() -> None:
213  """Create Z-Wave JS listeners."""
214  async_remove()
215  # Nodes list can come from different drivers and we will need to listen to
216  # server connections for all of them.
217  drivers: set[Driver] = set()
218  if not (nodes := async_get_nodes_from_targets(hass, config, dev_reg=dev_reg)):
219  entry_id = config[ATTR_CONFIG_ENTRY_ID]
220  entry = hass.config_entries.async_get_entry(entry_id)
221  assert entry
222  client: Client = entry.runtime_data[DATA_CLIENT]
223  driver = client.driver
224  assert driver
225  drivers.add(driver)
226  if event_source == "controller":
227  unsubs.append(driver.controller.on(event_name, async_on_event))
228  else:
229  unsubs.append(driver.on(event_name, async_on_event))
230 
231  for node in nodes:
232  driver = node.client.driver
233  assert driver is not None # The node comes from the driver.
234  drivers.add(driver)
235  device_identifier = get_device_id(driver, node)
236  device = dev_reg.async_get_device(identifiers={device_identifier})
237  assert device
238  # We need to store the device for the callback
239  unsubs.append(
240  node.on(event_name, functools.partial(async_on_event, device=device))
241  )
242  unsubs.extend(
244  hass,
245  f"{DOMAIN}_{driver.controller.home_id}_connected_to_server",
246  _create_zwave_listeners,
247  )
248  for driver in drivers
249  )
250 
251  _create_zwave_listeners()
252 
253  return async_remove
str get_device_id(ServerInfoMessage server_info, MatterEndpoint endpoint)
Definition: helpers.py:59
bool async_bypass_dynamic_config_validation(HomeAssistant hass, str device_id)
tuple[str, int]|None get_home_and_node_id_from_device_entry(dr.DeviceEntry device_entry)
Definition: helpers.py:231
set[ZwaveNode] async_get_nodes_from_targets(HomeAssistant hass, dict[str, Any] val, er.EntityRegistry|None ent_reg=None, dr.DeviceRegistry|None dev_reg=None, logging.Logger logger=LOGGER)
Definition: helpers.py:369
CALLBACK_TYPE async_attach_trigger(HomeAssistant hass, ConfigType config, TriggerActionType action, TriggerInfo trigger_info, *str platform_type=PLATFORM_TYPE)
Definition: event.py:141
ConfigType async_validate_trigger_config(HomeAssistant hass, ConfigType config)
Definition: event.py:112
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_remove(HomeAssistant hass, str intent_type)
Definition: intent.py:90