Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bulk transfer async API #64

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
[package]
name = "rusb"
version = "0.8.0"
authors = ["David Cuddeback <[email protected]>", "Ilya Averyanov <[email protected]>"]
authors = [
"David Cuddeback <[email protected]>",
"Ilya Averyanov <[email protected]>",
]
description = "Rust library for accessing USB devices."
license = "MIT"
homepage = "https://github.com/a1ien/rusb"
Expand All @@ -15,14 +18,16 @@ build = "build.rs"
travis-ci = { repository = "a1ien/rusb" }

[features]
vendored = [ "libusb1-sys/vendored" ]
vendored = ["libusb1-sys/vendored"]

[workspace]
members = ["libusb1-sys"]

[dependencies]
libusb1-sys = { path = "libusb1-sys", version = "0.5.0" }
libc = "0.2"
log = "0.4"
thiserror = "1"

[dev-dependencies]
regex = "1"
46 changes: 46 additions & 0 deletions examples/read_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use rusb::{AsyncTransfer, CbResult, Context, UsbContext};

use std::str::FromStr;
use std::time::Duration;

fn main() {
let args: Vec<String> = std::env::args().collect();

if args.len() < 4 {
eprintln!("Usage: read_async <vendor-id> <product-id> <endpoint>");
return;
}

let vid: u16 = FromStr::from_str(args[1].as_ref()).unwrap();
let pid: u16 = FromStr::from_str(args[2].as_ref()).unwrap();
let endpoint: u8 = FromStr::from_str(args[3].as_ref()).unwrap();

let ctx = Context::new().expect("Could not initialize libusb");
let device = ctx
.open_device_with_vid_pid(vid, pid)
.expect("Could not find device");

const NUM_TRANSFERS: usize = 32;
const BUF_SIZE: usize = 1024;

let mut transfers = Vec::new();
for _ in 0..NUM_TRANSFERS {
let mut transfer = AsyncTransfer::new_bulk(
&device,
endpoint,
BUF_SIZE,
callback,
Duration::from_secs(10),
);
transfer.submit().expect("Could not submit transfer");
transfers.push(transfer);
}

loop {
rusb::poll_transfers(&ctx, Duration::from_secs(10));
}
}

fn callback(result: CbResult) {
println!("{:?}", result)
}
20 changes: 4 additions & 16 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::{
config_descriptor::{self, ConfigDescriptor},
device_descriptor::{self, DeviceDescriptor},
device_handle::DeviceHandle,
error,
fields::{self, Speed},
Error, UsbContext,
error,
};

/// A reference to a USB device.
Expand Down Expand Up @@ -150,27 +150,16 @@ impl<T: UsbContext> Device<T> {
pub fn get_parent(&self) -> Option<Self> {
let device = unsafe { libusb_get_parent(self.device.as_ptr()) };
NonNull::new(device)
.map(|device|
unsafe {
Device::from_libusb(
self.context.clone(),
device,
)
}
)
.map(|device| unsafe { Device::from_libusb(self.context.clone(), device) })
}

/// Get the list of all port numbers from root for the specified device
pub fn port_numbers(&self) -> Result<Vec<u8>, Error> {
// As per the USB 3.0 specs, the current maximum limit for the depth is 7.
let mut ports = [0;7];
let mut ports = [0; 7];

let result = unsafe {
libusb_get_port_numbers(
self.device.as_ptr(),
ports.as_mut_ptr(),
ports.len() as i32
)
libusb_get_port_numbers(self.device.as_ptr(), ports.as_mut_ptr(), ports.len() as i32)
};

let ports_number = if result < 0 {
Expand All @@ -180,5 +169,4 @@ impl<T: UsbContext> Device<T> {
};
Ok(ports[0..ports_number as usize].to_vec())
}

}
218 changes: 218 additions & 0 deletions src/device_handle/async_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
use crate::{DeviceHandle, UsbContext};

use libc::c_void;
use libusb1_sys as ffi;
use thiserror::Error;

use std::convert::{TryFrom, TryInto};
use std::marker::{PhantomData, PhantomPinned};
use std::pin::Pin;
use std::ptr::NonNull;
use std::time::Duration;

pub type CbResult<'a> = Result<&'a [u8], TransferError>;

#[derive(Error, Debug)]
pub enum TransferError {
#[error("Transfer timed out")]
Timeout,
#[error("Transfer is stalled")]
Stall,
#[error("Device was disconnected")]
Disconnected,
#[error("Other Error: {0}")]
Other(&'static str),
#[error("{0}ERRNO: {1}")]
Errno(&'static str, i32),
}

pub struct AsyncTransfer<'d, C: UsbContext, F> {
ptr: NonNull<ffi::libusb_transfer>,
closure: F,
buffer: Box<[u8]>,
_pin: PhantomPinned, // `ptr` holds a ptr to `closure`, so mark !Unpin
_device: PhantomData<&'d DeviceHandle<C>>,
}
impl<'d, 'b, C: 'd + UsbContext, F: FnMut(CbResult<'b>) + Send> AsyncTransfer<'d, C, F> {
#[allow(unused)]
pub fn new_bulk(
device: &'d DeviceHandle<C>,
endpoint: u8,
buf_size: usize,
callback: F,
timeout: std::time::Duration,
) -> Pin<Box<Self>> {
// non-isochronous endpoints (e.g. control, bulk, interrupt) specify a value of 0
// This is step 1 of async API
let ptr = unsafe { ffi::libusb_alloc_transfer(0) };
let ptr = NonNull::new(ptr).expect("Could not allocate transfer!");
let timeout = libc::c_uint::try_from(timeout.as_millis())
.expect("Duration was too long to fit into a c_uint");

// Safety: Pinning `result` ensures it doesn't move, but we know that we will
// want to access its fields mutably, we just don't want its memory location
// changing (or its fields moving!). So routinely we will unsafely interact with
// its fields mutably through a shared reference, but this is still sound.
let result = Box::pin(Self {
ptr,
closure: callback,
buffer: vec![0u8; buf_size].into_boxed_slice(),
_pin: PhantomPinned,
_device: PhantomData,
});

unsafe {
// This casting, and passing it to the transfer struct, relies on
// the pointer being a regular pointer and not a fat pointer.
// Also, closure will be invoked from whatever thread polls, which
// may be different from the current thread. So it must be `Send`.
// Also, although many threads at once may poll concurrently, only
// one will actually ever execute the transfer at a time, so we do
// not need to worry about simultaneous writes to the buffer
let closure_as_ptr: *mut F = {
let ptr: *const F = &result.closure;
ptr as *mut F
};
// Step 2 of async api
ffi::libusb_fill_bulk_transfer(
ptr.as_ptr(),
device.as_raw(),
endpoint,
result.buffer.as_ptr() as *mut u8,
result.buffer.len().try_into().unwrap(),
Self::transfer_cb,
closure_as_ptr.cast(),
timeout,
)
};
result
}

/// Submits a transfer to libusb's event engine.
/// # Panics
/// A transfer should not be double-submitted! Only re-submit after a submission has
/// returned Err, or the callback has gotten an Err.
// Step 3 of async API
#[allow(unused)]
pub fn submit(self: &mut Pin<Box<Self>>) -> Result<(), TransferError> {
let errno = unsafe { ffi::libusb_submit_transfer(self.ptr.as_ptr()) };
use ffi::constants::*;
match errno {
0 => Ok(()),
LIBUSB_ERROR_BUSY => {
panic!("Do not double-submit a transfer!")
}
LIBUSB_ERROR_NOT_SUPPORTED => Err(TransferError::Other("Unsupported transfer!")),
LIBUSB_ERROR_INVALID_PARAM => Err(TransferError::Other("Transfer size too large!")),
LIBUSB_ERROR_NO_DEVICE => Err(TransferError::Disconnected),
_ => Err(TransferError::Errno("Unable to submit transfer. ", errno)),
}
}

// We need to invoke our closure using a c-style function, so we store the closure
// inside the custom user data field of the transfer struct, and then call the
// user provided closure from there.
// Step 4 of async API
extern "system" fn transfer_cb(transfer: *mut ffi::libusb_transfer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user-supplied callback panics, it's undefined behavior for it to propagate into C, so this needs a catch_unwind.

// Safety: libusb should never make this null, so this is fine
let transfer = unsafe { &mut *transfer };

// sanity
debug_assert_eq!(
transfer.transfer_type,
ffi::constants::LIBUSB_TRANSFER_TYPE_BULK
);

// sanity
debug_assert_eq!(
std::mem::size_of::<*mut F>(),
std::mem::size_of::<*mut c_void>(),
);
// Safety: The pointer shouldn't be a fat pointer, and should be valid, so
// this should be sound
let closure = unsafe {
let closure: *mut F = std::mem::transmute(transfer.user_data);
&mut *closure
};
Comment on lines +131 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

libusb_cancel_transfer in the drop implementation requests cancellation asynchronously, but after drop returns, the AsyncTransfer memory may be invalidated before this callback is guaranteed to run (or complete, if already running on another thread). The callback may even still be called with a transfer completion after cancellation if it was in the middle of completing at the time of cancellation. Therefore this reference is not guaranteed to be valid.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback may even still be called with a transfer completion after cancellation if it was in the middle of completing at the time of cancellation.
Are you saying this ordering is possible:

  1. Submit transfer
  2. Drop
  3. Callback informs of cancellation
  4. Callback informs of transfer completion

I wouldn't expect 4 to ever come before 3 - are you sure that is possible? If not possible, perhaps I could actually intentionally leak the struct with Box::leak, and then get the box back inside the cancellation callback and properly drop the components?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback should only get called once for a single transfer (until resubmitted), so you won't get both 3 and 4. What I'm not sure about is if you can get a LIBUSB_TRANSFER_COMPLETED callback after calling libusb_transfer_cancel if the transfer has already completed but the event loop has not called the callback yet, or whether this would still be LIBUSB_TRANSFER_CANCELLED but have nonzero actual_length.

I think deferring the drop to the callback could work, though I'd probably use ManuallyDrop over Box::leak to make it explicit.


use ffi::constants::*;
match transfer.status {
LIBUSB_TRANSFER_CANCELLED => {
// Step 5 of async API: Transfer was cancelled, free the transfer
unsafe { ffi::libusb_free_transfer(transfer) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only call to libusb_free_transfer. If the transfer has already completed on drop, the callback is never called with this status and the libusb_transfer is leaked.

}
LIBUSB_TRANSFER_COMPLETED => {
debug_assert!(transfer.length >= transfer.actual_length); // sanity
let data = unsafe {
std::slice::from_raw_parts(transfer.buffer, transfer.actual_length as usize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One other safety consideration is that the transfer is no longer considered busy by libusb at the time of the callback, and the AsyncTransfer might be owned by another thread that could resubmit it during the execution of this callback or the user closure. That would illegally alias and mutate the transfer and buffer references here.

};
(*closure)(Ok(data));
}
LIBUSB_TRANSFER_ERROR => (*closure)(Err(TransferError::Other(
"Error occurred during transfer execution",
))),
LIBUSB_TRANSFER_TIMED_OUT => {
(*closure)(Err(TransferError::Timeout));
}
LIBUSB_TRANSFER_STALL => (*closure)(Err(TransferError::Stall)),
LIBUSB_TRANSFER_NO_DEVICE => (*closure)(Err(TransferError::Disconnected)),
LIBUSB_TRANSFER_OVERFLOW => unreachable!(),
_ => panic!("Found an unexpected error value for transfer status"),
}
}
}
impl<C: UsbContext, F> AsyncTransfer<'_, C, F> {
/// Helper function for the Drop impl.
fn drop_helper(self: Pin<&mut Self>) {
// Actual drop code goes here.
let transfer_ptr = self.ptr.as_ptr();
let errno = unsafe { ffi::libusb_cancel_transfer(transfer_ptr) };
match errno {
0 | ffi::constants::LIBUSB_ERROR_NOT_FOUND => (),
errno => {
log::warn!(
"Could not cancel USB transfer. Memory may be leaked. Errno: {}, Error message: {}",
errno, unsafe{std::ffi::CStr::from_ptr( ffi::libusb_strerror(errno))}.to_str().unwrap()
)
}
}
}
}

impl<C: UsbContext, F> Drop for AsyncTransfer<'_, C, F> {
fn drop(&mut self) {
// We call `drop_helper` because that function represents the actual semantics
// that `self` has when being dropped.
// (see https://doc.rust-lang.org/std/pin/index.html#drop-implementation)
// Safety: `new_unchecked` is okay because we know this value is never used
// again after being dropped.
Self::drop_helper(unsafe { Pin::new_unchecked(self) });
}
}

/// Polls for transfers and executes their callbacks. Will block until the
/// given timeout, or return immediately if timeout is zero.
/// Returns whether a transfer was completed
pub fn poll_transfers(ctx: &impl UsbContext, timeout: Duration) {
let timeval = libc::timeval {
tv_sec: timeout.as_secs().try_into().unwrap(),
tv_usec: timeout.subsec_millis().try_into().unwrap(),
};
unsafe {
let errno = ffi::libusb_handle_events_timeout_completed(
ctx.as_raw(),
std::ptr::addr_of!(timeval),
std::ptr::null_mut(),
);
use ffi::constants::*;
match errno {
0 => (),
LIBUSB_ERROR_INVALID_PARAM => panic!("Provided timeout was unexpectedly invalid"),
_ => panic!(
"Error when polling transfers. ERRNO: {}, Message: {}",
errno,
std::ffi::CStr::from_ptr(ffi::libusb_strerror(errno)).to_string_lossy()
),
}
}
}
2 changes: 2 additions & 0 deletions src/device_handle.rs → src/device_handle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod async_api;

use std::{mem, ptr::NonNull, time::Duration, u8};

use libc::{c_int, c_uchar, c_uint};
Expand Down
7 changes: 6 additions & 1 deletion src/device_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ impl<'a, T: UsbContext> Iterator for Devices<'a, T> {
let device = self.devices[self.index];

self.index += 1;
Some(unsafe { device::Device::from_libusb(self.context.clone(), std::ptr::NonNull::new_unchecked(device)) })
Some(unsafe {
device::Device::from_libusb(
self.context.clone(),
std::ptr::NonNull::new_unchecked(device),
)
})
} else {
None
}
Expand Down
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use crate::{
context::{Context, GlobalContext, Hotplug, LogLevel, Registration, UsbContext},
device::Device,
device_descriptor::DeviceDescriptor,
device_handle::async_api::{poll_transfers, AsyncTransfer, CbResult},
device_handle::DeviceHandle,
device_list::{DeviceList, Devices},
endpoint_descriptor::EndpointDescriptor,
Expand Down Expand Up @@ -106,6 +107,11 @@ pub fn open_device_with_vid_pid(
if handle.is_null() {
None
} else {
Some(unsafe { DeviceHandle::from_libusb(GlobalContext::default(), std::ptr::NonNull::new_unchecked(handle)) })
Some(unsafe {
DeviceHandle::from_libusb(
GlobalContext::default(),
std::ptr::NonNull::new_unchecked(handle),
)
})
}
}