1 """Config flow for RFXCOM RFXtrx integration."""
3 from __future__
import annotations
6 from contextlib
import suppress
10 from typing
import Any, TypedDict, cast
12 import RFXtrx
as rfxtrxmod
14 import serial.tools.list_ports
15 import voluptuous
as vol
36 config_validation
as cv,
37 device_registry
as dr,
38 entity_registry
as er,
47 get_device_tuple_from_identifiers,
50 from .binary_sensor
import supported
as binary_supported
57 CONF_VENETIAN_BLIND_MODE,
58 CONST_VENETIAN_BLIND_MODE_DEFAULT,
59 CONST_VENETIAN_BLIND_MODE_EU,
60 CONST_VENETIAN_BLIND_MODE_US,
61 DEVICE_PACKET_TYPE_LIGHTING4,
64 CONF_EVENT_CODE =
"event_code"
65 CONF_MANUAL_PATH =
"Enter Manually"
67 RECV_MODES = sorted(itertools.chain(*rfxtrxmod.lowlevel.Status.RECMODES))
71 """Dict data representing a device entry."""
73 event_code: str |
None
74 device_id: DeviceTuple
78 """Check if string is None, otherwise convert to int."""
81 return int(value, base)
85 """Handle Rfxtrx options."""
87 _device_registry: dr.DeviceRegistry
88 _device_entries: list[dr.DeviceEntry]
91 """Initialize rfxtrx options flow."""
99 self, user_input: dict[str, Any] |
None =
None
100 ) -> ConfigFlowResult:
101 """Manage the options."""
105 self, user_input: dict[str, Any] |
None =
None
106 ) -> ConfigFlowResult:
107 """Prompt for options."""
110 if user_input
is not None:
112 CONF_AUTOMATIC_ADD: user_input[CONF_AUTOMATIC_ADD],
113 CONF_PROTOCOLS: user_input[CONF_PROTOCOLS]
or None,
115 if CONF_DEVICE
in user_input:
116 entry_id = user_input[CONF_DEVICE]
119 event_code = device_data[
"event_code"]
125 if CONF_EVENT_CODE
in user_input:
127 str, user_input[CONF_EVENT_CODE]
133 if selected_device_object
is None:
134 errors[CONF_EVENT_CODE] =
"invalid_event_code"
136 errors[CONF_EVENT_CODE] =
"already_configured_device"
146 device_registry = dr.async_get(self.hass)
147 device_entries = dr.async_entries_for_config_entry(
153 configure_devices = {
154 entry.id: entry.name_by_user
if entry.name_by_user
else entry.name
155 for entry
in device_entries
167 ): cv.multi_select(RECV_MODES),
168 vol.Optional(CONF_EVENT_CODE): str,
169 vol.Optional(CONF_DEVICE): vol.In(configure_devices),
173 step_id=
"prompt_options", data_schema=vol.Schema(options), errors=errors
177 self, user_input: dict[str, Any] |
None =
None
178 ) -> ConfigFlowResult:
179 """Manage device options."""
184 if user_input
is not None:
185 devices: dict[str, dict[str, Any] |
None] = {}
186 device: dict[str, Any]
189 data_bits=user_input.get(CONF_DATA_BITS),
192 if CONF_REPLACE_DEVICE
in user_input:
203 command_on =
none_or_int(user_input.get(CONF_COMMAND_ON), 16)
205 errors[CONF_COMMAND_ON] =
"invalid_input_2262_on"
208 command_off =
none_or_int(user_input.get(CONF_COMMAND_OFF), 16)
210 errors[CONF_COMMAND_OFF] =
"invalid_input_2262_off"
213 off_delay =
none_or_int(user_input.get(CONF_OFF_DELAY), 10)
215 errors[CONF_OFF_DELAY] =
"invalid_input_off_delay"
220 CONF_DEVICE_ID:
list(device_id),
226 device[CONF_OFF_DELAY] = off_delay
227 if user_input.get(CONF_DATA_BITS):
228 device[CONF_DATA_BITS] = user_input[CONF_DATA_BITS]
230 device[CONF_COMMAND_ON] = command_on
232 device[CONF_COMMAND_OFF] = command_off
233 if user_input.get(CONF_VENETIAN_BLIND_MODE):
234 device[CONF_VENETIAN_BLIND_MODE] = user_input[
235 CONF_VENETIAN_BLIND_MODE
246 data_schema: VolDictType = {}
249 off_delay_schema: VolDictType
250 if device_data.get(CONF_OFF_DELAY):
254 description={
"suggested_value": device_data[CONF_OFF_DELAY]},
259 vol.Optional(CONF_OFF_DELAY): str,
261 data_schema.update(off_delay_schema)
265 == DEVICE_PACKET_TYPE_LIGHTING4
270 CONF_DATA_BITS, default=device_data.get(CONF_DATA_BITS, 0)
274 default=hex(device_data.get(CONF_COMMAND_ON, 0)),
278 default=hex(device_data.get(CONF_COMMAND_OFF, 0)),
287 CONF_VENETIAN_BLIND_MODE,
288 default=device_data.get(
289 CONF_VENETIAN_BLIND_MODE, CONST_VENETIAN_BLIND_MODE_DEFAULT
293 CONST_VENETIAN_BLIND_MODE_DEFAULT,
294 CONST_VENETIAN_BLIND_MODE_US,
295 CONST_VENETIAN_BLIND_MODE_EU,
301 entry.id: entry.name_by_user
if entry.name_by_user
else entry.name
309 vol.Optional(CONF_REPLACE_DEVICE): vol.In(replace_devices),
314 step_id=
"set_device_options",
315 data_schema=vol.Schema(data_schema),
320 """Migrate properties of a device into another."""
324 old_entry = device_registry.async_get(old_device)
326 device_registry.async_update_device(
328 area_id=old_entry.area_id,
329 name_by_user=old_entry.name_by_user,
335 old_device_id =
"_".join(x
for x
in old_device_data[CONF_DEVICE_ID])
336 new_device_id =
"_".join(x
for x
in new_device_data[CONF_DEVICE_ID])
338 entity_registry = er.async_get(self.hass)
339 entity_entries = er.async_entries_for_device(
340 entity_registry, old_device, include_disabled_entities=
True
342 entity_migration_map = {}
343 for entry
in entity_entries:
344 unique_id = entry.unique_id
345 new_unique_id = unique_id.replace(old_device_id, new_device_id)
347 new_entity_id = entity_registry.async_get_entity_id(
348 entry.domain, entry.platform, new_unique_id
351 if new_entity_id
is not None:
352 entity_migration_map[new_entity_id] = entry
355 def _handle_state_removed(event: Event[EventStateChangedData]) ->
None:
357 new_state = event.data[
"new_state"]
358 entity_id = event.data[
"entity_id"]
359 if new_state
is None and entity_id
in entities_to_be_removed:
360 entities_to_be_removed.remove(entity_id)
361 if not entities_to_be_removed:
362 wait_for_entities.set()
366 entities_to_be_removed = {
368 for entry
in entity_migration_map.values()
369 if not self.hass.states.async_available(entry.entity_id)
371 wait_for_entities = asyncio.Event()
373 self.hass, entities_to_be_removed, _handle_state_removed
376 for entry
in entity_migration_map.values():
377 entity_registry.async_remove(entry.entity_id)
380 with suppress(TimeoutError):
381 async
with asyncio.timeout(10):
382 await wait_for_entities.wait()
383 remove_track_state_changes()
386 def _handle_state_added(event: Event[EventStateChangedData]) ->
None:
388 old_state = event.data[
"old_state"]
389 entity_id = event.data[
"entity_id"]
390 if old_state
is None and entity_id
in entities_to_be_added:
391 entities_to_be_added.remove(entity_id)
392 if not entities_to_be_added:
393 wait_for_entities.set()
396 entities_to_be_added = {
398 for entry
in entity_migration_map.values()
399 if self.hass.states.async_available(entry.entity_id)
401 wait_for_entities = asyncio.Event()
403 self.hass, entities_to_be_added, _handle_state_added
406 for entity_id, entry
in entity_migration_map.items():
407 entity_registry.async_update_entity(
409 new_entity_id=entry.entity_id,
415 with suppress(TimeoutError):
416 async
with asyncio.timeout(10):
417 await wait_for_entities.wait()
418 remove_track_state_changes()
420 device_registry.async_remove_device(old_device)
423 """Check if device does not already exist."""
429 device_id =
get_device_id(rfx_obj.device, entity_info.get(CONF_DATA_BITS))
430 if new_device_id == device_id:
436 """Check if device can be replaced with selected device."""
441 if (event_code := device_data[
"event_code"])
is not None:
446 rfx_obj.device.packettype
448 and rfx_obj.device.subtype
459 return data[
"event_code"]
462 """Get event code based on device identifier."""
463 event_code: str |
None =
None
469 if tuple(entity_info.get(CONF_DEVICE_ID)) == device_id:
470 event_code = cast(str, packet_id)
472 return DeviceData(event_code=event_code, device_id=device_id)
477 global_options: dict[str, Any] |
None =
None,
478 devices: dict[str, Any] |
None =
None,
480 """Update data in ConfigEntry."""
484 entry_data.update(global_options)
486 for event_code, options
in devices.items():
491 entry_data[CONF_DEVICES].pop(event_code,
None)
493 entry_data[CONF_DEVICES][event_code] = options
495 self.hass.async_create_task(
501 """Handle a config flow for RFXCOM RFXtrx."""
506 self, user_input: dict[str, Any] |
None =
None
507 ) -> ConfigFlowResult:
508 """Step when user initializes an integration."""
512 errors: dict[str, str] = {}
513 if user_input
is not None:
514 if user_input[CONF_TYPE] ==
"Serial":
519 list_of_types = [
"Serial",
"Network"]
521 schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)})
525 self, user_input: dict[str, Any] |
None =
None
526 ) -> ConfigFlowResult:
527 """Step when setting up network configuration."""
528 errors: dict[str, str] = {}
530 if user_input
is not None:
531 host = user_input[CONF_HOST]
532 port = user_input[CONF_PORT]
536 except CannotConnect:
537 errors[
"base"] =
"cannot_connect"
543 {vol.Required(CONF_HOST): str, vol.Required(CONF_PORT): int}
546 step_id=
"setup_network",
552 self, user_input: dict[str, Any] |
None =
None
553 ) -> ConfigFlowResult:
554 """Step when setting up serial configuration."""
555 errors: dict[str, str] = {}
557 if user_input
is not None:
558 user_selection = user_input[CONF_DEVICE]
559 if user_selection == CONF_MANUAL_PATH:
562 dev_path = await self.hass.async_add_executor_job(
563 get_serial_by_id, user_selection
568 except CannotConnect:
569 errors[
"base"] =
"cannot_connect"
574 ports = await self.hass.async_add_executor_job(serial.tools.list_ports.comports)
577 list_of_ports[port.device] = (
578 f
"{port}, s/n: {port.serial_number or 'n/a'}"
579 + (f
" - {port.manufacturer}" if port.manufacturer
else "")
581 list_of_ports[CONF_MANUAL_PATH] = CONF_MANUAL_PATH
583 schema = vol.Schema({vol.Required(CONF_DEVICE): vol.In(list_of_ports)})
585 step_id=
"setup_serial",
591 self, user_input: dict[str, Any] |
None =
None
592 ) -> ConfigFlowResult:
593 """Select path manually."""
594 errors: dict[str, str] = {}
596 if user_input
is not None:
597 device = user_input[CONF_DEVICE]
600 except CannotConnect:
601 errors[
"base"] =
"cannot_connect"
606 schema = vol.Schema({vol.Required(CONF_DEVICE): str})
608 step_id=
"setup_serial_manual_path",
615 host: str |
None =
None,
616 port: int |
None =
None,
617 device: str |
None =
None,
619 """Create data for rfxtrx entry."""
620 success = await self.hass.async_add_executor_job(
621 _test_transport, host, port, device
626 data: dict[str, Any] = {
630 CONF_AUTOMATIC_ADD:
False,
638 config_entry: ConfigEntry,
639 ) -> RfxtrxOptionsFlow:
640 """Get the options flow for this handler."""
645 """Construct a rfx object based on config."""
647 conn = rfxtrxmod.PyNetworkTransport((host, port))
649 conn = rfxtrxmod.PySerialTransport(device)
653 except (rfxtrxmod.RFXtrxTransportError, TimeoutError):
660 """Return a /dev/serial/by-id match for given device if available."""
661 by_id =
"/dev/serial/by-id"
662 if not os.path.isdir(by_id):
665 for path
in (entry.path
for entry
in os.scandir(by_id)
if entry.is_symlink()):
666 if os.path.realpath(path) == dev_path:
672 """Error to indicate we cannot connect."""
ConfigFlowResult async_step_setup_serial(self, dict[str, Any]|None user_input=None)
RfxtrxOptionsFlow async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_setup_network(self, dict[str, Any]|None user_input=None)
dict[str, Any] async_validate_rfx(self, str|None host=None, int|None port=None, str|None device=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_setup_serial_manual_path(self, dict[str, Any]|None user_input=None)
None update_config_data(self, dict[str, Any]|None global_options=None, dict[str, Any]|None devices=None)
_selected_device_entry_id
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
bool _can_add_device(self, rfxtrxmod.RFXtrxEvent new_rfx_obj)
DeviceData _get_device_data(self, str entry_id)
None _async_replace_device(self, str replace_device)
bool _can_replace_device(self, str entry_id)
str|None _get_device_event_code(self, str entry_id)
ConfigFlowResult async_step_set_device_options(self, dict[str, Any]|None user_input=None)
_selected_device_event_code
ConfigFlowResult async_step_prompt_options(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
bool _test_transport(str|None host, int|None port, str|None device)
str get_serial_by_id(str dev_path)
int|None none_or_int(str|None value, int base)
DeviceTuple|None get_device_tuple_from_identifiers(set[tuple[str, str]] identifiers)
DeviceTuple get_device_id(rfxtrxmod.RFXtrxDevice device, int|None data_bits=None)
rfxtrxmod.RFXtrxEvent|None get_rfx_object(str packetid)
AreaRegistry async_get(HomeAssistant hass)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)