Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DropWaker SubmissionQueue::cancel_op #139

Merged
merged 5 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions src/drop_waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! [`DropWake`] trait and implementations.
//!
//! See [`drop_task_waker`].

use std::cell::UnsafeCell;
use std::ffi::CString;
use std::ptr;
use std::sync::Arc;
use std::task;

/// Create a [`task::Waker`] that will drop itself when the waker is dropped.
///
/// # Safety
///
/// The returned `task::Waker` cannot be cloned, it will panic.
pub(crate) unsafe fn drop_task_waker<T: DropWake>(to_drop: T) -> task::Waker {
unsafe fn drop_by_ptr<T: DropWake>(data: *const ()) {
T::drop_from_waker_data(data);
}

// SAFETY: we meet the `task::Waker` and `task::RawWaker` requirements.
unsafe {
task::Waker::from_raw(task::RawWaker::new(
to_drop.into_waker_data(),
&task::RawWakerVTable::new(
|_| panic!("attempted to clone `a10::drop_task_waker`"),
// SAFETY: `wake` takes ownership, so dropping is safe.
drop_by_ptr::<T>,
|_| { /* `wake_by_ref` is a no-op. */ },
drop_by_ptr::<T>,
),
))
}
}

/// Trait used by [`drop_task_waker`].
pub(crate) trait DropWake {
/// Return itself as waker data.
fn into_waker_data(self) -> *const ();

/// Drop the waker `data` created by `into_waker_data`.
unsafe fn drop_from_waker_data(data: *const ());
}

impl<T> DropWake for UnsafeCell<T>
where
T: DropWake,
{
fn into_waker_data(self) -> *const () {
self.into_inner().into_waker_data()
}

unsafe fn drop_from_waker_data(data: *const ()) {
T::drop_from_waker_data(data);
}
}

impl<T> DropWake for (T,)
where
T: DropWake,
{
fn into_waker_data(self) -> *const () {
self.0.into_waker_data()
}

unsafe fn drop_from_waker_data(data: *const ()) {
T::drop_from_waker_data(data);
}
}

impl DropWake for () {
fn into_waker_data(self) -> *const () {
ptr::null()
}

unsafe fn drop_from_waker_data(_: *const ()) {
// Nothing.
}
}

impl<T> DropWake for Box<T> {
fn into_waker_data(self) -> *const () {
Box::into_raw(self).cast()
}

unsafe fn drop_from_waker_data(data: *const ()) {
drop(Box::<T>::from_raw(data.cast_mut().cast()));
}
}

impl<T> DropWake for Arc<T> {
fn into_waker_data(self) -> *const () {
Arc::into_raw(self).cast()
}

unsafe fn drop_from_waker_data(data: *const ()) {
drop(Arc::<T>::from_raw(data.cast_mut().cast()));
}
}

impl DropWake for CString {
fn into_waker_data(self) -> *const () {
CString::into_raw(self).cast()
}

unsafe fn drop_from_waker_data(data: *const ()) {
drop(CString::from_raw(data.cast_mut().cast()));
}
}
58 changes: 36 additions & 22 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,16 @@ impl<D: Descriptor> Drop for Open<D> {
// `None`, but in that case `self.path` would also be
// `None`.
let sq = self.sq.as_ref().unwrap();
let result = sq.cancel_op(op_index, path, |submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
});
let result = sq.cancel_op(
op_index,
|| path,
|submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!(
"dropped a10::Open before completion, attempt to cancel failed: {err}"
Expand Down Expand Up @@ -811,12 +815,16 @@ impl Drop for CreateDir {
if let Some(path) = self.path.take() {
match self.state {
OpState::Running(op_index) => {
let result = self.sq.cancel_op(op_index, path, |submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
});
let result = self.sq.cancel_op(
op_index,
|| path,
|submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}");
}
Expand Down Expand Up @@ -909,14 +917,16 @@ impl Drop for Rename {

match self.state {
OpState::Running(op_index) => {
let result = self
.sq
.cancel_op(op_index, (from, to), |submission| unsafe {
let result = self.sq.cancel_op(
op_index,
|| Box::from((from, to)),
|submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
});
},
);
if let Err(err) = result {
log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}");
}
Expand Down Expand Up @@ -1016,12 +1026,16 @@ impl Drop for Delete {
if let Some(path) = self.path.take() {
match self.state {
OpState::Running(op_index) => {
let result = self.sq.cancel_op(op_index, path, |submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
});
let result = self.sq.cancel_op(
op_index,
|| path,
|submission| unsafe {
submission.cancel_op(op_index);
// We'll get a canceled completion event if we succeeded, which
// is sufficient to cleanup the operation.
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}");
}
Expand Down
4 changes: 4 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ op_future! {
/// access it safely.
buf: B,
},
drop_using: Box,
setup_state: offset: u64,
setup: |submission, fd, (buf,), offset| unsafe {
let (ptr, len) = buf.parts_mut();
Expand Down Expand Up @@ -479,6 +480,7 @@ op_future! {
/// to heap allocation.
iovecs: [libc::iovec; N],
},
drop_using: Box,
/// `iovecs` can't move until the kernel has read the submission.
impl !Unpin,
setup_state: offset: u64,
Expand Down Expand Up @@ -596,6 +598,7 @@ op_future! {
/// access it safely.
buf: B,
},
drop_using: Box,
setup_state: offset: u64,
setup: |submission, fd, (buf,), offset| unsafe {
let (ptr, len) = buf.parts();
Expand Down Expand Up @@ -723,6 +726,7 @@ op_future! {
/// to heap allocation.
iovecs: [libc::iovec; N],
},
drop_using: Box,
/// `iovecs` can't move until the kernel has read the submission.
impl !Unpin,
setup_state: offset: u64,
Expand Down
79 changes: 15 additions & 64 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ use std::{fmt, ptr};

mod bitmap;
mod config;
mod drop_waker;
mod op;
mod sys;

Expand All @@ -167,6 +168,7 @@ use bitmap::AtomicBitMap;
pub use cancel::Cancel;
use config::munmap;
pub use config::Config;
use drop_waker::{drop_task_waker, DropWake};
#[doc(no_inline)]
pub use extract::Extract;
#[doc(no_inline)]
Expand Down Expand Up @@ -805,24 +807,27 @@ impl SubmissionQueue {

/// Mark the operation with `op_index` as dropped, attempting to cancel it.
///
/// Because the kernel still has access to the `resources`, we might have to
/// do some trickery to delay the deallocation of `resources` and making the
/// Because the kernel still has access to the resources, we might have to
/// do some trickery to delay the deallocation of resources and making the
/// queued operation slot available again.
///
/// When the operation is still in progress we attempt to cancel it using
/// submission created by `cancel`. If the operation has completed it will
/// just drop `resources` and make the slot available again.
/// just drop resources (using `create_drop_waker`) and make the slot
/// available again.
///
/// # Notes
///
/// `cancel` should most likely use [`Submission::no_completion_event`]
pub(crate) fn cancel_op<T, F>(
pub(crate) fn cancel_op<R, D, F>(
&self,
op_index: OpIndex,
resources: T,
create_drop_waker: R,
cancel: F,
) -> Result<(), QueueFull>
where
R: FnOnce() -> D,
D: DropWake,
F: FnOnce(&mut Submission),
{
log::trace!(op_index = op_index.0; "canceling operation");
Expand All @@ -832,13 +837,13 @@ impl SubmissionQueue {
if op.no_more_events() {
// Easy path, the operation has already been completed.
*operation = None;
// Unlock defore dropping `resources`, which might take a
// Unlock defore dropping `create_drop_waker`, which might take a
// while.
drop(operation);
self.shared.op_indices.make_available(op_index.0);

// We can safely drop the resources.
drop(resources);
drop(create_drop_waker);
return Ok(());
}

Expand All @@ -853,10 +858,10 @@ impl SubmissionQueue {
// until the kernel is done with the operation.
//
// We achieve 1 by creating a special waker that just drops the
// resources in `resources`.
let waker = if needs_drop::<T>() {
// resources (created by `create_drop_waker`).
let waker = if needs_drop::<D>() {
// SAFETY: we're not going to clone the `waker`.
Some(unsafe { drop_task_waker(Box::from(resources)) })
Some(unsafe { drop_task_waker(create_drop_waker()) })
} else {
// Of course if we don't need to drop `T`, then we don't
// have to use a special waker. But we still don't want to
Expand Down Expand Up @@ -1013,60 +1018,6 @@ impl fmt::Display for QueueFull {
}
}

/// Create a [`task::Waker`] that will drop itself when the waker is dropped.
///
/// # Safety
///
/// The returned `task::Waker` cannot be cloned, it will panic.
unsafe fn drop_task_waker<T: DropWaker>(to_drop: T) -> task::Waker {
unsafe fn drop_by_ptr<T: DropWaker>(data: *const ()) {
T::drop_from_waker_data(data);
}

// SAFETY: we meet the `task::Waker` and `task::RawWaker` requirements.
unsafe {
task::Waker::from_raw(task::RawWaker::new(
to_drop.into_waker_data(),
&task::RawWakerVTable::new(
|_| panic!("attempted to clone `a10::drop_task_waker`"),
// SAFETY: `wake` takes ownership, so dropping is safe.
drop_by_ptr::<T>,
|_| { /* `wake_by_ref` is a no-op. */ },
drop_by_ptr::<T>,
),
))
}
}

/// Trait used by [`drop_task_waker`].
trait DropWaker {
/// Return itself as waker data.
fn into_waker_data(self) -> *const ();

/// Drop the waker `data` created by `into_waker_data`.
unsafe fn drop_from_waker_data(data: *const ());
}

impl<T> DropWaker for Box<T> {
fn into_waker_data(self) -> *const () {
Box::into_raw(self).cast()
}

unsafe fn drop_from_waker_data(data: *const ()) {
drop(Box::<T>::from_raw(data.cast_mut().cast()));
}
}

impl<T> DropWaker for Arc<T> {
fn into_waker_data(self) -> *const () {
Arc::into_raw(self).cast()
}

unsafe fn drop_from_waker_data(data: *const ()) {
drop(Arc::<T>::from_raw(data.cast_mut().cast()));
}
}

/// Queue of completion events.
#[derive(Debug)]
struct CompletionQueue {
Expand Down
Loading