Replace crossbeam-channel with async-channel for future::Cache
tatsuya6502 committed Sep 6, 2023
1 parent 4fd6171 commit 1c49619
Showing 3 changed files with 46 additions and 52 deletions.
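
For context, the sketch below is not part of the commit; it only illustrates the async-channel 1.9 calls the new future::Cache code relies on and how they line up with the crossbeam-channel API being replaced. The tokio runtime, the u32 payload, and the capacity of 2 are assumptions made to keep the example self-contained:

    // Illustrative only: the async-channel API surface this commit switches to.
    // Assumes `async-channel = "1.9"` and a tokio runtime (not moka itself).
    use async_channel::{bounded, TrySendError};

    #[tokio::main]
    async fn main() {
        // Bounded channel, analogous to crossbeam_channel::bounded, except
        // that `send` is an async method rather than a blocking one.
        let (tx, rx) = bounded::<u32>(2);

        // Non-blocking sends. Note the error variants `Full` / `Closed`;
        // crossbeam-channel names the latter `Disconnected`.
        assert!(tx.try_send(1).is_ok());
        assert!(tx.try_send(2).is_ok());
        assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));

        // Async receive and send; both yield to other tasks instead of
        // blocking the thread while the channel is empty or full.
        assert_eq!(rx.recv().await.unwrap(), 1);
        tx.send(3).await.expect("receiver dropped");

        // Once the receiver is gone the channel is closed and every further
        // send fails with `Closed`.
        drop(rx);
        assert!(matches!(tx.try_send(4), Err(TrySendError::Closed(4))));
    }
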
23 changes: 13 additions & 10 deletions Cargo.toml
@@ -19,10 +19,10 @@ build = "build.rs"
default = ["atomic64", "quanta"]

# Enable this feature to use `moka::sync::{Cache, SegmentedCache}`
sync = []
sync = ["crossbeam-channel"]

# Enable this feature to use `moka::future::Cache`.
future = ["async-lock", "async-trait", "futures-util"]
future = ["async-channel", "async-lock", "async-trait", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
@@ -46,14 +46,13 @@ js = ["uuid/js"]
unstable-debug-counters = ["future"]

[dependencies]
crossbeam-channel = { version = "0.5.5" }
crossbeam-epoch = { version = "0.9.9" }
crossbeam-utils = { version = "0.8" }
once_cell = { version = "1.7" }
parking_lot = { version = "0.12" }
smallvec = { version = "1.8" }
tagptr = { version = "0.2" }
thiserror = { version = "1.0" }
crossbeam-epoch = "0.9.9"
crossbeam-utils = "0.8"
once_cell = "1.7"
parking_lot = "0.12"
smallvec = "1.8"
tagptr = "0.2"
thiserror = "1.0"
uuid = { version = "1.1", features = ["v4"] }

# Opt-out serde and stable_deref_trait features
@@ -63,7 +62,11 @@ triomphe = { version = "0.1.3", default-features = false }
# Optional dependencies (enabled by default)
quanta = { version = "0.11.0", optional = true }

# Optional dependencies (sync)
crossbeam-channel = { version = "0.5.5", optional = true }

# Optional dependencies (future)
async-channel = { version = "1.9", optional = true }
async-lock = { version = "2.4", optional = true }
async-trait = { version = "0.1.58", optional = true }
futures-util = { version = "0.3.17", optional = true }
17 changes: 2 additions & 15 deletions src/future.rs
@@ -3,10 +3,8 @@
//!
//! To use this module, enable a crate feature called "future".
use async_lock::Mutex;
use crossbeam_channel::Sender;
use async_channel::Sender;
use futures_util::future::{BoxFuture, Shared};
use once_cell::sync::Lazy;
use std::{future::Future, hash::Hash, sync::Arc};

use crate::common::{concurrent::WriteOp, time::Instant};
@@ -143,18 +141,7 @@ impl<'a, K, V> Drop for CancelGuard<'a, K, V> {
};

self.interrupted_op_ch
.send(interrupted_op)
.send_blocking(interrupted_op)
.expect("Failed to send a pending op");
}
}

/// May yield to other async tasks.
pub(crate) async fn may_yield() {
static LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);

// Acquire the lock then immediately release it. This `await` may yield to other
// tasks.
//
// NOTE: This behavior was tested with Tokio and async-std.
let _ = LOCK.lock().await;
}
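
The removed `may_yield` helper existed only to force a yield point in the old busy-wait loop; with async-channel, `Sender::send(...).await` provides that yield point itself. The `send_blocking` call in the `Drop` impl is needed because destructors cannot be `async`. Below is a minimal sketch of that pattern, with hypothetical `PendingOp` and `Guard` types standing in for moka's `WriteOp` and `CancelGuard`:

    // Sketch only: why a guard's `Drop` impl uses `send_blocking`.
    // `PendingOp` and `Guard` are illustrative stand-ins, not moka types.
    use async_channel::{unbounded, Sender};

    #[derive(Debug)]
    struct PendingOp(&'static str);

    struct Guard<'a> {
        ch: &'a Sender<PendingOp>,
    }

    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            // `Drop::drop` is a plain synchronous fn, so we cannot `.await`
            // here. `send_blocking` parks the thread if the channel is full,
            // which is fine for the unbounded channel used below.
            self.ch
                .send_blocking(PendingOp("interrupted insert"))
                .expect("Failed to send a pending op");
        }
    }

    fn main() {
        let (tx, rx) = unbounded::<PendingOp>();
        {
            let _guard = Guard { ch: &tx };
            // ... cancellable work would run here ...
        } // guard dropped: the pending op is handed to the channel

        // A later recovery pass (like `retry_interrupted_ops`) can pick it up.
        println!("recovered: {:?}", rx.try_recv().unwrap());
    }
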
58 changes: 31 additions & 27 deletions src/future/base_cache.rs
@@ -35,9 +35,9 @@ use crate::{
#[cfg(feature = "unstable-debug-counters")]
use common::concurrent::debug_counters::CacheDebugStats;

use async_channel::{Receiver, SendError, Sender, TrySendError};
use async_lock::{Mutex, MutexGuard, RwLock};
use async_trait::async_trait;
use crossbeam_channel::{Receiver, Sender, TrySendError};
use crossbeam_utils::atomic::AtomicCell;
use futures_util::future::BoxFuture;
use parking_lot::RwLock as SyncRwLock;
@@ -167,14 +167,15 @@ where
invalidator_enabled: bool,
) -> Self {
let (r_size, w_size) = if max_capacity == Some(0) {
(0, 0)
// async_channel::bounded(0) will panic. Use 1 instead.
(1, 1)
} else {
(READ_LOG_SIZE, WRITE_LOG_SIZE)
};

let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
let (i_snd, i_rcv) = crossbeam_channel::unbounded();
let (r_snd, r_rcv) = async_channel::bounded(r_size);
let (w_snd, w_rcv) = async_channel::bounded(w_size);
let (i_snd, i_rcv) = async_channel::unbounded();

let inner = Arc::new(Inner::new(
name,
@@ -461,7 +462,7 @@ where
match ch.try_send(op) {
// Discard the ReadOp when the channel is full.
Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
Err(e @ TrySendError::Disconnected(_)) => Err(e),
Err(e @ TrySendError::Closed(_)) => Err(e),
}
}

@@ -633,41 +634,44 @@ where
mutex.lock().await;
}

let mut op = op;
let mut spin_count = 0u8;
loop {
const MAX_SPINS_BEFORE_BLOCKING: u8 = 10;
let mut op: WriteOp<K, V> = op;

// Try to send the op with some quick retries.
for _ in 0..MAX_SPINS_BEFORE_BLOCKING {
BaseCache::<K, V, S>::apply_reads_writes_if_needed(
Arc::clone(inner),
ch,
ts,
housekeeper,
)
.await;

match ch.try_send(op) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
op = op1;
}
Err(e @ TrySendError::Disconnected(_)) => return Err(e),
}

// We have got a `TrySendError::Full` above. Wait for a bit and try
// again.
if spin_count < 10 {
spin_count += 1;
// Wastes some CPU time with a hint to indicate to the CPU that we
// are spinning
for _ in 0..8 {
std::hint::spin_loop();
// The channel is full. Let's waste some CPU time with a hint to
// indicate to the CPU that we are spinning.
//
// NOTE: Recent x86_64 processors should have much longer latency
// for the `pause` instruction than the equivalent instructions
// in other architectures. We might want to adjust the number of
// iterations here depending on the target architecture.
for _ in 0..8 {
std::hint::spin_loop();
}
}
} else {
spin_count = 0;
// Try to yield to other tasks. We have to yield sometimes, otherwise
// the other task, which is draining the `ch`, will not make any
// progress. If this happens, we will be stuck in this loop forever.
super::may_yield().await;
Err(e @ TrySendError::Closed(_)) => return Err(e),
}
}

// Still cannot send the op. Calling `send` may cause us to yield to other
// async tasks if the channel is still full.
ch.send(op)
.await
.map_err(|SendError(op)| TrySendError::Closed(op))
}

pub(crate) async fn retry_interrupted_ops(&self) {
@@ -2731,7 +2735,7 @@ mod tests {

async fn insert(cache: &BaseCache<Key, Value>, key: Key, hash: u64, value: Value) {
let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, value).await;
cache.write_op_ch.send(op).expect("Failed to send");
cache.write_op_ch.send(op).await.expect("Failed to send");
}

fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
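
Taken together, the new `schedule_write_op` logic tries `try_send` a bounded number of times, spinning briefly between attempts, and then falls back to an async `send(...).await` that yields until the channel drains. Below is a simplified, standalone rendering of that pattern, not the commit's code: the generic op type, the constants, and the tokio runtime are assumptions, and the housekeeper / maintenance call made by the real code is omitted.

    // Simplified sketch of the "spin a few times, then await" send pattern
    // used by the new `schedule_write_op`. Assumes async-channel 1.9 and tokio.
    use async_channel::{bounded, SendError, Sender, TrySendError};

    const MAX_SPINS_BEFORE_BLOCKING: u8 = 10;

    async fn schedule_op<T>(ch: &Sender<T>, mut op: T) -> Result<(), TrySendError<T>> {
        // Fast path: optimistic retries while the consumer drains the channel.
        for _ in 0..MAX_SPINS_BEFORE_BLOCKING {
            match ch.try_send(op) {
                Ok(()) => return Ok(()),
                Err(TrySendError::Full(op1)) => {
                    op = op1; // take the op back and retry
                    for _ in 0..8 {
                        std::hint::spin_loop(); // tell the CPU we are busy-waiting
                    }
                }
                Err(e @ TrySendError::Closed(_)) => return Err(e),
            }
        }

        // Slow path: an async send that yields to other tasks (including the
        // one draining the channel) until a slot frees up. `SendError` only
        // ever means "closed", so map it back into `TrySendError::Closed`.
        ch.send(op)
            .await
            .map_err(|SendError(op)| TrySendError::Closed(op))
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = bounded::<u32>(1);

        // Keep a consumer running so the slow path can make progress.
        tokio::spawn(async move { while rx.recv().await.is_ok() {} });

        for i in 0..1_000u32 {
            schedule_op(&tx, i).await.expect("channel closed");
        }
        println!("all ops scheduled");
    }
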
