Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The USB Discovery integration."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Coroutine
6 import dataclasses
7 import fnmatch
8 import logging
9 import os
10 import sys
11 from typing import TYPE_CHECKING, Any
12 
13 from serial.tools.list_ports import comports
14 from serial.tools.list_ports_common import ListPortInfo
15 import voluptuous as vol
16 
17 from homeassistant import config_entries
18 from homeassistant.components import websocket_api
19 from homeassistant.components.websocket_api import ActiveConnection
20 from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
21 from homeassistant.core import (
22  CALLBACK_TYPE,
23  Event,
24  HomeAssistant,
25  callback as hass_callback,
26 )
27 from homeassistant.data_entry_flow import BaseServiceInfo
28 from homeassistant.helpers import config_validation as cv, discovery_flow, system_info
29 from homeassistant.helpers.debounce import Debouncer
30 from homeassistant.helpers.typing import ConfigType
31 from homeassistant.loader import USBMatcher, async_get_usb
32 
33 from .const import DOMAIN
34 from .models import USBDevice
35 from .utils import usb_device_from_port
36 
37 if TYPE_CHECKING:
38  from pyudev import Device, MonitorObserver
39 
40 _LOGGER = logging.getLogger(__name__)
41 
42 REQUEST_SCAN_COOLDOWN = 60 # 1 minute cooldown
43 
44 __all__ = [
45  "async_is_plugged_in",
46  "async_register_scan_request_callback",
47  "USBCallbackMatcher",
48  "UsbServiceInfo",
49 ]
50 
51 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
52 
53 
55  """Callback matcher for the USB integration."""
56 
57 
58 @hass_callback
60  hass: HomeAssistant, callback: CALLBACK_TYPE
61 ) -> CALLBACK_TYPE:
62  """Register to receive a callback when a scan should be initiated."""
63  discovery: USBDiscovery = hass.data[DOMAIN]
64  return discovery.async_register_scan_request_callback(callback)
65 
66 
67 @hass_callback
69  hass: HomeAssistant, callback: CALLBACK_TYPE
70 ) -> CALLBACK_TYPE:
71  """Register to receive a callback when the initial USB scan is done.
72 
73  If the initial scan is already done, the callback is called immediately.
74  """
75  discovery: USBDiscovery = hass.data[DOMAIN]
76  return discovery.async_register_initial_scan_callback(callback)
77 
78 
79 @hass_callback
80 def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> bool:
81  """Return True is a USB device is present."""
82 
83  vid = matcher.get("vid", "")
84  pid = matcher.get("pid", "")
85  serial_number = matcher.get("serial_number", "")
86  manufacturer = matcher.get("manufacturer", "")
87  description = matcher.get("description", "")
88 
89  if (
90  vid != vid.upper()
91  or pid != pid.upper()
92  or serial_number != serial_number.lower()
93  or manufacturer != manufacturer.lower()
94  or description != description.lower()
95  ):
96  raise ValueError(
97  f"vid and pid must be uppercase, the rest lowercase in matcher {matcher!r}"
98  )
99 
100  usb_discovery: USBDiscovery = hass.data[DOMAIN]
101  return any(
102  _is_matching(USBDevice(*device_tuple), matcher)
103  for device_tuple in usb_discovery.seen
104  )
105 
106 
107 @dataclasses.dataclass(slots=True)
109  """Prepared info from usb entries."""
110 
111  device: str
112  vid: str
113  pid: str
114  serial_number: str | None
115  manufacturer: str | None
116  description: str | None
117 
118 
120  device: str,
121  serial_number: str | None,
122  manufacturer: str | None,
123  description: str | None,
124  vid: str | None,
125  pid: str | None,
126 ) -> str:
127  """Return a human readable name from USBDevice attributes."""
128  device_details = f"{device}, s/n: {serial_number or 'n/a'}"
129  manufacturer_details = f" - {manufacturer}" if manufacturer else ""
130  vendor_details = f" - {vid}:{pid}" if vid else ""
131  full_details = f"{device_details}{manufacturer_details}{vendor_details}"
132 
133  if not description:
134  return full_details
135  return f"{description[:26]} - {full_details}"
136 
137 
138 def get_serial_by_id(dev_path: str) -> str:
139  """Return a /dev/serial/by-id match for given device if available."""
140  by_id = "/dev/serial/by-id"
141  if not os.path.isdir(by_id):
142  return dev_path
143 
144  for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
145  if os.path.realpath(path) == dev_path:
146  return path
147  return dev_path
148 
149 
150 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
151  """Set up the USB Discovery integration."""
152  usb = await async_get_usb(hass)
153  usb_discovery = USBDiscovery(hass, usb)
154  await usb_discovery.async_setup()
155  hass.data[DOMAIN] = usb_discovery
156  websocket_api.async_register_command(hass, websocket_usb_scan)
157 
158  return True
159 
160 
161 def _fnmatch_lower(name: str | None, pattern: str) -> bool:
162  """Match a lowercase version of the name."""
163  if name is None:
164  return False
165  return fnmatch.fnmatch(name.lower(), pattern)
166 
167 
168 def _is_matching(device: USBDevice, matcher: USBMatcher | USBCallbackMatcher) -> bool:
169  """Return True if a device matches."""
170  if "vid" in matcher and device.vid != matcher["vid"]:
171  return False
172  if "pid" in matcher and device.pid != matcher["pid"]:
173  return False
174  if "serial_number" in matcher and not _fnmatch_lower(
175  device.serial_number, matcher["serial_number"]
176  ):
177  return False
178  if "manufacturer" in matcher and not _fnmatch_lower(
179  device.manufacturer, matcher["manufacturer"]
180  ):
181  return False
182  if "description" in matcher and not _fnmatch_lower(
183  device.description, matcher["description"]
184  ):
185  return False
186  return True
187 
188 
190  """Manage USB Discovery."""
191 
192  def __init__(
193  self,
194  hass: HomeAssistant,
195  usb: list[USBMatcher],
196  ) -> None:
197  """Init USB Discovery."""
198  self.hasshass = hass
199  self.usbusb = usb
200  self.seen: set[tuple[str, ...]] = set()
201  self.observer_activeobserver_active = False
202  self._request_debouncer_request_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None
203  self._request_callbacks: list[CALLBACK_TYPE] = []
204  self.initial_scan_doneinitial_scan_done = False
205  self._initial_scan_callbacks: list[CALLBACK_TYPE] = []
206 
207  async def async_setup(self) -> None:
208  """Set up USB Discovery."""
209  await self._async_start_monitor_async_start_monitor()
210  self.hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self.async_startasync_start)
211  self.hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stopasync_stop)
212 
213  async def async_start(self, event: Event) -> None:
214  """Start USB Discovery and run a manual scan."""
215  await self._async_scan_serial_async_scan_serial()
216 
217  @hass_callback
218  def async_stop(self, event: Event) -> None:
219  """Stop USB Discovery."""
220  if self._request_debouncer_request_debouncer:
221  self._request_debouncer_request_debouncer.async_shutdown()
222 
223  async def _async_start_monitor(self) -> None:
224  """Start monitoring hardware with pyudev."""
225  if not sys.platform.startswith("linux"):
226  return
227  info = await system_info.async_get_system_info(self.hasshass)
228  if info.get("docker"):
229  return
230 
231  if not (
232  observer := await self.hasshass.async_add_executor_job(
233  self._get_monitor_observer_get_monitor_observer
234  )
235  ):
236  return
237 
238  def _stop_observer(event: Event) -> None:
239  observer.stop()
240 
241  self.hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_observer)
242  self.observer_activeobserver_active = True
243 
244  def _get_monitor_observer(self) -> MonitorObserver | None:
245  """Get the monitor observer.
246 
247  This runs in the executor because the import
248  does blocking I/O.
249  """
250  from pyudev import ( # pylint: disable=import-outside-toplevel
251  Context,
252  Monitor,
253  MonitorObserver,
254  )
255 
256  try:
257  context = Context()
258  except (ImportError, OSError):
259  return None
260 
261  monitor = Monitor.from_netlink(context)
262  try:
263  monitor.filter_by(subsystem="tty")
264  except ValueError as ex: # this fails on WSL
265  _LOGGER.debug(
266  "Unable to setup pyudev filtering; This is expected on WSL: %s", ex
267  )
268  return None
269 
270  observer = MonitorObserver(
271  monitor, callback=self._device_discovered_device_discovered, name="usb-observer"
272  )
273 
274  observer.start()
275  return observer
276 
277  def _device_discovered(self, device: Device) -> None:
278  """Call when the observer discovers a new usb tty device."""
279  if device.action != "add":
280  return
281  _LOGGER.debug(
282  "Discovered Device at path: %s, triggering scan serial",
283  device.device_path,
284  )
285  self.hasshass.create_task(self._async_scan_async_scan())
286 
287  @hass_callback
289  self,
290  _callback: CALLBACK_TYPE,
291  ) -> CALLBACK_TYPE:
292  """Register a scan request callback."""
293  self._request_callbacks.append(_callback)
294 
295  @hass_callback
296  def _async_remove_callback() -> None:
297  self._request_callbacks.remove(_callback)
298 
299  return _async_remove_callback
300 
301  @hass_callback
303  self,
304  callback: CALLBACK_TYPE,
305  ) -> CALLBACK_TYPE:
306  """Register an initial scan callback."""
307  if self.initial_scan_doneinitial_scan_done:
308  callback()
309  return lambda: None
310 
311  self._initial_scan_callbacks.append(callback)
312 
313  @hass_callback
314  def _async_remove_callback() -> None:
315  if callback not in self._initial_scan_callbacks:
316  return
317  self._initial_scan_callbacks.remove(callback)
318 
319  return _async_remove_callback
320 
321  async def _async_process_discovered_usb_device(self, device: USBDevice) -> None:
322  """Process a USB discovery."""
323  _LOGGER.debug("Discovered USB Device: %s", device)
324  device_tuple = dataclasses.astuple(device)
325  if device_tuple in self.seen:
326  return
327  self.seen.add(device_tuple)
328 
329  matched = [matcher for matcher in self.usbusb if _is_matching(device, matcher)]
330  if not matched:
331  return
332 
333  service_info: UsbServiceInfo | None = None
334 
335  sorted_by_most_targeted = sorted(matched, key=lambda item: -len(item))
336  most_matched_fields = len(sorted_by_most_targeted[0])
337 
338  for matcher in sorted_by_most_targeted:
339  # If there is a less targeted match, we only
340  # want the most targeted match
341  if len(matcher) < most_matched_fields:
342  break
343 
344  if service_info is None:
345  service_info = UsbServiceInfo(
346  device=await self.hasshass.async_add_executor_job(
347  get_serial_by_id, device.device
348  ),
349  vid=device.vid,
350  pid=device.pid,
351  serial_number=device.serial_number,
352  manufacturer=device.manufacturer,
353  description=device.description,
354  )
355 
356  discovery_flow.async_create_flow(
357  self.hasshass,
358  matcher["domain"],
359  {"source": config_entries.SOURCE_USB},
360  service_info,
361  )
362 
363  async def _async_process_ports(self, ports: list[ListPortInfo]) -> None:
364  """Process each discovered port."""
365  usb_devices = [
367  for port in ports
368  if port.vid is not None or port.pid is not None
369  ]
370 
371  # CP2102N chips create *two* serial ports on macOS: `/dev/cu.usbserial-` and
372  # `/dev/cu.SLAB_USBtoUART*`. The former does not work and we should ignore them.
373  if sys.platform == "darwin":
374  silabs_serials = {
375  dev.serial_number
376  for dev in usb_devices
377  if dev.device.startswith("/dev/cu.SLAB_USBtoUART")
378  }
379 
380  usb_devices = [
381  dev
382  for dev in usb_devices
383  if dev.serial_number not in silabs_serials
384  or (
385  dev.serial_number in silabs_serials
386  and dev.device.startswith("/dev/cu.SLAB_USBtoUART")
387  )
388  ]
389 
390  for usb_device in usb_devices:
391  await self._async_process_discovered_usb_device_async_process_discovered_usb_device(usb_device)
392 
393  async def _async_scan_serial(self) -> None:
394  """Scan serial ports."""
395  await self._async_process_ports_async_process_ports(
396  await self.hasshass.async_add_executor_job(comports)
397  )
398  if self.initial_scan_doneinitial_scan_done:
399  return
400 
401  self.initial_scan_doneinitial_scan_done = True
402  while self._initial_scan_callbacks:
403  self._initial_scan_callbacks.pop()()
404 
405  async def _async_scan(self) -> None:
406  """Scan for USB devices and notify callbacks to scan as well."""
407  for callback in self._request_callbacks:
408  callback()
409  await self._async_scan_serial_async_scan_serial()
410 
411  async def async_request_scan(self) -> None:
412  """Request a serial scan."""
413  if not self._request_debouncer_request_debouncer:
414  self._request_debouncer_request_debouncer = Debouncer(
415  self.hasshass,
416  _LOGGER,
417  cooldown=REQUEST_SCAN_COOLDOWN,
418  immediate=True,
419  function=self._async_scan_async_scan,
420  background=True,
421  )
422  await self._request_debouncer_request_debouncer.async_call()
423 
424 
425 @websocket_api.require_admin
426 @websocket_api.websocket_command({vol.Required("type"): "usb/scan"})
427 @websocket_api.async_response
429  hass: HomeAssistant,
430  connection: ActiveConnection,
431  msg: dict[str, Any],
432 ) -> None:
433  """Scan for new usb devices."""
434  usb_discovery: USBDiscovery = hass.data[DOMAIN]
435  if not usb_discovery.observer_active:
436  await usb_discovery.async_request_scan()
437  connection.send_result(msg["id"])
None __init__(self, HomeAssistant hass, list[USBMatcher] usb)
Definition: __init__.py:196
None async_start(self, Event event)
Definition: __init__.py:213
CALLBACK_TYPE async_register_initial_scan_callback(self, CALLBACK_TYPE callback)
Definition: __init__.py:305
None _device_discovered(self, Device device)
Definition: __init__.py:277
None _async_process_discovered_usb_device(self, USBDevice device)
Definition: __init__.py:321
MonitorObserver|None _get_monitor_observer(self)
Definition: __init__.py:244
None async_stop(self, Event event)
Definition: __init__.py:218
CALLBACK_TYPE async_register_scan_request_callback(self, CALLBACK_TYPE _callback)
Definition: __init__.py:291
None _async_process_ports(self, list[ListPortInfo] ports)
Definition: __init__.py:363
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
USBDevice usb_device_from_port(ListPortInfo port)
Definition: utils.py:10
bool async_is_plugged_in(HomeAssistant hass, USBCallbackMatcher matcher)
Definition: __init__.py:80
CALLBACK_TYPE async_register_initial_scan_callback(HomeAssistant hass, CALLBACK_TYPE callback)
Definition: __init__.py:70
None websocket_usb_scan(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:432
CALLBACK_TYPE async_register_scan_request_callback(HomeAssistant hass, CALLBACK_TYPE callback)
Definition: __init__.py:61
bool _is_matching(USBDevice device, USBMatcher|USBCallbackMatcher matcher)
Definition: __init__.py:168
str human_readable_device_name(str device, str|None serial_number, str|None manufacturer, str|None description, str|None vid, str|None pid)
Definition: __init__.py:126
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:150
bool _fnmatch_lower(str|None name, str pattern)
Definition: __init__.py:161
str get_serial_by_id(str dev_path)
Definition: __init__.py:138
list[USBMatcher] async_get_usb(HomeAssistant hass)
Definition: loader.py:545