Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
yamadapc committed Jan 17, 2024
2 parents cda6ba4 + 607d8ca commit bd7d0c0
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions crates/augmented/data/atomic-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
//! multiple producers while `ringbuf` is single producer single consumer.
//!
//! Testing again on a M1 Pro, it is 30% faster.
use std::cell::UnsafeCell;
use std::cmp::max;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicI8, AtomicUsize, Ordering};
Expand Down Expand Up @@ -84,7 +85,7 @@ impl From<CellState> for i8 {
pub struct Queue<T> {
head: AtomicUsize,
tail: AtomicUsize,
elements: Vec<MaybeUninit<T>>,
elements: Vec<UnsafeCell<MaybeUninit<T>>>,
states: Vec<AtomicI8>,
}

Expand All @@ -103,7 +104,7 @@ impl<T> Queue<T> {
pub fn new(capacity: usize) -> Self {
let mut elements = Vec::with_capacity(capacity);
for _ in 0..capacity {
elements.push(MaybeUninit::uninit());
elements.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let mut states = Vec::with_capacity(capacity);
for _ in 0..capacity {
Expand Down Expand Up @@ -221,17 +222,16 @@ impl<T> Queue<T> {
)
.is_ok()
{
let self_ptr = self as *const Self as *mut Self;
let element = unsafe {
std::mem::replace(
&mut (*self_ptr).elements[tail % self.elements.len()],
MaybeUninit::uninit(),
)
self.elements[tail % self.elements.len()]
.get()
.replace(MaybeUninit::uninit())
.assume_init()
};

state.store(CellState::Empty.into(), Ordering::Release);

return unsafe { element.assume_init() };
return element;
}
}
}
Expand All @@ -254,10 +254,11 @@ impl<T> Queue<T> {
.is_ok()
{
unsafe {
let self_ptr = self as *const Self as *mut Self;
// There's a potential small % optimisation from removing bounds checking here &
// using mem::replace.
(*self_ptr).elements[head % self.elements.len()] = MaybeUninit::new(element);
self.elements[head % self.elements.len()]
.get()
.write(MaybeUninit::new(element));
}
state.store(CellState::Stored.into(), Ordering::Release);
return;
Expand All @@ -266,6 +267,18 @@ impl<T> Queue<T> {
}
}

impl<T> Drop for Queue<T> {
fn drop(&mut self) {
if std::mem::needs_drop::<T>() {
// Could probably be made more efficient by using [std::ptr::drop_in_place()]
// as the &mut self here guarantees that we are the only remaining user of this Queue
while let Some(element) = self.pop() {
drop(element);
}
}
}
}

#[cfg(test)]
mod test {
use std::ffi::c_void;
Expand Down

0 comments on commit bd7d0c0

Please sign in to comment.