Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Receive signals from a keyboard and use it as a remote control."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from contextlib import suppress
7 import logging
8 import os
9 from typing import Any
10 
11 from asyncinotify import Inotify, Mask
12 from evdev import InputDevice, categorize, ecodes, list_devices
13 import voluptuous as vol
14 
15 from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
16 from homeassistant.core import HomeAssistant
18 from homeassistant.helpers.typing import ConfigType
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 DEVICE_DESCRIPTOR = "device_descriptor"
23 DEVICE_ID_GROUP = "Device description"
24 DEVICE_NAME = "device_name"
25 DOMAIN = "keyboard_remote"
26 
27 ICON = "mdi:remote"
28 
29 KEY_CODE = "key_code"
30 KEY_VALUE = {"key_up": 0, "key_down": 1, "key_hold": 2}
31 KEY_VALUE_NAME = {value: key for key, value in KEY_VALUE.items()}
32 KEYBOARD_REMOTE_COMMAND_RECEIVED = "keyboard_remote_command_received"
33 KEYBOARD_REMOTE_CONNECTED = "keyboard_remote_connected"
34 KEYBOARD_REMOTE_DISCONNECTED = "keyboard_remote_disconnected"
35 
36 TYPE = "type"
37 EMULATE_KEY_HOLD = "emulate_key_hold"
38 EMULATE_KEY_HOLD_DELAY = "emulate_key_hold_delay"
39 EMULATE_KEY_HOLD_REPEAT = "emulate_key_hold_repeat"
40 
41 DEVINPUT = "/dev/input"
42 
43 CONFIG_SCHEMA = vol.Schema(
44  {
45  DOMAIN: vol.All(
46  cv.ensure_list,
47  [
48  vol.Schema(
49  {
50  vol.Exclusive(DEVICE_DESCRIPTOR, DEVICE_ID_GROUP): cv.string,
51  vol.Exclusive(DEVICE_NAME, DEVICE_ID_GROUP): cv.string,
52  vol.Optional(TYPE, default=["key_up"]): vol.All(
53  cv.ensure_list, [vol.In(KEY_VALUE)]
54  ),
55  vol.Optional(EMULATE_KEY_HOLD, default=False): cv.boolean,
56  vol.Optional(EMULATE_KEY_HOLD_DELAY, default=0.250): float,
57  vol.Optional(EMULATE_KEY_HOLD_REPEAT, default=0.033): float,
58  }
59  ),
60  cv.has_at_least_one_key(DEVICE_DESCRIPTOR, DEVICE_ID_GROUP),
61  ],
62  )
63  },
64  extra=vol.ALLOW_EXTRA,
65 )
66 
67 
68 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
69  """Set up the keyboard_remote."""
70  domain_config: list[dict[str, Any]] = config[DOMAIN]
71 
72  remote = KeyboardRemote(hass, domain_config)
73  remote.setup()
74 
75  return True
76 
77 
79  """Manage device connection/disconnection using inotify to asynchronously monitor."""
80 
81  def __init__(self, hass: HomeAssistant, config: list[dict[str, Any]]) -> None:
82  """Create handlers and setup dictionaries to keep track of them."""
83  self.hasshass = hass
84  self.handlers_by_namehandlers_by_name = {}
85  self.handlers_by_descriptorhandlers_by_descriptor = {}
86  self.active_handlers_by_descriptor: dict[str, asyncio.Future] = {}
87  self.inotifyinotify = None
88  self.watcherwatcher = None
89  self.monitor_taskmonitor_task = None
90 
91  for dev_block in config:
92  handler = self.DeviceHandlerDeviceHandler(hass, dev_block)
93  descriptor = dev_block.get(DEVICE_DESCRIPTOR)
94  if descriptor is not None:
95  self.handlers_by_descriptorhandlers_by_descriptor[descriptor] = handler
96  else:
97  name = dev_block.get(DEVICE_NAME)
98  self.handlers_by_namehandlers_by_name[name] = handler
99 
100  def setup(self):
101  """Listen for Home Assistant start and stop events."""
102 
103  self.hasshass.bus.async_listen_once(
104  EVENT_HOMEASSISTANT_START, self.async_start_monitoringasync_start_monitoring
105  )
106  self.hasshass.bus.async_listen_once(
107  EVENT_HOMEASSISTANT_STOP, self.async_stop_monitoringasync_stop_monitoring
108  )
109 
110  async def async_start_monitoring(self, event):
111  """Start monitoring of events and devices.
112 
113  Start inotify watching for events, start event monitoring for those already
114  connected, and start monitoring for device connection/disconnection.
115  """
116 
117  _LOGGER.debug("Start monitoring")
118 
119  self.inotifyinotify = Inotify()
120  self.watcherwatcher = self.inotifyinotify.add_watch(
121  DEVINPUT, Mask.CREATE | Mask.ATTRIB | Mask.DELETE
122  )
123 
124  # add initial devices (do this AFTER starting watcher in order to
125  # avoid race conditions leading to missing device connections)
126  initial_start_monitoring = set()
127  descriptors = await self.hasshass.async_add_executor_job(list_devices, DEVINPUT)
128  for descriptor in descriptors:
129  dev, handler = await self.hasshass.async_add_executor_job(
130  self.get_device_handlerget_device_handler, descriptor
131  )
132 
133  if handler is None:
134  continue
135 
136  self.active_handlers_by_descriptor[descriptor] = handler
137  initial_start_monitoring.add(
138  asyncio.create_task(handler.async_device_start_monitoring(dev))
139  )
140 
141  if initial_start_monitoring:
142  await asyncio.wait(initial_start_monitoring)
143 
144  self.monitor_taskmonitor_task = self.hasshass.async_create_task(self.async_monitor_devicesasync_monitor_devices())
145 
146  async def async_stop_monitoring(self, event):
147  """Stop and cleanup running monitoring tasks."""
148 
149  _LOGGER.debug("Cleanup on shutdown")
150 
151  if self.inotifyinotify and self.watcherwatcher:
152  self.inotifyinotify.rm_watch(self.watcherwatcher)
153  self.watcherwatcher = None
154 
155  if self.monitor_taskmonitor_task is not None:
156  if not self.monitor_taskmonitor_task.done():
157  self.monitor_taskmonitor_task.cancel()
158  await self.monitor_taskmonitor_task
159 
160  handler_stop_monitoring = set()
161  for handler in self.active_handlers_by_descriptor.values():
162  handler_stop_monitoring.add(
163  asyncio.create_task(handler.async_device_stop_monitoring())
164  )
165  if handler_stop_monitoring:
166  await asyncio.wait(handler_stop_monitoring)
167 
168  if self.inotifyinotify:
169  self.inotifyinotify.close()
170  self.inotifyinotify = None
171 
172  def get_device_handler(self, descriptor):
173  """Find the correct device handler given a descriptor (path)."""
174 
175  # devices are often added and then correct permissions set after
176  try:
177  dev = InputDevice(descriptor)
178  except OSError:
179  return (None, None)
180 
181  handler = None
182  if descriptor in self.handlers_by_descriptorhandlers_by_descriptor:
183  handler = self.handlers_by_descriptorhandlers_by_descriptor[descriptor]
184  elif dev.name in self.handlers_by_namehandlers_by_name:
185  handler = self.handlers_by_namehandlers_by_name[dev.name]
186  else:
187  # check for symlinked paths matching descriptor
188  for test_descriptor, test_handler in self.handlers_by_descriptorhandlers_by_descriptor.items():
189  if test_handler.dev is not None:
190  fullpath = test_handler.dev.path
191  else:
192  fullpath = os.path.realpath(test_descriptor)
193  if fullpath == descriptor:
194  handler = test_handler
195 
196  return (dev, handler)
197 
198  async def async_monitor_devices(self):
199  """Monitor asynchronously for device connection/disconnection or permissions changes."""
200 
201  _LOGGER.debug("Start monitoring loop")
202 
203  try:
204  async for event in self.inotifyinotify:
205  descriptor = f"{DEVINPUT}/{event.name}"
206  _LOGGER.debug(
207  "got event for %s: %s",
208  descriptor,
209  event.mask,
210  )
211 
212  descriptor_active = descriptor in self.active_handlers_by_descriptor
213 
214  if (event.mask & Mask.DELETE) and descriptor_active:
215  _LOGGER.debug("removing: %s", descriptor)
216  handler = self.active_handlers_by_descriptor[descriptor]
217  del self.active_handlers_by_descriptor[descriptor]
218  await handler.async_device_stop_monitoring()
219  elif (
220  (event.mask & Mask.CREATE) or (event.mask & Mask.ATTRIB)
221  ) and not descriptor_active:
222  _LOGGER.debug("checking new: %s", descriptor)
223  dev, handler = await self.hasshass.async_add_executor_job(
224  self.get_device_handlerget_device_handler, descriptor
225  )
226  if handler is None:
227  continue
228  _LOGGER.debug("adding: %s", descriptor)
229  self.active_handlers_by_descriptor[descriptor] = handler
230  await handler.async_device_start_monitoring(dev)
231  except asyncio.CancelledError:
232  _LOGGER.debug("Monitoring canceled")
233  return
234 
236  """Manage input events using evdev with asyncio."""
237 
238  def __init__(self, hass: HomeAssistant, dev_block: dict[str, Any]) -> None:
239  """Fill configuration data."""
240 
241  self.hasshass = hass
242 
243  key_types = dev_block[TYPE]
244 
245  self.key_valueskey_values = set()
246  for key_type in key_types:
247  self.key_valueskey_values.add(KEY_VALUE[key_type])
248 
249  self.emulate_key_holdemulate_key_hold = dev_block[EMULATE_KEY_HOLD]
250  self.emulate_key_hold_delayemulate_key_hold_delay = dev_block[EMULATE_KEY_HOLD_DELAY]
251  self.emulate_key_hold_repeatemulate_key_hold_repeat = dev_block[EMULATE_KEY_HOLD_REPEAT]
252  self.monitor_taskmonitor_task = None
253  self.devdev = None
254  self.config_descriptorconfig_descriptor = dev_block.get(DEVICE_DESCRIPTOR)
255  self.descriptordescriptor = None
256 
257  async def async_device_keyrepeat(self, code, delay, repeat):
258  """Emulate keyboard delay/repeat behaviour by sending key events on a timer."""
259 
260  await asyncio.sleep(delay)
261  while True:
262  self.hasshass.bus.async_fire(
263  KEYBOARD_REMOTE_COMMAND_RECEIVED,
264  {
265  KEY_CODE: code,
266  TYPE: "key_hold",
267  DEVICE_DESCRIPTOR: self.descriptordescriptor,
268  DEVICE_NAME: self.devdev.name,
269  },
270  )
271  await asyncio.sleep(repeat)
272 
273  async def async_device_start_monitoring(self, dev):
274  """Start event monitoring task and issue event."""
275  _LOGGER.debug("Keyboard async_device_start_monitoring, %s", dev.name)
276  if self.monitor_taskmonitor_task is None:
277  self.devdev = dev
278  # set the descriptor to the one provided to the config if any, falling back to the device path if not set
279  if self.config_descriptorconfig_descriptor:
280  self.descriptordescriptor = self.config_descriptorconfig_descriptor
281  else:
282  self.descriptordescriptor = self.devdev.path
283 
284  self.monitor_taskmonitor_task = self.hasshass.async_create_task(
285  self.async_device_monitor_inputasync_device_monitor_input()
286  )
287  self.hasshass.bus.async_fire(
288  KEYBOARD_REMOTE_CONNECTED,
289  {
290  DEVICE_DESCRIPTOR: self.descriptordescriptor,
291  DEVICE_NAME: dev.name,
292  },
293  )
294  _LOGGER.debug("Keyboard (re-)connected, %s", dev.name)
295 
297  """Stop event monitoring task and issue event."""
298  if self.monitor_taskmonitor_task is not None:
299  with suppress(OSError):
300  await self.hasshass.async_add_executor_job(self.devdev.ungrab)
301  # monitoring of the device form the event loop and closing of the
302  # device has to occur before cancelling the task to avoid
303  # triggering unhandled exceptions inside evdev coroutines
304  asyncio.get_event_loop().remove_reader(self.devdev.fileno())
305  self.devdev.close()
306  if not self.monitor_taskmonitor_task.done():
307  self.monitor_taskmonitor_task.cancel()
308  await self.monitor_taskmonitor_task
309  self.monitor_taskmonitor_task = None
310  self.hasshass.bus.async_fire(
311  KEYBOARD_REMOTE_DISCONNECTED,
312  {
313  DEVICE_DESCRIPTOR: self.descriptordescriptor,
314  DEVICE_NAME: self.devdev.name,
315  },
316  )
317  _LOGGER.debug("Keyboard disconnected, %s", self.devdev.name)
318  self.devdev = None
319  self.descriptordescriptor = self.config_descriptorconfig_descriptor
320 
321  async def async_device_monitor_input(self):
322  """Event monitoring loop.
323 
324  Monitor one device for new events using evdev with asyncio,
325  start and stop key hold emulation tasks as needed.
326  """
327 
328  repeat_tasks = {}
329 
330  try:
331  _LOGGER.debug("Start device monitoring")
332  await self.hasshass.async_add_executor_job(self.devdev.grab)
333  async for event in self.devdev.async_read_loop():
334  if event.type is ecodes.EV_KEY:
335  if event.value in self.key_valueskey_values:
336  _LOGGER.debug(
337  "device: %s: %s", self.devdev.name, categorize(event)
338  )
339 
340  self.hasshass.bus.async_fire(
341  KEYBOARD_REMOTE_COMMAND_RECEIVED,
342  {
343  KEY_CODE: event.code,
344  TYPE: KEY_VALUE_NAME[event.value],
345  DEVICE_DESCRIPTOR: self.descriptordescriptor,
346  DEVICE_NAME: self.devdev.name,
347  },
348  )
349 
350  if (
351  event.value == KEY_VALUE["key_down"]
352  and self.emulate_key_holdemulate_key_hold
353  ):
354  repeat_tasks[event.code] = self.hasshass.async_create_task(
355  self.async_device_keyrepeatasync_device_keyrepeat(
356  event.code,
357  self.emulate_key_hold_delayemulate_key_hold_delay,
358  self.emulate_key_hold_repeatemulate_key_hold_repeat,
359  )
360  )
361  elif (
362  event.value == KEY_VALUE["key_up"]
363  and event.code in repeat_tasks
364  ):
365  repeat_tasks[event.code].cancel()
366  del repeat_tasks[event.code]
367  except (OSError, asyncio.CancelledError):
368  # cancel key repeat tasks
369  for task in repeat_tasks.values():
370  task.cancel()
371 
372  if repeat_tasks:
373  await asyncio.wait(repeat_tasks.values())
None __init__(self, HomeAssistant hass, dict[str, Any] dev_block)
Definition: __init__.py:238
None __init__(self, HomeAssistant hass, list[dict[str, Any]] config)
Definition: __init__.py:81
bool add(self, _T matcher)
Definition: match.py:185
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:68