Merge pull request #323 from moka-rs/better-async-yielding
Better async waiting for when the write op channel is full
tatsuya6502 authored Sep 9, 2023
2 parents a03488b + 965f198 commit eddec37
Showing 4 changed files with 70 additions and 42 deletions.
Cargo.toml (26 changes: 13 additions & 13 deletions)
@@ -1,6 +1,6 @@
 [package]
 name = "moka"
-version = "0.12.0-beta.2"
+version = "0.12.0-beta.3"
 edition = "2018"
 # Rust 1.65 was released on Nov 3, 2022.
 rust-version = "1.65"
@@ -35,25 +35,25 @@ logging = ["log"]
 # https://github.com/moka-rs/moka#resolving-compile-errors-on-some-32-bit-platforms
 atomic64 = []

-# This is an **experimental** feature to make `unsync` and `sync` caches to compile
-# for `wasm32-unknown-unknown` target. Note that we have not tested if these caches
-# work correctly in wasm32 environment.
+# This is an **experimental** feature to make `sync` caches compile for the
+# `wasm32-unknown-unknown` target. Note that we have not tested if these caches
+# work correctly in a wasm32 environment.
 js = ["uuid/js"]

 # This unstable feature adds `GlobalDebugCounters::current` function, which returns
 # counters of internal object construction and destruction. It will have some
-# performance impacts and is intended for debugging purpose.
+# performance impacts and is intended for debugging.
 unstable-debug-counters = ["future"]

 [dependencies]
-crossbeam-channel = { version = "0.5.5" }
-crossbeam-epoch = { version = "0.9.9" }
-crossbeam-utils = { version = "0.8" }
-once_cell = { version = "1.7" }
-parking_lot = { version = "0.12" }
-smallvec = { version = "1.8" }
-tagptr = { version = "0.2" }
-thiserror = { version = "1.0" }
+crossbeam-channel = "0.5.5"
+crossbeam-epoch = "0.9.9"
+crossbeam-utils = "0.8"
+once_cell = "1.7"
+parking_lot = "0.12"
+smallvec = "1.8"
+tagptr = "0.2"
+thiserror = "1.0"
 uuid = { version = "1.1", features = ["v4"] }

 # Opt-out serde and stable_deref_trait features
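The dependency rewrite above is purely cosmetic. In a Cargo manifest, the plain string form is shorthand for an inline table that sets only `version`, so the two declarations below express the same requirement (shown together for illustration; a real manifest keeps only one). The table form remains necessary when extra keys are set, as with `uuid`:

    [dependencies]
    smallvec = "1.8"                  # shorthand
    # smallvec = { version = "1.8" }  # equivalent inline-table form
    uuid = { version = "1.1", features = ["v4"] }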
src/future.rs (13 changes: 0 additions & 13 deletions)
@@ -3,10 +3,8 @@
 //!
 //! To use this module, enable a crate feature called "future".

-use async_lock::Mutex;
 use crossbeam_channel::Sender;
 use futures_util::future::{BoxFuture, Shared};
-use once_cell::sync::Lazy;
 use std::{future::Future, hash::Hash, sync::Arc};

 use crate::common::{concurrent::WriteOp, time::Instant};
@@ -147,14 +145,3 @@ impl<'a, K, V> Drop for CancelGuard<'a, K, V> {
             .expect("Failed to send a pending op");
     }
 }
-
-/// May yield to other async tasks.
-pub(crate) async fn may_yield() {
-    static LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);
-
-    // Acquire the lock then immediately release it. This `await` may yield to other
-    // tasks.
-    //
-    // NOTE: This behavior was tested with Tokio and async-std.
-    let _ = LOCK.lock().await;
-}
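The deleted `may_yield` helper gave up the CPU indirectly: locking and immediately releasing a global async mutex introduces an `.await` point at which the runtime may switch tasks. The same effect is usually written directly as a future that returns `Pending` once and wakes itself. A minimal, runtime-agnostic sketch (not part of this commit; the name mirrors `tokio::task::yield_now`):

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Completes after yielding to the async runtime exactly once.
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                // Request an immediate re-poll, then return `Pending` so the
                // executor regains control and can schedule other tasks.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    pub(crate) async fn yield_now() {
        YieldNow { yielded: false }.await
    }

Either way, a yield only reschedules the current task. If the write op channel is still full at the next poll, the producer spins through another round trip, burning CPU until the maintenance task happens to finish. The `RwLock` handshake introduced in base_cache.rs below parks producers until the drain has actually completed.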
src/future/base_cache.rs (62 changes: 49 additions & 13 deletions)
@@ -120,6 +120,11 @@ impl<K, V, S> BaseCache<K, V, S> {
         self.inner.current_time_from_expiration_clock()
     }

+    #[inline]
+    pub(crate) fn maintenance_task_lock(&self) -> &RwLock<()> {
+        &self.inner.maintenance_task_lock
+    }
+
     pub(crate) fn notify_invalidate(
         &self,
         key: &Arc<K>,
@@ -617,6 +622,7 @@
     pub(crate) async fn schedule_write_op(
         inner: &Arc<impl InnerSync + Send + Sync + 'static>,
         ch: &Sender<WriteOp<K, V>>,
+        maintenance_task_lock: &RwLock<()>,
         op: WriteOp<K, V>,
         ts: Instant,
         housekeeper: Option<&HouseKeeperArc>,
@@ -634,7 +640,7 @@
         }

         let mut op = op;
-        let mut spin_count = 0u8;
+        let mut spin_loop_attempts = 0u8;
         loop {
             BaseCache::<K, V, S>::apply_reads_writes_if_needed(
                 Arc::clone(inner),
@@ -651,21 +657,38 @@
                 Err(e @ TrySendError::Disconnected(_)) => return Err(e),
             }

-            // We have got a `TrySendError::Full` above. Wait for a bit and try
-            // again.
-            if spin_count < 10 {
-                spin_count += 1;
+            // We have got a `TrySendError::Full` above. Wait a moment and try again.
+
+            if spin_loop_attempts < 4 {
+                spin_loop_attempts += 1;
                 // Wastes some CPU time with a hint to indicate to the CPU that we
-                // are spinning
-                for _ in 0..8 {
+                // are spinning. Adjust the SPIN_COUNT because the `PAUSE`
+                // instruction of recent x86_64 CPUs may have a longer latency than
+                // its alternatives on other CPU architectures.
+                const SPIN_COUNT: usize = if cfg!(target_arch = "x86_64") { 8 } else { 32 };
+                for _ in 0..SPIN_COUNT {
                     std::hint::spin_loop();
                 }
             } else {
-                spin_count = 0;
-                // Try to yield to other tasks. We have to yield sometimes, otherwise
-                // other task, which is draining the `ch`, will not make any
-                // progress. If this happens, we will stuck in this loop forever.
-                super::may_yield().await;
+                // Wait for a shared reader lock to become available. The exclusive
+                // writer lock will already be held by another async task that is
+                // currently calling the `do_run_pending_tasks` method via the
+                // `apply_reads_writes_if_needed` method above.
+                //
+                // `do_run_pending_tasks` will receive some of the ops from the
+                // channel and apply them to the data structures for the cache
+                // policies, so the channel will have some room for new ops.
+                //
+                // The shared lock becomes available once that async task has
+                // returned from `do_run_pending_tasks`. We release the lock
+                // immediately after we acquire it.
+                let _ = maintenance_task_lock.read().await;
+                spin_loop_attempts = 0;
+
+                // We are going to retry. If the write op channel now has enough
+                // room, we will be able to send our op and we are done. If not, we
+                // (or another task) will become the next exclusive writer when
+                // calling `apply_reads_writes_if_needed` above.
             }
         }
     }
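Condensed into a self-contained form, the waiting scheme above works as in the following sketch. This is illustrative only, not moka's code: it stands in `tokio::sync::mpsc` for the crossbeam write op channel and `tokio::sync::RwLock` for `async_lock::RwLock`, and it omits the `apply_reads_writes_if_needed` call that actually starts the maintenance task holding the write lock:

    use std::sync::Arc;
    use tokio::sync::{mpsc, RwLock};

    /// Illustrative model of `schedule_write_op`: spin briefly while the
    /// channel is full, then park on the maintenance lock until the drain
    /// completes.
    async fn send_with_backoff<T>(
        ch: &mpsc::Sender<T>,
        maintenance_task_lock: &Arc<RwLock<()>>,
        mut op: T,
    ) -> Result<(), ()> {
        let mut spin_loop_attempts = 0u8;
        loop {
            match ch.try_send(op) {
                Ok(()) => return Ok(()),
                Err(mpsc::error::TrySendError::Closed(_)) => return Err(()),
                // Recover the rejected op and fall through to the backoff.
                Err(mpsc::error::TrySendError::Full(rejected)) => op = rejected,
            }
            if spin_loop_attempts < 4 {
                // Busy-wait briefly; cheap if a slot frees up soon.
                spin_loop_attempts += 1;
                for _ in 0..8 {
                    std::hint::spin_loop();
                }
            } else {
                // Park until the maintenance task releases its write lock,
                // then retry. The read guard is dropped immediately; acquiring
                // it is the whole point, since that cannot succeed while a
                // drain is still in progress.
                let _ = maintenance_task_lock.read().await;
                spin_loop_attempts = 0;
            }
        }
    }

Reusing a reader-writer lock as the wakeup signal keeps the fast path cheap: an uncontended `read()` costs little, and a write-preferring lock guarantees that waiting producers cannot starve the maintenance task out of the write half.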
@@ -698,9 +721,10 @@

         // Retry to schedule the write op.
         let ts = cancel_guard.ts;
+        let lock = self.maintenance_task_lock();
         let op = cancel_guard.op.as_ref().cloned().unwrap();
         let hk = self.housekeeper.as_ref();
-        Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk, false)
+        Self::schedule_write_op(&self.inner, &self.write_op_ch, lock, op, ts, hk, false)
             .await
             .expect("Failed to reschedule a write op");

@@ -1012,6 +1036,7 @@ pub(crate) struct Inner<K, V, S> {
     frequency_sketch_enabled: AtomicBool,
     read_op_ch: Receiver<ReadOp<K, V>>,
     write_op_ch: Receiver<WriteOp<K, V>>,
+    maintenance_task_lock: RwLock<()>,
     expiration_policy: ExpirationPolicy<K, V>,
     valid_after: AtomicInstant,
     weigher: Option<Weigher<K, V>>,
@@ -1206,6 +1231,7 @@
             frequency_sketch_enabled: Default::default(),
             read_op_ch,
             write_op_ch,
+            maintenance_task_lock: Default::default(),
             expiration_policy,
             valid_after: Default::default(),
             weigher,
@@ -1379,8 +1405,14 @@
             return;
         }

+        // Acquire some locks.
+
+        // SAFETY: the write lock below should never be starved, because the lock
+        // strategy of async_lock::RwLock is write-preferring.
+        let write_op_ch_lock = self.maintenance_task_lock.write().await;
         let mut deqs = self.deques.lock().await;
         let mut timer_wheel = self.timer_wheel.lock().await;
+
         let mut calls = 0;
         let current_ec = self.entry_count.load();
         let current_ws = self.weighted_size.load();
@@ -1462,6 +1494,10 @@
             .store(eviction_state.counters.weighted_size);

         crossbeam_epoch::pin().flush();
+
+        // Ensure some of the locks are held until here.
+        drop(deqs);
+        drop(write_op_ch_lock);
     }
 }
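The drain side of the handshake in `do_run_pending_tasks` reduces to the sketch below, under the same assumptions as the producer sketch above (Tokio primitives, illustrative names):

    use std::sync::Arc;
    use tokio::sync::{mpsc, RwLock};

    /// Illustrative model of `do_run_pending_tasks`: drain the channel while
    /// holding the write lock, so parked producers wake only once slots have
    /// been freed.
    async fn drain_write_ops<T>(
        ch: &mut mpsc::Receiver<T>,
        maintenance_task_lock: &Arc<RwLock<()>>,
        mut apply: impl FnMut(T),
    ) {
        // Blocks until in-flight readers finish; with a write-preferring
        // lock, later `read()` calls queue up behind this writer.
        let write_op_ch_lock = maintenance_task_lock.write().await;

        // Apply pending ops to the cache policy data structures, freeing
        // room in the bounded channel.
        while let Ok(op) = ch.try_recv() {
            apply(op);
        }

        // Release only after all maintenance work is done, so the producers'
        // `read().await` doubles as a "drain finished" signal. An explicit
        // drop documents the release point, as in the real code.
        drop(write_op_ch_lock);
    }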

src/future/cache.rs (11 changes: 8 additions & 3 deletions)
@@ -1446,8 +1446,6 @@
         std::mem::drop(klg);
         std::mem::drop(kl);

-        let hk = self.base.housekeeper.as_ref();
-
         let should_block;
         #[cfg(not(test))]
         {
@@ -1458,9 +1456,13 @@
             should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
         }

+        let lock = self.base.maintenance_task_lock();
+        let hk = self.base.housekeeper.as_ref();
+
         BaseCache::<K, V, S>::schedule_write_op(
             &self.base.inner,
             &self.base.write_op_ch,
+            lock,
             op,
             now,
             hk,
@@ -1914,7 +1916,6 @@
         }

         let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
-        let hk = self.base.housekeeper.as_ref();
         let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
         cancel_guard.set_op(op.clone());

@@ -1928,9 +1929,13 @@
             should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
         }

+        let hk = self.base.housekeeper.as_ref();
+        let lock = self.base.maintenance_task_lock();
+
         BaseCache::<K, V, S>::schedule_write_op(
             &self.base.inner,
             &self.base.write_op_ch,
+            lock,
             op,
             ts,
             hk,
