Skip to content

Commit

Permalink
Try getting rid of libusbsio
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Mar 22, 2024
1 parent 7450c7e commit 82b1fcb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging
from typing import Dict, List, Optional

import libusbsio
import hid
from typing_extensions import Self

from ....exceptions import SPSDKConnectionError, SPSDKError
Expand All @@ -36,7 +36,6 @@ def __init__(
timeout: Optional[int] = None,
) -> None:
"""Initialize the USB interface object."""
self._opened = False
self.vid = vid or 0
self.pid = pid or 0
self.path = path or b""
Expand All @@ -45,10 +44,7 @@ def __init__(
self.product_name = product_name or ""
self.interface_number = interface_number or 0
self._timeout = timeout or 2000
libusbsio_logger = logging.getLogger("libusbsio")
self._device: libusbsio.LIBUSBSIO.HID_DEVICE = libusbsio.usbsio(
loglevel=libusbsio_logger.getEffectiveLevel()
).HIDAPI_DeviceCreate()
self._device: Optional[hid.device] = None

@property
def timeout(self) -> int:
Expand All @@ -66,7 +62,7 @@ def is_opened(self) -> bool:
:return: True if device is open, False othervise.
"""
return self._opened
return self._device is not None

def open(self) -> None:
"""Open the interface.
Expand All @@ -75,12 +71,11 @@ def open(self) -> None:
:raises SPSDKConnectionError: if the device can not be opened
"""
logger.debug(f"Opening the Interface: {str(self)}")
if self.is_opened:
if self._device:
# This would get HID_DEVICE into broken state
raise SPSDKError("Can't open already opened device")
try:
self._device.Open(self.path)
self._opened = True
self._device = hid.device.open_path(self.path)
except Exception as error:
raise SPSDKConnectionError(
f"Unable to open device '{str(self)}'"
Expand All @@ -93,10 +88,9 @@ def close(self) -> None:
:raises SPSDKConnectionError: if the device can not be opened
"""
logger.debug(f"Closing the Interface: {str(self)}")
if self.is_opened:
if self._device:
try:
self._device.Close()
self._opened = False
self._device.close()
except Exception as error:
raise SPSDKConnectionError(
f"Unable to close device '{str(self)}'"
Expand All @@ -112,16 +106,16 @@ def read(self, length: int, timeout: Optional[int] = None) -> bytes:
:raises SPSDKTimeoutError: Time-out
"""
timeout = timeout or self.timeout
if not self.is_opened:
if not self._device:
raise SPSDKConnectionError("Device is not opened for reading")
try:
(data, result) = self._device.Read(length, timeout_ms=timeout)
data = self._device.read(length, timeout_ms=timeout)
except Exception as e:
raise SPSDKConnectionError(str(e)) from e
if not data:
logger.error(f"Cannot read from HID device, error={result}")
logger.error(f"Cannot read from HID device")
raise SPSDKTimeoutError()
return data
return bytes(data)

def write(self, data: bytes, timeout: Optional[int] = None) -> None:
"""Send data to device.
Expand All @@ -131,10 +125,10 @@ def write(self, data: bytes, timeout: Optional[int] = None) -> None:
:raises SPSDKConnectionError: Sending data to device failure
"""
timeout = timeout or self.timeout
if not self.is_opened:
if not self._device:
raise SPSDKConnectionError("Device is not opened for writing")
try:
bytes_written = self._device.Write(data, timeout_ms=timeout)
bytes_written = self._device.write(data, timeout_ms=timeout)
except Exception as e:
raise SPSDKConnectionError(str(e)) from e
if bytes_written < 0 or bytes_written < len(data):
Expand Down Expand Up @@ -194,13 +188,10 @@ def enumerate(
:return: List of interfaces found
"""
devices = []
libusbsio_logger = logging.getLogger("libusbsio")
sio = libusbsio.usbsio(loglevel=libusbsio_logger.getEffectiveLevel())
all_hid_devices = sio.HIDAPI_Enumerate()

# iterate on all devices found
for dev in all_hid_devices:
if usb_device_filter.compare(vars(dev)) is True:
for dev in hid.enumerate():
if usb_device_filter.compare(dev) is True:
new_device = cls(
vid=dev["vendor_id"],
pid=dev["product_id"],
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ dependencies = [
"protobuf >=3.17.3, < 4.0.0",
"click-aliases",
"semver",
"libusbsio",
"importlib_metadata",
"hidapi",
"nethsm >= 1.0.0,<2",
"asn1tools >= 0.166.0",
"pyyaml >= 6.0.1",
Expand Down Expand Up @@ -147,6 +148,8 @@ module = [
"fastjsonschema",
"deepmerge.*",
"crcmod.*",
"hid.*",
"importlib_metadata.*",
]
ignore_missing_imports = true

Expand Down

0 comments on commit 82b1fcb

Please sign in to comment.