1 """Receive signals from a keyboard and use it as a remote control."""
3 from __future__
import annotations
6 from contextlib
import suppress
11 from asyncinotify
import Inotify, Mask
12 from evdev
import InputDevice, categorize, ecodes, list_devices
13 import voluptuous
as vol
20 _LOGGER = logging.getLogger(__name__)
22 DEVICE_DESCRIPTOR =
"device_descriptor"
23 DEVICE_ID_GROUP =
"Device description"
24 DEVICE_NAME =
"device_name"
25 DOMAIN =
"keyboard_remote"
30 KEY_VALUE = {
"key_up": 0,
"key_down": 1,
"key_hold": 2}
31 KEY_VALUE_NAME = {value: key
for key, value
in KEY_VALUE.items()}
32 KEYBOARD_REMOTE_COMMAND_RECEIVED =
"keyboard_remote_command_received"
33 KEYBOARD_REMOTE_CONNECTED =
"keyboard_remote_connected"
34 KEYBOARD_REMOTE_DISCONNECTED =
"keyboard_remote_disconnected"
37 EMULATE_KEY_HOLD =
"emulate_key_hold"
38 EMULATE_KEY_HOLD_DELAY =
"emulate_key_hold_delay"
39 EMULATE_KEY_HOLD_REPEAT =
"emulate_key_hold_repeat"
41 DEVINPUT =
"/dev/input"
43 CONFIG_SCHEMA = vol.Schema(
50 vol.Exclusive(DEVICE_DESCRIPTOR, DEVICE_ID_GROUP): cv.string,
51 vol.Exclusive(DEVICE_NAME, DEVICE_ID_GROUP): cv.string,
52 vol.Optional(TYPE, default=[
"key_up"]): vol.All(
53 cv.ensure_list, [vol.In(KEY_VALUE)]
55 vol.Optional(EMULATE_KEY_HOLD, default=
False): cv.boolean,
56 vol.Optional(EMULATE_KEY_HOLD_DELAY, default=0.250): float,
57 vol.Optional(EMULATE_KEY_HOLD_REPEAT, default=0.033): float,
60 cv.has_at_least_one_key(DEVICE_DESCRIPTOR, DEVICE_ID_GROUP),
64 extra=vol.ALLOW_EXTRA,
68 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
69 """Set up the keyboard_remote."""
70 domain_config: list[dict[str, Any]] = config[DOMAIN]
79 """Manage device connection/disconnection using inotify to asynchronously monitor."""
81 def __init__(self, hass: HomeAssistant, config: list[dict[str, Any]]) ->
None:
82 """Create handlers and setup dictionaries to keep track of them."""
86 self.active_handlers_by_descriptor: dict[str, asyncio.Future] = {}
91 for dev_block
in config:
93 descriptor = dev_block.get(DEVICE_DESCRIPTOR)
94 if descriptor
is not None:
97 name = dev_block.get(DEVICE_NAME)
101 """Listen for Home Assistant start and stop events."""
103 self.
hasshass.bus.async_listen_once(
106 self.
hasshass.bus.async_listen_once(
111 """Start monitoring of events and devices.
113 Start inotify watching for events, start event monitoring for those already
114 connected, and start monitoring for device connection/disconnection.
117 _LOGGER.debug(
"Start monitoring")
119 self.
inotifyinotify = Inotify()
121 DEVINPUT, Mask.CREATE | Mask.ATTRIB | Mask.DELETE
126 initial_start_monitoring = set()
127 descriptors = await self.
hasshass.async_add_executor_job(list_devices, DEVINPUT)
128 for descriptor
in descriptors:
129 dev, handler = await self.
hasshass.async_add_executor_job(
136 self.active_handlers_by_descriptor[descriptor] = handler
137 initial_start_monitoring.add(
138 asyncio.create_task(handler.async_device_start_monitoring(dev))
141 if initial_start_monitoring:
142 await asyncio.wait(initial_start_monitoring)
147 """Stop and cleanup running monitoring tasks."""
149 _LOGGER.debug(
"Cleanup on shutdown")
160 handler_stop_monitoring = set()
161 for handler
in self.active_handlers_by_descriptor.values():
162 handler_stop_monitoring.add(
163 asyncio.create_task(handler.async_device_stop_monitoring())
165 if handler_stop_monitoring:
166 await asyncio.wait(handler_stop_monitoring)
173 """Find the correct device handler given a descriptor (path)."""
177 dev = InputDevice(descriptor)
189 if test_handler.dev
is not None:
190 fullpath = test_handler.dev.path
192 fullpath = os.path.realpath(test_descriptor)
193 if fullpath == descriptor:
194 handler = test_handler
196 return (dev, handler)
199 """Monitor asynchronously for device connection/disconnection or permissions changes."""
201 _LOGGER.debug(
"Start monitoring loop")
204 async
for event
in self.
inotifyinotify:
205 descriptor = f
"{DEVINPUT}/{event.name}"
207 "got event for %s: %s",
212 descriptor_active = descriptor
in self.active_handlers_by_descriptor
214 if (event.mask & Mask.DELETE)
and descriptor_active:
215 _LOGGER.debug(
"removing: %s", descriptor)
216 handler = self.active_handlers_by_descriptor[descriptor]
217 del self.active_handlers_by_descriptor[descriptor]
218 await handler.async_device_stop_monitoring()
220 (event.mask & Mask.CREATE)
or (event.mask & Mask.ATTRIB)
221 )
and not descriptor_active:
222 _LOGGER.debug(
"checking new: %s", descriptor)
223 dev, handler = await self.
hasshass.async_add_executor_job(
228 _LOGGER.debug(
"adding: %s", descriptor)
229 self.active_handlers_by_descriptor[descriptor] = handler
230 await handler.async_device_start_monitoring(dev)
231 except asyncio.CancelledError:
232 _LOGGER.debug(
"Monitoring canceled")
236 """Manage input events using evdev with asyncio."""
238 def __init__(self, hass: HomeAssistant, dev_block: dict[str, Any]) ->
None:
239 """Fill configuration data."""
243 key_types = dev_block[TYPE]
246 for key_type
in key_types:
258 """Emulate keyboard delay/repeat behaviour by sending key events on a timer."""
260 await asyncio.sleep(delay)
262 self.
hasshass.bus.async_fire(
263 KEYBOARD_REMOTE_COMMAND_RECEIVED,
268 DEVICE_NAME: self.
devdev.name,
271 await asyncio.sleep(repeat)
274 """Start event monitoring task and issue event."""
275 _LOGGER.debug(
"Keyboard async_device_start_monitoring, %s", dev.name)
287 self.
hasshass.bus.async_fire(
288 KEYBOARD_REMOTE_CONNECTED,
291 DEVICE_NAME: dev.name,
294 _LOGGER.debug(
"Keyboard (re-)connected, %s", dev.name)
297 """Stop event monitoring task and issue event."""
299 with suppress(OSError):
300 await self.
hasshass.async_add_executor_job(self.
devdev.ungrab)
304 asyncio.get_event_loop().remove_reader(self.
devdev.fileno())
310 self.
hasshass.bus.async_fire(
311 KEYBOARD_REMOTE_DISCONNECTED,
314 DEVICE_NAME: self.
devdev.name,
317 _LOGGER.debug(
"Keyboard disconnected, %s", self.
devdev.name)
322 """Event monitoring loop.
324 Monitor one device for new events using evdev with asyncio,
325 start and stop key hold emulation tasks as needed.
331 _LOGGER.debug(
"Start device monitoring")
332 await self.
hasshass.async_add_executor_job(self.
devdev.grab)
333 async
for event
in self.
devdev.async_read_loop():
334 if event.type
is ecodes.EV_KEY:
337 "device: %s: %s", self.
devdev.name, categorize(event)
340 self.
hasshass.bus.async_fire(
341 KEYBOARD_REMOTE_COMMAND_RECEIVED,
343 KEY_CODE: event.code,
344 TYPE: KEY_VALUE_NAME[event.value],
346 DEVICE_NAME: self.
devdev.name,
351 event.value == KEY_VALUE[
"key_down"]
354 repeat_tasks[event.code] = self.
hasshass.async_create_task(
362 event.value == KEY_VALUE[
"key_up"]
363 and event.code
in repeat_tasks
365 repeat_tasks[event.code].cancel()
366 del repeat_tasks[event.code]
367 except (OSError, asyncio.CancelledError):
369 for task
in repeat_tasks.values():
373 await asyncio.wait(repeat_tasks.values())
def async_device_keyrepeat(self, code, delay, repeat)
def async_device_monitor_input(self)
def async_device_start_monitoring(self, dev)
None __init__(self, HomeAssistant hass, dict[str, Any] dev_block)
def async_device_stop_monitoring(self)
def get_device_handler(self, descriptor)
def async_start_monitoring(self, event)
def async_monitor_devices(self)
def async_stop_monitoring(self, event)
None __init__(self, HomeAssistant hass, list[dict[str, Any]] config)
bool add(self, _T matcher)
bool async_setup(HomeAssistant hass, ConfigType config)