1 """The USB Discovery integration."""
3 from __future__
import annotations
5 from collections.abc
import Coroutine
11 from typing
import TYPE_CHECKING, Any
13 from serial.tools.list_ports
import comports
14 from serial.tools.list_ports_common
import ListPortInfo
15 import voluptuous
as vol
17 from homeassistant
import config_entries
25 callback
as hass_callback,
33 from .const
import DOMAIN
34 from .models
import USBDevice
35 from .utils
import usb_device_from_port
38 from pyudev
import Device, MonitorObserver
40 _LOGGER = logging.getLogger(__name__)
42 REQUEST_SCAN_COOLDOWN = 60
45 "async_is_plugged_in",
46 "async_register_scan_request_callback",
51 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
55 """Callback matcher for the USB integration."""
60 hass: HomeAssistant, callback: CALLBACK_TYPE
62 """Register to receive a callback when a scan should be initiated."""
63 discovery: USBDiscovery = hass.data[DOMAIN]
64 return discovery.async_register_scan_request_callback(callback)
69 hass: HomeAssistant, callback: CALLBACK_TYPE
71 """Register to receive a callback when the initial USB scan is done.
73 If the initial scan is already done, the callback is called immediately.
75 discovery: USBDiscovery = hass.data[DOMAIN]
76 return discovery.async_register_initial_scan_callback(callback)
81 """Return True if a USB device is present."""
83 vid = matcher.get(
"vid",
"")
84 pid = matcher.get(
"pid",
"")
85 serial_number = matcher.get(
"serial_number",
"")
86 manufacturer = matcher.get(
"manufacturer",
"")
87 description = matcher.get(
"description",
"")
92 or serial_number != serial_number.lower()
93 or manufacturer != manufacturer.lower()
94 or description != description.lower()
97 f
"vid and pid must be uppercase, the rest lowercase in matcher {matcher!r}"
100 usb_discovery: USBDiscovery = hass.data[DOMAIN]
103 for device_tuple
in usb_discovery.seen
107 @dataclasses.dataclass(slots=True)
109 """Prepared info from usb entries."""
114 serial_number: str |
None
115 manufacturer: str |
None
116 description: str |
None
121 serial_number: str |
None,
122 manufacturer: str |
None,
123 description: str |
None,
127 """Return a human readable name from USBDevice attributes."""
128 device_details = f
"{device}, s/n: {serial_number or 'n/a'}"
129 manufacturer_details = f
" - {manufacturer}" if manufacturer
else ""
130 vendor_details = f
" - {vid}:{pid}" if vid
else ""
131 full_details = f
"{device_details}{manufacturer_details}{vendor_details}"
135 return f
"{description[:26]} - {full_details}"
139 """Return a /dev/serial/by-id match for given device if available."""
140 by_id =
"/dev/serial/by-id"
141 if not os.path.isdir(by_id):
144 for path
in (entry.path
for entry
in os.scandir(by_id)
if entry.is_symlink()):
145 if os.path.realpath(path) == dev_path:
150 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
151 """Set up the USB Discovery integration."""
154 await usb_discovery.async_setup()
155 hass.data[DOMAIN] = usb_discovery
156 websocket_api.async_register_command(hass, websocket_usb_scan)
162 """Match a lowercase version of the name."""
165 return fnmatch.fnmatch(name.lower(), pattern)
168 def _is_matching(device: USBDevice, matcher: USBMatcher | USBCallbackMatcher) -> bool:
169 """Return True if a device matches."""
170 if "vid" in matcher
and device.vid != matcher[
"vid"]:
172 if "pid" in matcher
and device.pid != matcher[
"pid"]:
175 device.serial_number, matcher[
"serial_number"]
179 device.manufacturer, matcher[
"manufacturer"]
183 device.description, matcher[
"description"]
190 """Manage USB Discovery."""
195 usb: list[USBMatcher],
197 """Init USB Discovery."""
200 self.seen: set[tuple[str, ...]] = set()
202 self.
_request_debouncer_request_debouncer: Debouncer[Coroutine[Any, Any,
None]] |
None =
None
203 self._request_callbacks: list[CALLBACK_TYPE] = []
205 self._initial_scan_callbacks: list[CALLBACK_TYPE] = []
208 """Set up USB Discovery."""
210 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self.
async_startasync_start)
211 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.
async_stopasync_stop)
214 """Start USB Discovery and run a manual scan."""
219 """Stop USB Discovery."""
224 """Start monitoring hardware with pyudev."""
225 if not sys.platform.startswith(
"linux"):
227 info = await system_info.async_get_system_info(self.
hasshass)
228 if info.get(
"docker"):
232 observer := await self.
hasshass.async_add_executor_job(
238 def _stop_observer(event: Event) ->
None:
241 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_observer)
245 """Get the monitor observer.
247 This runs in the executor because the import
258 except (ImportError, OSError):
261 monitor = Monitor.from_netlink(context)
263 monitor.filter_by(subsystem=
"tty")
264 except ValueError
as ex:
266 "Unable to setup pyudev filtering; This is expected on WSL: %s", ex
270 observer = MonitorObserver(
278 """Call when the observer discovers a new usb tty device."""
279 if device.action !=
"add":
282 "Discovered Device at path: %s, triggering scan serial",
290 _callback: CALLBACK_TYPE,
292 """Register a scan request callback."""
293 self._request_callbacks.append(_callback)
296 def _async_remove_callback() -> None:
297 self._request_callbacks.
remove(_callback)
299 return _async_remove_callback
304 callback: CALLBACK_TYPE,
306 """Register an initial scan callback."""
311 self._initial_scan_callbacks.append(callback)
314 def _async_remove_callback() -> None:
315 if callback
not in self._initial_scan_callbacks:
317 self._initial_scan_callbacks.
remove(callback)
319 return _async_remove_callback
322 """Process a USB discovery."""
323 _LOGGER.debug(
"Discovered USB Device: %s", device)
324 device_tuple = dataclasses.astuple(device)
325 if device_tuple
in self.seen:
327 self.seen.
add(device_tuple)
329 matched = [matcher
for matcher
in self.
usbusb
if _is_matching(device, matcher)]
333 service_info: UsbServiceInfo |
None =
None
335 sorted_by_most_targeted = sorted(matched, key=
lambda item: -len(item))
336 most_matched_fields = len(sorted_by_most_targeted[0])
338 for matcher
in sorted_by_most_targeted:
341 if len(matcher) < most_matched_fields:
344 if service_info
is None:
346 device=await self.
hasshass.async_add_executor_job(
347 get_serial_by_id, device.device
351 serial_number=device.serial_number,
352 manufacturer=device.manufacturer,
353 description=device.description,
356 discovery_flow.async_create_flow(
359 {
"source": config_entries.SOURCE_USB},
364 """Process each discovered port."""
368 if port.vid
is not None or port.pid
is not None
373 if sys.platform ==
"darwin":
376 for dev
in usb_devices
377 if dev.device.startswith(
"/dev/cu.SLAB_USBtoUART")
382 for dev
in usb_devices
383 if dev.serial_number
not in silabs_serials
385 dev.serial_number
in silabs_serials
386 and dev.device.startswith(
"/dev/cu.SLAB_USBtoUART")
390 for usb_device
in usb_devices:
394 """Scan serial ports."""
396 await self.
hasshass.async_add_executor_job(comports)
402 while self._initial_scan_callbacks:
403 self._initial_scan_callbacks.pop()()
406 """Scan for USB devices and notify callbacks to scan as well."""
407 for callback
in self._request_callbacks:
412 """Request a serial scan."""
417 cooldown=REQUEST_SCAN_COOLDOWN,
425 @websocket_api.require_admin
426 @websocket_api.websocket_command({vol.Required("type"):
"usb/scan"})
427 @websocket_api.async_response
430 connection: ActiveConnection,
433 """Scan for new usb devices."""
434 usb_discovery: USBDiscovery = hass.data[DOMAIN]
435 if not usb_discovery.observer_active:
436 await usb_discovery.async_request_scan()
437 connection.send_result(msg[
"id"])
None _async_scan_serial(self)
None __init__(self, HomeAssistant hass, list[USBMatcher] usb)
None async_start(self, Event event)
CALLBACK_TYPE async_register_initial_scan_callback(self, CALLBACK_TYPE callback)
None async_request_scan(self)
None _async_start_monitor(self)
None _device_discovered(self, Device device)
None _async_process_discovered_usb_device(self, USBDevice device)
MonitorObserver|None _get_monitor_observer(self)
None async_stop(self, Event event)
CALLBACK_TYPE async_register_scan_request_callback(self, CALLBACK_TYPE _callback)
None _async_process_ports(self, list[ListPortInfo] ports)
bool add(self, _T matcher)
bool remove(self, _T matcher)
USBDevice usb_device_from_port(ListPortInfo port)
bool async_is_plugged_in(HomeAssistant hass, USBCallbackMatcher matcher)
CALLBACK_TYPE async_register_initial_scan_callback(HomeAssistant hass, CALLBACK_TYPE callback)
None websocket_usb_scan(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
CALLBACK_TYPE async_register_scan_request_callback(HomeAssistant hass, CALLBACK_TYPE callback)
bool _is_matching(USBDevice device, USBMatcher|USBCallbackMatcher matcher)
str human_readable_device_name(str device, str|None serial_number, str|None manufacturer, str|None description, str|None vid, str|None pid)
bool async_setup(HomeAssistant hass, ConfigType config)
bool _fnmatch_lower(str|None name, str pattern)
str get_serial_by_id(str dev_path)
list[USBMatcher] async_get_usb(HomeAssistant hass)