diff --git a/pynitrokey/trussed/bootloader/lpc55_upload/utils/interfaces/device/usb_device.py b/pynitrokey/trussed/bootloader/lpc55_upload/utils/interfaces/device/usb_device.py index afbb86cc..2ae2112b 100644 --- a/pynitrokey/trussed/bootloader/lpc55_upload/utils/interfaces/device/usb_device.py +++ b/pynitrokey/trussed/bootloader/lpc55_upload/utils/interfaces/device/usb_device.py @@ -9,7 +9,7 @@ import logging from typing import Dict, List, Optional -import libusbsio +import hid from typing_extensions import Self from ....exceptions import SPSDKConnectionError, SPSDKError @@ -36,7 +36,6 @@ def __init__( timeout: Optional[int] = None, ) -> None: """Initialize the USB interface object.""" - self._opened = False self.vid = vid or 0 self.pid = pid or 0 self.path = path or b"" @@ -45,10 +44,7 @@ def __init__( self.product_name = product_name or "" self.interface_number = interface_number or 0 self._timeout = timeout or 2000 - libusbsio_logger = logging.getLogger("libusbsio") - self._device: libusbsio.LIBUSBSIO.HID_DEVICE = libusbsio.usbsio( - loglevel=libusbsio_logger.getEffectiveLevel() - ).HIDAPI_DeviceCreate() + self._device: Optional[hid.device] = None @property def timeout(self) -> int: @@ -66,7 +62,7 @@ def is_opened(self) -> bool: :return: True if device is open, False othervise. """ - return self._opened + return self._device is not None def open(self) -> None: """Open the interface. @@ -75,12 +71,11 @@ def open(self) -> None: :raises SPSDKConnectionError: if the device can not be opened """ logger.debug(f"Opening the Interface: {str(self)}") - if self.is_opened: + if self._device: # This would get HID_DEVICE into broken state raise SPSDKError("Can't open already opened device") try: - self._device.Open(self.path) - self._opened = True + self._device = hid.device.open_path(self.path) except Exception as error: raise SPSDKConnectionError( f"Unable to open device '{str(self)}'" @@ -93,10 +88,9 @@ def close(self) -> None: :raises SPSDKConnectionError: if the device can not be opened """ logger.debug(f"Closing the Interface: {str(self)}") - if self.is_opened: + if self._device: try: - self._device.Close() - self._opened = False + self._device.close() except Exception as error: raise SPSDKConnectionError( f"Unable to close device '{str(self)}'" @@ -112,16 +106,16 @@ def read(self, length: int, timeout: Optional[int] = None) -> bytes: :raises SPSDKTimeoutError: Time-out """ timeout = timeout or self.timeout - if not self.is_opened: + if not self._device: raise SPSDKConnectionError("Device is not opened for reading") try: - (data, result) = self._device.Read(length, timeout_ms=timeout) + data = self._device.read(length, timeout_ms=timeout) except Exception as e: raise SPSDKConnectionError(str(e)) from e if not data: - logger.error(f"Cannot read from HID device, error={result}") + logger.error(f"Cannot read from HID device") raise SPSDKTimeoutError() - return data + return bytes(data) def write(self, data: bytes, timeout: Optional[int] = None) -> None: """Send data to device. @@ -131,10 +125,10 @@ def write(self, data: bytes, timeout: Optional[int] = None) -> None: :raises SPSDKConnectionError: Sending data to device failure """ timeout = timeout or self.timeout - if not self.is_opened: + if not self._device: raise SPSDKConnectionError("Device is not opened for writing") try: - bytes_written = self._device.Write(data, timeout_ms=timeout) + bytes_written = self._device.write(data, timeout_ms=timeout) except Exception as e: raise SPSDKConnectionError(str(e)) from e if bytes_written < 0 or bytes_written < len(data): @@ -194,13 +188,10 @@ def enumerate( :return: List of interfaces found """ devices = [] - libusbsio_logger = logging.getLogger("libusbsio") - sio = libusbsio.usbsio(loglevel=libusbsio_logger.getEffectiveLevel()) - all_hid_devices = sio.HIDAPI_Enumerate() # iterate on all devices found - for dev in all_hid_devices: - if usb_device_filter.compare(vars(dev)) is True: + for dev in hid.enumerate(): + if usb_device_filter.compare(dev) is True: new_device = cls( vid=dev["vendor_id"], pid=dev["product_id"], diff --git a/pyproject.toml b/pyproject.toml index 0033eb9d..7e93c66a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,8 @@ dependencies = [ "protobuf >=3.17.3, < 4.0.0", "click-aliases", "semver", - "libusbsio", + "importlib_metadata", + "hidapi", "nethsm >= 1.0.0,<2", "asn1tools >= 0.166.0", "pyyaml >= 6.0.1", @@ -147,6 +148,8 @@ module = [ "fastjsonschema", "deepmerge.*", "crcmod.*", + "hid.*", + "importlib_metadata.*", ] ignore_missing_imports = true