Support libusb async api #62

Open
TheButlah opened this issue May 5, 2021 · 12 comments

Comments

@TheButlah

TheButlah commented May 5, 2021

Libusb provides an asynchronous API - for example, libusb_fill_bulk_transfer(), libusb_submit_transfer(), etc. The asynchronous API is important for getting good performance and latency out of USB - in my case, my bulk transfers have a latency of 40-120ms, but if I were using the asynchronous API (which allows you to queue up multiple reads), my transfers would take only 4ms (!!!).

I know that some work on this is present in #48, and mentioned in #47. I wanted to open this issue as more of a feature tracker / feature request.

Note that integrating this with Rust's async functions is not strictly necessary - it would be sufficient to stay consistent with libusb's approach, where you have to repeatedly call libusb_handle_events() in a loop to drive the asynchronous callbacks on the same thread.

@kevinmehall
Contributor

kevinmehall commented May 16, 2021

This is something I've been thinking about on and off for quite a while. Libusb's API is quite tricky to map to a safe and ergonomic Rust API. I've recently been designing an API that I haven't had a chance to implement yet, but I want to put it out there since there seems to be increased interest in this lately.

I see two major use cases for async USB:

  1. Integrating with another event loop or Future / async / await: I think this is still blocked waiting for the Rust async ecosystem to mature further and answer questions on completion-based IO. USB on all operating systems is completion-based, and the buffer management runs into the same problems as other attempts to integrate Rust Future with completion-based IO like io_uring and embedded DMA. I hope the ecosystem can standardize on a solution here that can be adapted for USB, but it may involve new language features like linear types or async destructors and may be a long time coming. In my opinion, there's also not a huge motivation for this with USB -- unlike with network servers, you have a small and bounded number of endpoints and can easily launch a few dedicated threads to service them.

  2. Queuing multiple concurrent requests to keep the bus busy to maximize throughput: This technique is essential for high-performance + low-latency data streaming with USB. Everybody does basically the same thing with libusb here: submitting several transfers, and re-submitting them when they complete. An API that implements this pattern specifically seems much easier to make safe than trying to mimic libusb's API directly, and is likely also easier to use. This is what I'm personally most interested in, and what my proposal below addresses.

Here's a rough sketch of an API with a usage example:

pub struct Pipe<C: UsbContext> { ... }

impl<C: UsbContext> Pipe<C> {
    pub fn new(device_handle: DeviceHandle<C>, endpoint: u8) -> Pipe<C>;

    /// Submit a new transfer on the endpoint.
    /// 
    /// For an IN endpoint, the transfer size is set by the *capacity* of
    /// the buffer, and the length and current contents are ignored. The
    /// buffer is returned from a later call to `complete` filled with
    /// the data read from the endpoint.
    ///
    /// For an OUT endpoint, the contents of the buffer are written to
    /// the endpoint.
    pub fn submit(&mut self, buf: Vec<u8>) -> Result<(), TransferError>;

    /// Block waiting for the next pending transfer to complete, and return
    /// its buffer or an error status.
    ///
    /// For an IN endpoint, the returned buffer contains the data
    /// read from the device.
    ///
    /// For an OUT endpoint, the buffer is unmodified, but can be
    /// reused for another transfer.
    pub fn complete(&mut self) -> Result<Vec<u8>, TransferError>;

    /// Get the number of transfers that have been submitted with
    /// `submit` that have not yet been returned from `complete`.
    pub fn pending_transfers(&self) -> usize;

    /// Get the number of transfers that have completed and are
    /// ready to be returned from `complete` without blocking.
    pub fn ready_transfers(&self) -> usize;
}

fn read_example<C: UsbContext>(device_handle: DeviceHandle<C>) -> Result<(), anyhow::Error> {
    let mut pipe = Pipe::new(device_handle, 0x81);

    while pipe.pending_transfers() < 8 {
        pipe.submit(Vec::with_capacity(4096))?;
    }

    loop {
        let buf = pipe.complete()?;
        process_data(&buf);
        pipe.submit(buf)?;
    }
}

fn write_example<C: UsbContext>(device_handle: DeviceHandle<C>) -> Result<(), anyhow::Error> {
    let mut pipe = Pipe::new(device_handle, 0x02);

    loop {
        let mut buf = if pipe.pending_transfers() < 8 {
            Vec::with_capacity(4096)
        } else {
            pipe.complete()?
        };

        fill_data(&mut buf);

        pipe.submit(buf)?;
    }
}

I'm not sure if this belongs in rusb proper, or in its own library along with #57. I'd be happy to help someone implement it, as I can't guarantee when I'll get around to doing it myself.

@TheButlah
Author

TheButlah commented May 16, 2021

I can't comment on whether it should belong in rusb or someplace else, but I do have a few thoughts:

  1. I like the concept of having a pipe or queue that you submit transfers to and poll from (naming bikeshed aside).
  2. I'm not sure about the value of seeing pending and ready transfer counts. It seems prone to encouraging race conditions and might be better kept as hidden information. What would be a use case for this?
  3. Supporting a timeout in the complete/poll function would be nice; it lets us consolidate both blocking and nonblocking APIs (and seems trivial to implement with `libusb_handle_events_timeout()`).
  4. Instead of submitting and receiving Vecs, what if we submitted and received Transfer objects from which the user can access the `[u8]`? This would let us avoid allocating and filling transfers repeatedly, since the user could resubmit the returned transfer object after reading the data. To allow for zero copy, we could also add a Transfer::replace_buf() method on the struct that exchanges the vec in the transfer with another vec, returning the original (like std::mem::replace does). A rough sketch of points 3 and 4 follows this list.
  5. Can you clarify how complete() retrieves the latest failed/successful transfer, when the transfers are driven by callbacks? How is this done in a thread-safe way? Does the Pipe have a mutex for the latest transfer, that gets set by the callback?
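For illustration, points 3 and 4 could look roughly like this, in the same sketch style as the proposal above (the `Transfer` type, `replace_buf`, and `complete_timeout` are hypothetical names, not existing rusb API):

use std::time::Duration;

/// Hypothetical transfer object that owns its buffer and can be resubmitted.
pub struct Transfer { buf: Vec<u8> /* plus endpoint, status, ... */ }

impl Transfer {
    /// Borrow the data received by an IN transfer.
    pub fn data(&self) -> &[u8] { &self.buf }

    /// Swap the internal buffer for another, returning the original
    /// (like std::mem::replace), so data can be handed off without copying.
    pub fn replace_buf(&mut self, buf: Vec<u8>) -> Vec<u8> {
        std::mem::replace(&mut self.buf, buf)
    }
}

impl<C: UsbContext> Pipe<C> {
    /// Like `complete`, but gives up after `timeout`; could be built on
    /// libusb_handle_events_timeout() internally.
    pub fn complete_timeout(&mut self, timeout: Duration) -> Result<Transfer, TransferError>;
}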

@kevinmehall
Contributor

  1. Names definitely up for bikeshedding!

  2. pending_transfers is only changed through the &mut methods, not from the internal callback, so no race condition concerns there (it's just .len() on the VecDeque described below). It's useful, as shown in the examples, for continuing to submit transfers up to a specified number without the caller having to keep track.
    The use case for ready_transfers is to know whether complete would block, but I'm not sure it's worth including.

  3. Yes, complete could use a timeout arg.

  4. To avoid re-allocating transfers, it could hold on to one or more completed transfers for re-use. It only needs to cache one transfer for the streaming use case in the example code.
    I'm not opposed to a Transfer type, but would like to avoid the additional complexity if possible. Supporting isochronous may justify it.

  5. Implementation-wise, my plan was to keep a VecDeque of transfers that is pushed to by submit and popped by complete; a rough sketch follows below. This would only be manipulated by the &mut methods, so no synchronization is needed. complete would use libusb_handle_events_timeout_completed to block and handle events until a flag associated with the next transfer in the queue is set by its completion callback, very much like libusb's sync API does internally. Looking at that now, though, I'm not sure how they justify the use of a regular variable without locking for the completed flag there -- maybe drop one level deeper and use an AtomicBool for that flag.
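A minimal sketch of how that internal state might look (type and field names are illustrative only, and the handle_events loop stands in for libusb_handle_events_timeout_completed, which takes a raw completed flag in the C API):

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use rusb::UsbContext;

// Illustrative only: one entry per in-flight libusb transfer.
struct PendingTransfer {
    transfer: *mut libusb1_sys::libusb_transfer,
    buf: Vec<u8>,
    // Set to true by the libusb completion callback; polled by `complete`.
    completed: Arc<AtomicBool>,
}

struct PipeInner {
    // Pushed by `submit`, popped by `complete`; only touched via the &mut
    // methods, so the queue itself needs no locking.
    queue: VecDeque<PendingTransfer>,
}

// Inside `complete`: drive libusb events until the front transfer finishes.
fn wait_for_front(inner: &mut PipeInner, ctx: &rusb::Context) -> rusb::Result<()> {
    let flag = inner.queue.front().expect("no pending transfers").completed.clone();
    while !flag.load(Ordering::Acquire) {
        ctx.handle_events(None)?;
    }
    Ok(())
}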

@TheButlah
Author

That last part seems to be the trickiest bit, and the part of libusb I understand the least - for now I'm going to see how far I can get with a Mutex<VecDeque> for the completed transfers.

@TheButlah
Author

TheButlah commented May 16, 2021

As for caching the transfers, we could just hold the latest transfer that we stole the vec from and gave to the user. For new transfers, we can create them if the cached transfer is absent. But how would the user exert control over the maximum/minimum number of concurrent transfers? Is it even necessary for them to have that control? Every tutorial on the async API makes something like 32 transfers, submits them, and then reuses them.

@TheButlah
Author

I opted to make the number of transfers explicit and declared up front, and since we don't really push anything into the queue, this is more of a pool concept, I think. So I opened #65 and am fairly happy with the API; it's definitely an improvement over the callback-oriented approach.

@TheButlah
Author

My coworker just tested out #65 with an oscilloscope, and the latency is 65ms but appears to fluctuate in the same range as the original bulk API. There's a lot of additional code built on top of that PR that could confound the performance, so I'm not sure whether the problem lies in the PR or in the other device-specific decoder code I wrote. Does anyone know good ways of measuring USB latency without an oscilloscope, or other ways I could check the performance of the system?

@kevinmehall
Contributor

What exactly are they measuring with the oscilloscope? Max time between packets? Round-trip for the app to respond to an IN packet with an OUT packet?

You could measure round-trip latency without an oscilloscope by making a device that echoes data back to the PC, then using this API to write packets containing the current time and seeing how long they take to come back. Or find the time between packets on an IN endpoint by having the device write timestamps into them.
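For example, something along these lines could measure the round trip with rusb's blocking API against an echo device (the endpoint addresses and the echoing firmware are assumptions for illustration):

use std::time::{Duration, Instant};

// Assumes a device that echoes whatever it receives on OUT endpoint 0x02
// back on IN endpoint 0x81; the endpoint numbers are placeholders.
fn measure_round_trip<C: rusb::UsbContext>(
    handle: &rusb::DeviceHandle<C>,
) -> rusb::Result<Duration> {
    let payload = [0xA5u8; 64];
    let mut echo = [0u8; 64];
    let timeout = Duration::from_secs(1);

    let start = Instant::now();
    handle.write_bulk(0x02, &payload, timeout)?;
    handle.read_bulk(0x81, &mut echo, timeout)?;
    Ok(start.elapsed())
}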

You're building this with --release, right?

@TheButlah
Author

TheButlah commented May 22, 2021

We are measuring a bunch of stuff that builds on top of rusb - basically flashing a light, then reading from a camera via async USB, then decoding/parsing that data into packets, then aggregating that data into frames, all while running in multiple threads with concurrent data structures. So there are a lot of potentially confounding variables here, and I need to figure out a better way to measure the latency that basically just uses rusb and not the additional decoder and data pipeline I built.

It's possible my coworker forgot --release; I'll try it myself on Monday.

@teague-lasser

I needed to build out an asynchronous USB implementation. I don't know if there is still desire for this in rusb, but I'm happy with the result and it seems to work in my tests. Here is the basic idea with a read transfer; writes are very similar.

Drop won't get called on InnerTransfer until after Arc::from_raw is called from within the callback (the pointer produced by Arc::into_raw holds a strong reference until then), so we can ensure that the libusb_transfer object doesn't get freed until after the callback has fired. Holding on to the context inside the transfer also means all the callbacks have to execute before the context is freed. The only thing here that's iffy for me is that libusb requires you to poll its events to make progress on callbacks. Right now I just do that in a separate thread (a rough sketch of such a thread follows the code below).

use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use libc::{c_int, c_uint};
use libusb1_sys::constants::*;
use libusb1_sys::*;
use rusb::{
    Context as UsbContext, DeviceHandle, Result as RusbResult, UsbContext as UsbContextTrait,
};

const LIBUSB_TRANSFER_ACTIVE: c_int = -1;

struct InnerTransfer {
    transfer: NonNull<libusb_transfer>,
    _context: UsbContext, // Makes sure context is not dropped while a transfer is still alive
    status: c_int,
    actual_length: c_int,
    waker: Option<Waker>,
}

impl InnerTransfer {
    pub fn new(context: UsbContext) -> Self {
        Self {
            transfer: unsafe { NonNull::new_unchecked(libusb_alloc_transfer(0)) },
            _context: context,
            status: LIBUSB_TRANSFER_ACTIVE,
            actual_length: -1,
            waker: None,
        }
    }

    pub fn as_ptr(&mut self) -> *mut libusb_transfer {
        self.transfer.as_ptr()
    }
}

impl Drop for InnerTransfer {
    fn drop(&mut self) {
        unsafe { libusb_free_transfer(self.transfer.as_ptr()) };
    }
}

// Private in rusb or reimplemented
fn check_transfer_error(status: c_int) -> RusbResult<()> {
    if status < 0 {
        Err(match status {
            LIBUSB_ERROR_NO_DEVICE => rusb::Error::NoDevice,
            LIBUSB_ERROR_BUSY => rusb::Error::Busy,
            LIBUSB_ERROR_NOT_SUPPORTED => rusb::Error::NotSupported,
            LIBUSB_ERROR_INVALID_PARAM => rusb::Error::InvalidParam,
            _ => rusb::Error::Other,
        })
    } else {
        Ok(())
    }
}

#[no_mangle]
extern "system" fn transfer_finished(transfer_ptr: *mut libusb_transfer) {
    unsafe {
        let transfer: &mut libusb_transfer = &mut *transfer_ptr;
        let user_data = transfer.user_data;
        if !user_data.is_null() {
            let inner = Arc::from_raw(user_data as *mut Mutex<InnerTransfer>);
            let mut inner = inner.lock().unwrap();

            inner.status = transfer.status;
            inner.actual_length = transfer.actual_length;

            if let Some(waker) = inner.waker.take() {
                waker.wake()
            }
        }
    }
}

pub struct ReadTransfer {
    inner: Arc<Mutex<InnerTransfer>>,
    context: UsbContext,
    buffer: Pin<Box<[u8]>>,
}

impl Drop for ReadTransfer {
    fn drop(&mut self) {
        let mut inner = self.inner.lock().unwrap();
        let ptr = inner.as_ptr();
        inner.waker = None;
        if inner.status == LIBUSB_TRANSFER_ACTIVE {
            unsafe {
                libusb_cancel_transfer(ptr);
            }
        }
    }
}

impl Future for ReadTransfer {
    type Output = RusbResult<Vec<u8>>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Err(err) = self.context.handle_events(Some(Duration::from_micros(0))) {
            return Poll::Ready(Err(err));
        }

        let mut inner = self.inner.lock().unwrap();
        if inner.status == LIBUSB_TRANSFER_COMPLETED {
            return Poll::Ready(Ok(self.buffer[..inner.actual_length as usize].to_vec()));
        } else if inner.status != LIBUSB_TRANSFER_ACTIVE {
            let error = match inner.status {
                LIBUSB_TRANSFER_TIMED_OUT => rusb::Error::Timeout,
                LIBUSB_TRANSFER_CANCELLED => rusb::Error::Interrupted,
                _ => rusb::Error::Other,
            };
            return Poll::Ready(Err(error));
        }
        inner.waker = Some(ctx.waker().clone());

        Poll::Pending
    }
}

impl ReadTransfer {
    pub fn new(
        device: &DeviceHandle<UsbContext>,
        endpoint: u8,
        max_size: usize,
        timeout: Duration,
    ) -> RusbResult<Self> {
        let context = device.context().clone();
        let device = unsafe { NonNull::new_unchecked(device.as_raw()) };

        if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN {
            return Err(rusb::Error::InvalidParam);
        }

        let mut buffer: Pin<Box<[u8]>> = vec![0; max_size].into_boxed_slice().into();
        let timeout = timeout.as_millis() as c_uint;

        let mut inner = InnerTransfer::new(context.clone());
        let transfer_ptr = inner.as_ptr();
        let transfer = Arc::new(Mutex::new(inner));

        let result = unsafe {
            let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void;
            let buffer: *mut u8 = buffer.as_mut_ptr();

            libusb_fill_bulk_transfer(
                transfer_ptr,
                device.as_ptr(),
                endpoint,
                buffer,
                max_size as i32,
                transfer_finished as libusb_transfer_cb_fn,
                state_ptr,
                timeout,
            );
            libusb_submit_transfer(transfer_ptr)
        };
        check_transfer_error(result)?;
        Ok(Self {
            inner: transfer,
            context,
            buffer,
        })
    }
}
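For reference, the event-polling thread mentioned above could be as simple as the following sketch (assuming the rusb Context is kept alive for the life of the thread and a shared flag is used for shutdown):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use rusb::UsbContext as _;

// Drives libusb's event handling (and therefore `transfer_finished` above)
// from a dedicated thread until `running` is cleared.
fn spawn_event_thread(
    context: rusb::Context,
    running: Arc<AtomicBool>,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        while running.load(Ordering::Relaxed) {
            // A short timeout keeps the loop responsive to shutdown requests.
            if let Err(e) = context.handle_events(Some(Duration::from_millis(100))) {
                eprintln!("libusb event loop error: {}", e);
                break;
            }
        }
    })
}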

@mxk

mxk commented May 1, 2023

FWIW, I also wrote an async API using an event thread on top of rusb for my Bluetooth library: https://github.com/BlackrockNeurotech/burble/blob/8289cf0fc44087d483af11e06f3e8ed4fab235df/src/host/usb.rs#L414-L764

Ideally, I'd want to get rid of the event thread on non-Windows systems where libusb supports the poll API. The implementation is also a little specialized for my use case. Apart from that, it's been working well for me and may be helpful in providing ideas for the official rusb API.

@ia0

ia0 commented Jul 18, 2023

Having access to a Rust version of libusb_submit_transfer and the other libusb_transfer-related functions would also make it possible to use libusb_transfer flags like LIBUSB_TRANSFER_ADD_ZERO_PACKET, which is useful. I don't think it's currently possible to use write_bulk in a way that adds a zero-length packet when the data length is a multiple of the maximum packet size.
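For context, with the raw libusb1-sys API this is just a flag set on the transfer before submission; a minimal sketch, assuming the transfer has already been allocated and filled as in the example further up:

use std::os::raw::c_int;

use libusb1_sys::constants::LIBUSB_TRANSFER_ADD_ZERO_PACKET;
use libusb1_sys::{libusb_submit_transfer, libusb_transfer};

/// Safety: `transfer` must point to a transfer that was allocated with
/// libusb_alloc_transfer and filled (e.g. with libusb_fill_bulk_transfer).
unsafe fn submit_with_zlp(transfer: *mut libusb_transfer) -> c_int {
    // Ask libusb to append a zero-length packet when the payload length is a
    // multiple of the endpoint's max packet size.
    (*transfer).flags |= LIBUSB_TRANSFER_ADD_ZERO_PACKET;
    libusb_submit_transfer(transfer)
}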
