Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for RFXtrx devices."""
2 
3 from __future__ import annotations
4 
5 import binascii
6 from collections.abc import Callable, Mapping
7 import copy
8 import logging
9 from typing import Any, NamedTuple, cast
10 
11 import RFXtrx as rfxtrxmod
12 import voluptuous as vol
13 
14 from homeassistant.config_entries import ConfigEntry
15 from homeassistant.const import (
16  ATTR_DEVICE_ID,
17  CONF_DEVICE,
18  CONF_DEVICE_ID,
19  CONF_DEVICES,
20  CONF_HOST,
21  CONF_PORT,
22  EVENT_HOMEASSISTANT_STOP,
23  Platform,
24 )
25 from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
26 from homeassistant.exceptions import ConfigEntryNotReady
27 from homeassistant.helpers import config_validation as cv, device_registry as dr
28 from homeassistant.helpers.device_registry import EventDeviceRegistryUpdatedData
30  async_dispatcher_connect,
31  async_dispatcher_send,
32 )
33 from homeassistant.helpers.entity import Entity
34 from homeassistant.helpers.entity_platform import AddEntitiesCallback
35 
36 from .const import (
37  ATTR_EVENT,
38  CONF_AUTOMATIC_ADD,
39  CONF_DATA_BITS,
40  CONF_PROTOCOLS,
41  DATA_RFXOBJECT,
42  DEVICE_PACKET_TYPE_LIGHTING4,
43  DOMAIN,
44  EVENT_RFXTRX_EVENT,
45  SERVICE_SEND,
46  SIGNAL_EVENT,
47 )
48 
# NOTE(review): not referenced anywhere in this module; presumably a default
# "off" delay (seconds?) consumed by other modules of the integration - confirm.
DEFAULT_OFF_DELAY = 2.0

# Timeout value handed to rfx.connect() in _create_rfx().
CONNECT_TIMEOUT = 30.0

# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
54 
55 
class DeviceTuple(NamedTuple):
    """Immutable three-part key identifying a single RFXtrx device.

    Instances are produced by get_device_id() and used as dictionary keys
    and set members throughout this module.
    """

    # Packet type of the device; get_device_id() fills this with hex text.
    packettype: str
    # Subtype of the device; get_device_id() fills this with hex text.
    subtype: str
    # Device id string (address-masked for Lighting4 devices with data bits).
    id_string: str
62 
63 
def _bytearray_string(data: Any) -> bytearray:
    """Validate *data* as a hex string and convert it to a ``bytearray``.

    The value is first passed through ``cv.string``; the resulting text is
    then decoded as hexadecimal.

    Raises:
        vol.Invalid: If the text cannot be decoded as hexadecimal.
    """
    hex_text = cv.string(data)
    try:
        decoded = bytearray.fromhex(hex_text)
    except ValueError as err:
        raise vol.Invalid(
            "Data must be a hex string with multiple of two characters"
        ) from err
    return decoded
72 
73 
# Schema for the send service: the ATTR_EVENT field must be a hex string,
# which _bytearray_string converts to a bytearray.
SERVICE_SEND_SCHEMA = vol.Schema({ATTR_EVENT: _bytearray_string})

# Platforms the config entry is forwarded to in async_setup_entry() and
# unloaded from in async_unload_entry().
PLATFORMS = [
    Platform.BINARY_SENSOR,
    Platform.COVER,
    Platform.EVENT,
    Platform.LIGHT,
    Platform.SENSOR,
    Platform.SIREN,
    Platform.SWITCH,
]
85 
86 
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up RFXtrx from a config entry.

    Makes sure the integration's slot in ``hass.data`` exists, runs the
    internal setup and then forwards the entry to every platform listed in
    ``PLATFORMS``. Returns ``True`` once all of that has completed; errors
    surface as exceptions from the awaited calls.
    """
    # Create the shared per-integration storage only if it is not there yet.
    if DOMAIN not in hass.data:
        hass.data[DOMAIN] = {}

    await async_setup_internal(hass, entry)
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
    return True
95 
96 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload the RFXtrx config entry.

    Returns ``False`` without touching anything else when the platforms
    could not be unloaded. Otherwise removes the send service, closes the
    RFXtrx connection in the executor, drops the integration's data from
    ``hass.data`` and returns ``True``.
    """
    platforms_unloaded = await hass.config_entries.async_unload_platforms(
        entry, PLATFORMS
    )
    if not platforms_unloaded:
        return False

    hass.services.async_remove(DOMAIN, SERVICE_SEND)

    # close_connection is run through the executor rather than awaited inline.
    connection = hass.data[DOMAIN][DATA_RFXOBJECT]
    await hass.async_add_executor_job(connection.close_connection)

    hass.data.pop(DOMAIN)
    return True
110 
111 
def _create_rfx(
    config: Mapping[str, Any], event_callback: Callable[[rfxtrxmod.RFXtrxEvent], None]
) -> rfxtrxmod.Connect:
    """Build and connect the RFXtrx object described by *config*.

    A network transport is used when ``CONF_PORT`` is set (not ``None``),
    otherwise a serial transport on ``CONF_DEVICE``.

    Raises:
        ConfigEntryNotReady: If connecting times out or the transport
            reports an error.
    """
    modes = config.get(CONF_PROTOCOLS)
    if not modes:
        _LOGGER.debug("No modes defined, using device configuration")
    else:
        _LOGGER.debug("Using modes: %s", ",".join(modes))

    port = config[CONF_PORT]
    if port is None:
        transport = rfxtrxmod.PySerialTransport(config[CONF_DEVICE])
    else:
        # A configured port means the device is reached over TCP.
        transport = rfxtrxmod.PyNetworkTransport((config[CONF_HOST], port))

    rfx = rfxtrxmod.Connect(transport, event_callback, modes=modes)

    try:
        rfx.connect(CONNECT_TIMEOUT)
    except TimeoutError as exc:
        raise ConfigEntryNotReady("Timeout on connect") from exc
    except rfxtrxmod.RFXtrxTransportError as exc:
        raise ConfigEntryNotReady(str(exc)) from exc

    return rfx
144 
145 
def _get_device_lookup(
    devices: dict[str, dict[str, Any]],
) -> dict[DeviceTuple, dict[str, Any]]:
    """Index the configured devices by their device id.

    *devices* maps an event code (hex packet string) to that device's
    configuration. Event codes for which get_rfx_object() returns ``None``
    are left out of the result.
    """
    lookup = {}
    for event_code, event_config in devices.items():
        event = get_rfx_object(event_code)
        if event is not None:
            key = get_device_id(
                event.device, data_bits=event_config.get(CONF_DATA_BITS)
            )
            lookup[key] = event_config
    return lookup
159 
160 
async def async_setup_internal(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Set up the RFXtrx component.

    Builds the per-device lookup from the config entry, defines the handlers
    for received RFXtrx events and for device-registry removals, creates the
    RFXtrx connection in the executor, stores it in ``hass.data`` and
    registers the send service.
    """
    config = entry.data

    # Setup some per device config
    devices = _get_device_lookup(config[CONF_DEVICES])
    # Id strings of Lighting4 devices seen so far (filled by the handler below).
    pt2262_devices: set[str] = set()

    device_registry = dr.async_get(hass)

    # Declare the Handle event
    @callback
    def async_handle_receive(event: rfxtrxmod.RFXtrxEvent) -> None:
        """Handle received messages from RFXtrx gateway."""

        # A lost connection is handled by reloading the whole config entry.
        if isinstance(event, rfxtrxmod.ConnectionLost):
            _LOGGER.warning("Connection was lost, triggering reload")
            hass.async_create_task(
                hass.config_entries.async_reload(entry.entry_id),
                f"config entry reload {entry.title} {entry.domain} {entry.entry_id}",
            )
            return

        # Events without a device or without an id string are ignored.
        if not event.device or not event.device.id_string:
            return

        event_data = {
            "packet_type": event.device.packettype,
            "sub_type": event.device.subtype,
            "type_string": event.device.type_string,
            "id_string": event.device.id_string,
            "data": binascii.hexlify(event.data).decode("ASCII"),
            "values": getattr(event, "values", None),
        }

        _LOGGER.debug("Receive RFXCOM event: %s", event_data)

        data_bits = get_device_data_bits(event.device, devices)
        device_id = get_device_id(event.device, data_bits=data_bits)

        # Unknown devices are either added on the fly or dropped, depending
        # on the automatic-add setting.
        if device_id not in devices:
            if config[CONF_AUTOMATIC_ADD]:
                _add_device(event, device_id)
            else:
                return

        # The lookup runs before the id is added to the set; its return
        # value is not used here.
        if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
            find_possible_pt2262_device(pt2262_devices, event.device.id_string)
            pt2262_devices.add(event.device.id_string)

        device_entry = device_registry.async_get_device(
            identifiers={(DOMAIN, *device_id)},  # type: ignore[arg-type]
        )
        if device_entry:
            event_data[ATTR_DEVICE_ID] = device_entry.id

        # Callback to HA registered components.
        async_dispatcher_send(hass, SIGNAL_EVENT, event, device_id)

        # Signal event to any other listeners
        hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data)

    @callback
    def _add_device(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
        """Add a device to config entry."""
        # Note: this local `config` shadows the entry-data `config` of the
        # enclosing function; here it is the new device's configuration.
        config = {}
        config[CONF_DEVICE_ID] = device_id

        _LOGGER.debug(
            "Added device (Device ID: %s Class: %s Sub: %s, Event: %s)",
            event.device.id_string.lower(),
            event.device.__class__.__name__,
            event.device.subtype,
            "".join(f"{x:02x}" for x in event.data),
        )

        # Work on a copy (deep copy for the devices mapping) so the entry's
        # current data is not mutated in place before async_update_entry.
        data = entry.data.copy()
        data[CONF_DEVICES] = copy.deepcopy(entry.data[CONF_DEVICES])
        event_code = binascii.hexlify(event.data).decode("ASCII")
        data[CONF_DEVICES][event_code] = config
        hass.config_entries.async_update_entry(entry=entry, data=data)
        # Keep the in-memory lookup in sync with the stored entry data.
        devices[device_id] = config

    @callback
    def _remove_device(device_id: DeviceTuple) -> None:
        """Drop every stored device whose device id equals *device_id*."""
        # NOTE(review): tuple(None) raises TypeError if a stored device has no
        # CONF_DEVICE_ID, and devices.pop() raises KeyError if the id is not
        # in the lookup - confirm neither can happen in practice.
        data = {
            **entry.data,
            CONF_DEVICES: {
                packet_id: entity_info
                for packet_id, entity_info in entry.data[CONF_DEVICES].items()
                if tuple(entity_info.get(CONF_DEVICE_ID)) != device_id
            },
        }
        hass.config_entries.async_update_entry(entry=entry, data=data)
        devices.pop(device_id)

    @callback
    def _updated_device(event: Event[EventDeviceRegistryUpdatedData]) -> None:
        """React to device-registry updates; only "remove" actions are handled."""
        if event.data["action"] != "remove":
            return
        device_entry = device_registry.deleted_devices[event.data["device_id"]]
        # Ignore removed devices that are not linked to this config entry.
        if entry.entry_id not in device_entry.config_entries:
            return
        device_id = get_device_tuple_from_identifiers(device_entry.identifiers)
        if device_id:
            _remove_device(device_id)

    # Initialize library
    # _create_rfx runs in the executor. The callback given to the library
    # forwards each event to async_handle_receive via hass.add_job.
    # NOTE(review): presumably because the library calls back from its own
    # thread - confirm.
    rfx_object = await hass.async_add_executor_job(
        _create_rfx, config, lambda event: hass.add_job(async_handle_receive, event)
    )

    hass.data[DOMAIN][DATA_RFXOBJECT] = rfx_object

    # The listener is removed again when the entry is unloaded.
    entry.async_on_unload(
        hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device)
    )

    def _shutdown_rfxtrx(event: Event) -> None:
        """Close connection with RFXtrx."""
        rfx_object.close_connection()

    entry.async_on_unload(
        hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown_rfxtrx)
    )

    def send(call: ServiceCall) -> None:
        """Send the bytes from the service call's ATTR_EVENT field via the transport."""
        event = call.data[ATTR_EVENT]
        rfx_object.transport.send(event)

    hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
292 
293 
async def async_setup_platform_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
    supported: Callable[[rfxtrxmod.RFXtrxEvent], bool],
    constructor: Callable[
        [
            rfxtrxmod.RFXtrxEvent,
            rfxtrxmod.RFXtrxEvent | None,
            DeviceTuple,
            dict[str, Any],
        ],
        list[Entity],
    ],
) -> None:
    """Create a platform's entities for a config entry.

    Entities are built with *constructor* for every configured device whose
    event passes *supported*; each device id is handled at most once. When
    automatic add is enabled, a dispatcher listener is connected that
    builds entities for devices first seen at runtime.
    """
    settings = config_entry.data
    device_ids: set[DeviceTuple] = set()

    # Entities for the devices already stored in the config entry.
    entities = []
    for packet_id, entity_info in settings[CONF_DEVICES].items():
        event = get_rfx_object(packet_id)
        if event is None:
            _LOGGER.error("Invalid device: %s", packet_id)
            continue
        if not supported(event):
            continue

        device_id = get_device_id(
            event.device, data_bits=entity_info.get(CONF_DATA_BITS)
        )
        if device_id not in device_ids:
            device_ids.add(device_id)
            entities.extend(constructor(event, None, device_id, entity_info))

    async_add_entities(entities)

    # Nothing more to do unless devices may be added automatically.
    if not settings[CONF_AUTOMATIC_ADD]:
        return

    @callback
    def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
        """Create entities for a supported device that has not been seen yet."""
        if not supported(event) or device_id in device_ids:
            return
        device_ids.add(device_id)
        async_add_entities(constructor(event, event, device_id, {}))

    # Disconnect the listener again when the entry is unloaded.
    config_entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_EVENT, _update)
    )
350 
351 
def get_rfx_object(packetid: str) -> rfxtrxmod.RFXtrxEvent | None:
    """Parse a hex packet string into an RFXtrx event.

    Returns ``None`` when *packetid* is not valid hexadecimal; otherwise
    returns whatever ``RFXtrxTransport.parse`` yields for the decoded bytes.
    """
    try:
        raw_packet = bytearray.fromhex(packetid)
    except ValueError:
        parsed = None
    else:
        parsed = rfxtrxmod.RFXtrxTransport.parse(raw_packet)
    return parsed
359 
360 
def get_pt2262_deviceid(device_id: str, nb_data_bits: int | None) -> bytes | None:
    """Extract and return the address bits from a Lighting4/PT2262 packet.

    The lowest *nb_data_bits* bits of the last byte of *device_id* are
    cleared and the result is returned as lowercase hex ``bytes``.

    Args:
        device_id: Device id as a hex string.
        nb_data_bits: Number of low bits of the last byte that carry data.

    Returns:
        The masked id as hex bytes, or ``None`` when *nb_data_bits* is
        ``None`` or *device_id* is empty or not valid hexadecimal.
    """
    if nb_data_bits is None:
        return None

    try:
        data = bytearray.fromhex(device_id)
    except ValueError:
        return None

    # fromhex("") succeeds with an empty buffer; there is no last byte to
    # mask, so treat it like any other unusable id instead of raising.
    if not data:
        return None

    # Only the last byte is masked; the mask is limited to eight bits.
    mask = 0xFF & ~((1 << nb_data_bits) - 1)
    data[-1] &= mask

    return binascii.hexlify(data)
375 
376 
def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None:
    """Extract and return the data bits from a Lighting4/PT2262 packet.

    Args:
        device_id: Device id as a hex string.
        data_bits: Number of low bits of the last byte that carry data.

    Returns:
        ``hex()`` of the lowest *data_bits* bits of the last byte, or
        ``None`` when *device_id* is empty or not valid hexadecimal.
    """
    try:
        data = bytearray.fromhex(device_id)
    except ValueError:
        return None

    # fromhex("") succeeds with an empty buffer; there is no last byte to
    # read, so report "no command" instead of raising IndexError.
    if not data:
        return None

    # Only the last byte is inspected; the mask is limited to eight bits.
    mask = 0xFF & ((1 << data_bits) - 1)

    return hex(data[-1] & mask)
387 
388 
def get_device_data_bits(
    device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict[str, Any]]
) -> int | None:
    """Look up the data-bits setting that applies to *device*.

    Only Lighting4 devices are considered. Each configured entry's
    data-bits value is tried in turn; the first one for which the resulting
    device id equals the entry's key is returned. ``None`` is returned for
    other packet types and when no entry matches.
    """
    if device.packettype != DEVICE_PACKET_TYPE_LIGHTING4:
        return None

    for device_id, entity_config in devices.items():
        bits = entity_config.get(CONF_DATA_BITS)
        if get_device_id(device, bits) == device_id:
            return bits
    return None
401 
402 
def find_possible_pt2262_device(device_ids: set[str], device_id: str) -> str | None:
    """Look for a known id that may belong to the same device as *device_id*.

    A candidate must have the same length as *device_id*, share a non-empty
    common prefix with it and differ in its tail. The differing tails are
    logged as a suggested configuration, counting four bits per character.

    Args:
        device_ids: Id strings seen so far.
        device_id: Id string to find a sibling for.

    Returns:
        The first matching id from *device_ids*, or ``None`` if none matches.
    """
    for dev_id in device_ids:
        # An identical id has no differing tail: it would be reported with
        # data_bits=0 and the whole id as the command, and could be returned
        # in place of a genuine sibling id, so it is skipped.
        if len(dev_id) != len(device_id) or dev_id == device_id:
            continue

        # Length of the common prefix of both ids.
        prefix_len = 0
        for char1, char2 in zip(dev_id, device_id, strict=False):
            if char1 != char2:
                break
            prefix_len += 1
        if not prefix_len:
            continue

        # Number of trailing characters in which the two ids differ (>= 1).
        size = len(dev_id) - prefix_len
        _LOGGER.debug(
            (
                "Found possible device %s for %s "
                "with the following configuration:\n"
                "data_bits=%d\n"
                "command_on=0x%s\n"
                "command_off=0x%s\n"
            ),
            device_id,
            dev_id,
            size * 4,
            dev_id[-size:],
            device_id[-size:],
        )
        return dev_id
    return None
430 
431 
def get_device_id(
    device: rfxtrxmod.RFXtrxDevice, data_bits: int | None = None
) -> DeviceTuple:
    """Build the device id tuple for *device*.

    Packet type and subtype are rendered as hex text. For Lighting4 devices
    with a truthy *data_bits*, the id string is replaced by its masked form
    from get_pt2262_deviceid() when that yields a non-empty result.
    """
    id_string: str = device.id_string

    if data_bits and device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
        masked_id = get_pt2262_deviceid(id_string, data_bits)
        if masked_id:
            id_string = masked_id.decode("ASCII")

    packettype_hex = f"{device.packettype:x}"
    subtype_hex = f"{device.subtype:x}"
    return DeviceTuple(packettype_hex, subtype_hex, id_string)
445 
446 
def get_device_tuple_from_identifiers(
    identifiers: set[tuple[str, str]],
) -> DeviceTuple | None:
    """Derive the device tuple from a device entry's identifiers.

    Returns a DeviceTuple built from the first identifier that starts with
    ``DOMAIN`` and has four elements, or ``None`` if there is no such
    identifier.
    """
    for candidate in identifiers:
        if candidate[0] == DOMAIN and len(candidate) == 4:
            # The annotation says 2-tuples, but the legacy identifier format
            # carries four values; cast so the elements can be unpacked.
            _, packettype, subtype, id_string = cast(
                tuple[str, str, str, str], candidate
            )
            return DeviceTuple(packettype, subtype, id_string)
    return None
457 
458 
async def async_remove_config_entry_device(
    hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
) -> bool:
    """Allow a device to be removed from the config entry.

    Always returns ``True``; the stored device data is cleaned up by the
    device registry event listener, not here.
    """
    return True
bool async_remove_config_entry_device(HomeAssistant hass, ConfigEntry config_entry, dr.DeviceEntry device_entry)
Definition: __init__.py:461
DeviceTuple|None get_device_tuple_from_identifiers(set[tuple[str, str]] identifiers)
Definition: __init__.py:449
str|None find_possible_pt2262_device(set[str] device_ids, str device_id)
Definition: __init__.py:403
str|None get_pt2262_cmd(str device_id, int data_bits)
Definition: __init__.py:377
DeviceTuple get_device_id(rfxtrxmod.RFXtrxDevice device, int|None data_bits=None)
Definition: __init__.py:434
rfxtrxmod.Connect _create_rfx(Mapping[str, Any] config, Callable[[rfxtrxmod.RFXtrxEvent], None] event_callback)
Definition: __init__.py:114
bytes|None get_pt2262_deviceid(str device_id, int|None nb_data_bits)
Definition: __init__.py:361
None async_setup_internal(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:161
bytearray _bytearray_string(Any data)
Definition: __init__.py:64
int|None get_device_data_bits(rfxtrxmod.RFXtrxDevice device, dict[DeviceTuple, dict[str, Any]] devices)
Definition: __init__.py:391
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:97
None async_setup_platform_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities, Callable[[rfxtrxmod.RFXtrxEvent], bool] supported, Callable[[rfxtrxmod.RFXtrxEvent, rfxtrxmod.RFXtrxEvent|None, DeviceTuple, dict[str, Any],], list[Entity],] constructor)
Definition: __init__.py:308
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:87
dict[DeviceTuple, dict[str, Any]] _get_device_lookup(dict[str, dict[str, Any]] devices)
Definition: __init__.py:148
rfxtrxmod.RFXtrxEvent|None get_rfx_object(str packetid)
Definition: __init__.py:352
None _remove_device(HomeAssistant hass, ConfigEntry config_entry, str mac, TasmotaMQTTClient tasmota_mqtt, DeviceRegistry device_registry)
Definition: __init__.py:117
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193