1 """Support for RFXtrx devices."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Mapping
9 from typing
import Any, NamedTuple, cast
11 import RFXtrx
as rfxtrxmod
12 import voluptuous
as vol
22 EVENT_HOMEASSISTANT_STOP,
30 async_dispatcher_connect,
31 async_dispatcher_send,
42 DEVICE_PACKET_TYPE_LIGHTING4,
49 DEFAULT_OFF_DELAY = 2.0
51 CONNECT_TIMEOUT = 30.0
53 _LOGGER = logging.getLogger(__name__)
57 """Representation of a device in rfxtrx."""
67 return bytearray.fromhex(val)
68 except ValueError
as err:
70 "Data must be a hex string with multiple of two characters"
74 SERVICE_SEND_SCHEMA = vol.Schema({ATTR_EVENT: _bytearray_string})
77 Platform.BINARY_SENSOR,
88 """Set up the RFXtrx component."""
89 hass.data.setdefault(DOMAIN, {})
92 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
98 """Unload RFXtrx component."""
99 if not await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
102 hass.services.async_remove(DOMAIN, SERVICE_SEND)
104 rfx_object = hass.data[DOMAIN][DATA_RFXOBJECT]
105 await hass.async_add_executor_job(rfx_object.close_connection)
107 hass.data.pop(DOMAIN)
113 config: Mapping[str, Any], event_callback: Callable[[rfxtrxmod.RFXtrxEvent],
None]
114 ) -> rfxtrxmod.Connect:
115 """Construct a rfx object based on config."""
117 modes = config.get(CONF_PROTOCOLS)
120 _LOGGER.debug(
"Using modes: %s",
",".join(modes))
122 _LOGGER.debug(
"No modes defined, using device configuration")
124 if config[CONF_PORT]
is not None:
126 transport = rfxtrxmod.PyNetworkTransport((config[CONF_HOST], config[CONF_PORT]))
128 transport = rfxtrxmod.PySerialTransport(config[CONF_DEVICE])
130 rfx = rfxtrxmod.Connect(
137 rfx.connect(CONNECT_TIMEOUT)
138 except TimeoutError
as exc:
140 except rfxtrxmod.RFXtrxTransportError
as exc:
147 devices: dict[str, dict[str, Any]],
148 ) -> dict[DeviceTuple, dict[str, Any]]:
149 """Get a lookup structure for devices."""
151 for event_code, event_config
in devices.items():
155 event.device, data_bits=event_config.get(CONF_DATA_BITS)
157 lookup[device_id] = event_config
162 """Set up the RFXtrx component."""
167 pt2262_devices: set[str] = set()
169 device_registry = dr.async_get(hass)
173 def async_handle_receive(event: rfxtrxmod.RFXtrxEvent) ->
None:
174 """Handle received messages from RFXtrx gateway."""
176 if isinstance(event, rfxtrxmod.ConnectionLost):
177 _LOGGER.warning(
"Connection was lost, triggering reload")
178 hass.async_create_task(
179 hass.config_entries.async_reload(entry.entry_id),
180 f
"config entry reload {entry.title} {entry.domain} {entry.entry_id}",
184 if not event.device
or not event.device.id_string:
188 "packet_type": event.device.packettype,
189 "sub_type": event.device.subtype,
190 "type_string": event.device.type_string,
191 "id_string": event.device.id_string,
192 "data": binascii.hexlify(event.data).decode(
"ASCII"),
193 "values": getattr(event,
"values",
None),
196 _LOGGER.debug(
"Receive RFXCOM event: %s", event_data)
201 if device_id
not in devices:
202 if config[CONF_AUTOMATIC_ADD]:
203 _add_device(event, device_id)
207 if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
209 pt2262_devices.add(event.device.id_string)
211 device_entry = device_registry.async_get_device(
212 identifiers={(DOMAIN, *device_id)},
215 event_data[ATTR_DEVICE_ID] = device_entry.id
221 hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data)
224 def _add_device(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) ->
None:
225 """Add a device to config entry."""
227 config[CONF_DEVICE_ID] = device_id
230 "Added device (Device ID: %s Class: %s Sub: %s, Event: %s)",
231 event.device.id_string.lower(),
232 event.device.__class__.__name__,
233 event.device.subtype,
234 "".join(f
"{x:02x}" for x
in event.data),
237 data = entry.data.copy()
238 data[CONF_DEVICES] = copy.deepcopy(entry.data[CONF_DEVICES])
239 event_code = binascii.hexlify(event.data).decode(
"ASCII")
240 data[CONF_DEVICES][event_code] = config
241 hass.config_entries.async_update_entry(entry=entry, data=data)
242 devices[device_id] = config
249 packet_id: entity_info
250 for packet_id, entity_info
in entry.data[CONF_DEVICES].items()
251 if tuple(entity_info.get(CONF_DEVICE_ID)) != device_id
254 hass.config_entries.async_update_entry(entry=entry, data=data)
255 devices.pop(device_id)
258 def _updated_device(event: Event[EventDeviceRegistryUpdatedData]) ->
None:
259 if event.data[
"action"] !=
"remove":
261 device_entry = device_registry.deleted_devices[event.data[
"device_id"]]
262 if entry.entry_id
not in device_entry.config_entries:
269 rfx_object = await hass.async_add_executor_job(
270 _create_rfx, config,
lambda event: hass.add_job(async_handle_receive, event)
273 hass.data[DOMAIN][DATA_RFXOBJECT] = rfx_object
275 entry.async_on_unload(
276 hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device)
def _shutdown_rfxtrx(event: Event) -> None:
    """Close the RFXtrx connection; the triggering event is not inspected."""
    # `rfx_object` is captured from the enclosing setup function's scope.
    rfx_object.close_connection()
283 entry.async_on_unload(
284 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown_rfxtrx)
def send(call: ServiceCall) -> None:
    """Send the service call's ATTR_EVENT payload through the RFXtrx transport."""
    # `rfx_object` is captured from the enclosing setup function's scope.
    rfx_object.transport.send(call.data[ATTR_EVENT])
291 hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
296 config_entry: ConfigEntry,
297 async_add_entities: AddEntitiesCallback,
298 supported: Callable[[rfxtrxmod.RFXtrxEvent], bool],
299 constructor: Callable[
301 rfxtrxmod.RFXtrxEvent,
302 rfxtrxmod.RFXtrxEvent |
None,
309 """Set up config entry."""
310 entry_data = config_entry.data
311 device_ids: set[DeviceTuple] = set()
315 for packet_id, entity_info
in entry_data[CONF_DEVICES].items():
317 _LOGGER.error(
"Invalid device: %s", packet_id)
323 event.device, data_bits=entity_info.get(CONF_DATA_BITS)
325 if device_id
in device_ids:
327 device_ids.add(device_id)
329 entities.extend(constructor(event,
None, device_id, entity_info))
334 if entry_data[CONF_AUTOMATIC_ADD]:
337 def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) ->
None:
338 """Handle light updates from the RFXtrx gateway."""
342 if device_id
in device_ids:
344 device_ids.add(device_id)
347 config_entry.async_on_unload(
353 """Return the RFXObject with the packetid."""
355 binarypacket = bytearray.fromhex(packetid)
358 return rfxtrxmod.RFXtrxTransport.parse(binarypacket)
362 """Extract and return the address bits from a Lighting4/PT2262 packet."""
363 if nb_data_bits
is None:
367 data = bytearray.fromhex(device_id)
370 mask = 0xFF & ~((1 << nb_data_bits) - 1)
372 data[len(data) - 1] &= mask
374 return binascii.hexlify(data)
378 """Extract and return the data bits from a Lighting4/PT2262 packet."""
380 data = bytearray.fromhex(device_id)
384 mask = 0xFF & ((1 << data_bits) - 1)
386 return hex(data[-1] & mask)
390 device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict[str, Any]]
392 """Deduce data bits for device based on a cache of device bits."""
394 if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
395 for device_id, entity_config
in devices.items():
396 bits = entity_config.get(CONF_DATA_BITS)
404 """Look for the device which id matches the given device_id parameter."""
405 for dev_id
in device_ids:
406 if len(dev_id) == len(device_id):
408 for i, (char1, char2)
in enumerate(zip(dev_id, device_id, strict=
False)):
413 size = len(dev_id) - size - 1
416 "Found possible device %s for %s "
417 "with the following configuration:\n"
433 device: rfxtrxmod.RFXtrxDevice, data_bits: int |
None =
None
435 """Calculate a device id for device."""
436 id_string: str = device.id_string
439 and device.packettype == DEVICE_PACKET_TYPE_LIGHTING4
442 id_string = masked_id.decode(
"ASCII")
444 return DeviceTuple(f
"{device.packettype:x}", f
"{device.subtype:x}", id_string)
448 identifiers: set[tuple[str, str]],
449 ) -> DeviceTuple |
None:
450 """Calculate the device tuple from a device entry."""
451 identifier = next((x
for x
in identifiers
if x[0] == DOMAIN
and len(x) == 4),
None)
455 identifier2 = cast(tuple[str, str, str, str], identifier)
456 return DeviceTuple(identifier2[1], identifier2[2], identifier2[3])
460 hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
462 """Remove config entry from a device.
464 The actual cleanup is done in the device registry event
bool async_remove_config_entry_device(HomeAssistant hass, ConfigEntry config_entry, dr.DeviceEntry device_entry)
DeviceTuple|None get_device_tuple_from_identifiers(set[tuple[str, str]] identifiers)
str|None find_possible_pt2262_device(set[str] device_ids, str device_id)
str|None get_pt2262_cmd(str device_id, int data_bits)
DeviceTuple get_device_id(rfxtrxmod.RFXtrxDevice device, int|None data_bits=None)
rfxtrxmod.Connect _create_rfx(Mapping[str, Any] config, Callable[[rfxtrxmod.RFXtrxEvent], None] event_callback)
bytes|None get_pt2262_deviceid(str device_id, int|None nb_data_bits)
None async_setup_internal(HomeAssistant hass, ConfigEntry entry)
bytearray _bytearray_string(Any data)
int|None get_device_data_bits(rfxtrxmod.RFXtrxDevice device, dict[DeviceTuple, dict[str, Any]] devices)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
None async_setup_platform_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities, Callable[[rfxtrxmod.RFXtrxEvent], bool] supported, Callable[[rfxtrxmod.RFXtrxEvent, rfxtrxmod.RFXtrxEvent|None, DeviceTuple, dict[str, Any],], list[Entity],] constructor)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
dict[DeviceTuple, dict[str, Any]] _get_device_lookup(dict[str, dict[str, Any]] devices)
rfxtrxmod.RFXtrxEvent|None get_rfx_object(str packetid)
None _remove_device(HomeAssistant hass, ConfigEntry config_entry, str mac, TasmotaMQTTClient tasmota_mqtt, DeviceRegistry device_registry)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)