Merge pull request #415 from moka-rs/avoid-async-scheduler-busy-loop
Prevent pending `run_pending_tasks` of `future::Cache` from causing busy loop in `schedule_write_op`
tatsuya6502 authored Apr 11, 2024
2 parents 6ba5445 + c50d186 commit 2f23e5c
Showing 5 changed files with 97 additions and 42 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,26 @@
# Moka Cache — Change Log

## Version 0.12.6

### Fixed

- Fixed a bug in `future::Cache` where pending `run_pending_tasks` calls could
cause an infinite busy loop in the internal `schedule_write_op` method
([#412][gh-issue-0412]):
- This bug was introduced in `v0.12.0` when the background threads were removed
from `future::Cache`.
- This bug can occur when the `run_pending_tasks` method is called by user code
while the cache is receiving a very high number of concurrent write operations
(e.g. `insert`, `get_with`, `invalidate`, etc.); the sketch after this list
shows that kind of workload.
- When it occurs, the `schedule_write_op` method spins in a busy loop forever,
causing high CPU usage and starving all other async tasks.
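
A minimal sketch of the workload described above (many concurrent writers plus
explicit `run_pending_tasks` calls), assuming moka 0.12 with the `future` feature
and a Tokio runtime. It only illustrates the pattern and is not a deterministic
reproduction of the bug.

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<u64, u64> = Cache::new(10_000);

    // A high number of concurrent write operations...
    let writers: Vec<_> = (0..64u64)
        .map(|t| {
            let cache = cache.clone();
            tokio::spawn(async move {
                for i in 0..10_000u64 {
                    cache.insert(t * 1_000_000 + i, i).await;
                }
            })
        })
        .collect();

    // ...while user code keeps calling `run_pending_tasks` on the same cache.
    let maintenance = {
        let cache = cache.clone();
        tokio::spawn(async move {
            for _ in 0..1_000 {
                cache.run_pending_tasks().await;
                tokio::task::yield_now().await;
            }
        })
    };

    for w in writers {
        let _ = w.await;
    }
    let _ = maintenance.await;
}
```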

### Changed

- Upgraded `async-lock` crate used by `future::Cache` from `v2.4` to the latest
`v3.3`.


## Version 0.12.5

### Added
@@ -828,6 +849,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-Swatinem]: https://github.com/Swatinem
[gh-tinou98]: https://github.com/tinou98

[gh-issue-0412]: https://github.com/moka-rs/moka/issues/412/
[gh-issue-0385]: https://github.com/moka-rs/moka/issues/385/
[gh-issue-0329]: https://github.com/moka-rs/moka/issues/329/
[gh-issue-0322]: https://github.com/moka-rs/moka/issues/322/
7 changes: 4 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.5"
version = "0.12.6"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
@@ -22,7 +22,7 @@ default = ["atomic64", "quanta"]
sync = []

# Enable this feature to use `moka::future::Cache`.
future = ["async-lock", "async-trait", "futures-util"]
future = ["async-lock", "async-trait", "event-listener", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
@@ -64,8 +64,9 @@ triomphe = { version = "0.1.3", default-features = false }
quanta = { version = "0.12.2", optional = true }

# Optional dependencies (future)
async-lock = { version = "2.4", optional = true }
async-lock = { version = "3.3", optional = true }
async-trait = { version = "0.1.58", optional = true }
event-listener = { version = "5.3", optional = true }
futures-util = { version = "0.3.17", optional = true }

# Optional dependencies (logging)
75 changes: 44 additions & 31 deletions src/future/base_cache.rs
@@ -121,8 +121,8 @@ impl<K, V, S> BaseCache<K, V, S> {
}

#[inline]
-pub(crate) fn maintenance_task_lock(&self) -> &RwLock<()> {
-    &self.inner.maintenance_task_lock
+pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> {
+    &self.inner.write_op_ch_ready_event
}

pub(crate) fn notify_invalidate(
@@ -618,7 +618,7 @@
pub(crate) async fn schedule_write_op(
inner: &Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
-maintenance_task_lock: &RwLock<()>,
+ch_ready_event: &event_listener::Event<()>,
op: WriteOp<K, V>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
@@ -638,13 +638,16 @@
let mut op = op;
let mut spin_loop_attempts = 0u8;
loop {
// Run the `Inner::do_run_pending_tasks` method if needed.
BaseCache::<K, V, S>::apply_reads_writes_if_needed(
Arc::clone(inner),
ch,
ts,
housekeeper,
)
.await;

// Try to send our op to the write op channel.
match ch.try_send(op) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
@@ -666,25 +669,22 @@
std::hint::spin_loop();
}
} else {
-// Wait for a shared reader lock to become available. The exclusive
-// writer lock will be already held by another async task that is
-// currently calling `do_run_pending_tasks` method via
-// `apply_reads_writes_if_needed` method above.
+spin_loop_attempts = 0;

+// Yield the async runtime scheduler to other async tasks and wait
+// for a channel ready event. This event will be sent when one of the
+// following conditions is met:
//
-// `do_run_pending_tasks` will receive some of the ops from the
-// channel and apply them to the data structures for the cache
-// policies, so the channel will have some room for the new ops.
+// - The `Inner::do_run_pending_tasks` method has removed some ops
+//   from the write op channel.
+// - The `Housekeeper`'s `run_pending_tasks` or
+//   `try_run_pending_tasks` methods have freed the lock on the
+//   `current_task`.
//
-// A shared lock will become available once the async task has
-// returned from `do_run_pending_tasks`. We release the lock
-// immediately after we acquire it.
-let _ = maintenance_task_lock.read().await;
-spin_loop_attempts = 0;
+ch_ready_event.listen().await;

-// We are going to retry. If the write op channel has enough room, we
-// will be able to send our op to the channel and we are done. If
-// not, we (or somebody else) will become the next exclusive writer
-// when we (or somebody) call `apply_reads_writes_if_needed` above.
+// We are going to retry. Now the channel may have some space and/or
+// one of us is allowed to run `do_run_pending_tasks` method.
}
}
}
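
The hunk above replaces the shared `RwLock` wait with an `event_listener::Event`:
a writer that finds the write op channel full parks on `listen().await` and is
woken by a `notify` once `do_run_pending_tasks` has drained some ops, so a full
channel costs a wakeup subscription instead of a spin. Below is a minimal,
self-contained sketch of that park/notify pattern, assuming `event-listener` 5.x
and a Tokio runtime (the names `CAPACITY`, `queued`, `ready` and the task bodies
are illustrative, not moka's internals).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use event_listener::Event;

const CAPACITY: usize = 4;

#[tokio::main]
async fn main() {
    // Stand-ins for the bounded write op channel and `write_op_ch_ready_event`.
    let queued = Arc::new(AtomicUsize::new(0));
    let ready = Arc::new(Event::new());

    // Maintenance task, playing the role of `do_run_pending_tasks`: drain the
    // queue, then wake the writers that are waiting for room.
    let maintenance = {
        let (queued, ready) = (Arc::clone(&queued), Arc::clone(&ready));
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(5)).await;
                queued.store(0, Ordering::Release);
                // moka caps the count at the number of free slots; MAX is fine here.
                ready.notify(usize::MAX);
            }
        })
    };

    // Writer task, playing the role of `schedule_write_op`: when the queue is
    // full, park on the event instead of spinning, then retry.
    let writer = {
        let (queued, ready) = (Arc::clone(&queued), Arc::clone(&ready));
        tokio::spawn(async move {
            for _ in 0..100 {
                loop {
                    let sent = queued
                        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                            (n < CAPACITY).then_some(n + 1)
                        })
                        .is_ok();
                    if sent {
                        break;
                    }
                    // Queue is full: yield until the maintenance task makes room.
                    ready.listen().await;
                }
            }
        })
    };

    writer.await.unwrap();
    maintenance.abort();
}
```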
@@ -717,10 +717,10 @@

// Retry to schedule the write op.
let ts = cancel_guard.ts;
-let lock = self.maintenance_task_lock();
+let event = self.write_op_ch_ready_event();
let op = cancel_guard.op.as_ref().cloned().unwrap();
let hk = self.housekeeper.as_ref();
-Self::schedule_write_op(&self.inner, &self.write_op_ch, lock, op, ts, hk, false)
+Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false)
.await
.expect("Failed to reschedule a write op");

@@ -1042,7 +1042,7 @@ pub(crate) struct Inner<K, V, S> {
frequency_sketch_enabled: AtomicBool,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
-maintenance_task_lock: RwLock<()>,
+write_op_ch_ready_event: event_listener::Event,
eviction_policy: EvictionPolicyConfig,
expiration_policy: ExpirationPolicy<K, V>,
valid_after: AtomicInstant,
@@ -1250,7 +1250,7 @@
frequency_sketch_enabled: AtomicBool::default(),
read_op_ch,
write_op_ch,
-maintenance_task_lock: RwLock::default(),
+write_op_ch_ready_event: event_listener::Event::default(),
eviction_policy: eviction_policy.config,
expiration_policy,
valid_after: AtomicInstant::default(),
@@ -1412,6 +1412,12 @@
self.do_run_pending_tasks(max_repeats).await;
}

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self) {
self.write_op_ch_ready_event.notify(usize::MAX);
}

fn now(&self) -> Instant {
self.current_time_from_expiration_clock()
}
@@ -1429,10 +1435,6 @@
}

// Acquire some locks.

-// SAFETY: the write lock below should never be starved, because the lock
-// strategy of async_lock::RwLock is write-preferring.
-let write_op_ch_lock = self.maintenance_task_lock.write().await;
let mut deqs = self.deques.lock().await;
let mut timer_wheel = self.timer_wheel.lock().await;

@@ -1462,9 +1464,21 @@
self.enable_frequency_sketch(&eviction_state.counters).await;
}

let w_len = self.write_op_ch.len();

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel to have enough room, notify them.
let listeners = self.write_op_ch_ready_event.total_listeners();
if listeners > 0 {
let n = listeners.min(WRITE_LOG_SIZE - w_len);
// Notify the `n` listeners. The `notify` method accepts 0, so no
// need to check if `n` is greater than 0.
self.write_op_ch_ready_event.notify(n);
}

calls += 1;
-should_process_logs = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
-    || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT;
+should_process_logs =
+    self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || w_len >= WRITE_LOG_FLUSH_POINT;
}

if timer_wheel.is_enabled() {
Expand Down Expand Up @@ -1527,9 +1541,8 @@ where

crossbeam_epoch::pin().flush();

-// Ensure some of the locks are held until here.
+// Ensure this lock is held until here.
drop(deqs);
-drop(write_op_ch_lock);
}
}

8 changes: 4 additions & 4 deletions src/future/cache.rs
@@ -1820,12 +1820,12 @@
}

let hk = self.base.housekeeper.as_ref();
-let lock = self.base.maintenance_task_lock();
+let event = self.base.write_op_ch_ready_event();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
-lock,
+event,
op,
ts,
hk,
@@ -1986,13 +1986,13 @@
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

-let lock = self.base.maintenance_task_lock();
+let event = self.base.write_op_ch_ready_event();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
-lock,
+event,
op,
now,
hk,
27 changes: 23 additions & 4 deletions src/future/housekeeper.rs
@@ -27,6 +27,11 @@ use futures_util::future::{BoxFuture, Shared};
#[async_trait]
pub(crate) trait InnerSync {
async fn run_pending_tasks(&self, max_sync_repeats: usize);

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self);

fn now(&self) -> Instant;
}

@@ -75,19 +80,33 @@ impl Housekeeper {
T: InnerSync + Send + Sync + 'static,
{
let mut current_task = self.current_task.lock().await;
-self.do_run_pending_tasks(cache, &mut current_task).await;
+self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
+    .await;

drop(current_task);

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
}

pub(crate) async fn try_run_pending_tasks<T>(&self, cache: Arc<T>) -> bool
where
T: InnerSync + Send + Sync + 'static,
{
if let Some(mut current_task) = self.current_task.try_lock() {
-self.do_run_pending_tasks(cache, &mut current_task).await;
-true
+self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
+    .await;
} else {
-false
+return false;
}

// The `current_task` lock should be free now.

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
true
}

async fn do_run_pending_tasks<T>(
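
Both housekeeper methods above now notify only after the `current_task` lock guard
has been dropped, so a woken writer that immediately retries `try_send` or
`try_lock` can actually make progress. A tiny sketch of that drop-then-notify
ordering, assuming `async-lock` and `event-listener` (`run_and_notify` is a
hypothetical helper, not a moka API):

```rust
use std::sync::Arc;

use async_lock::Mutex;
use event_listener::Event;

// Hypothetical helper mirroring the shape of `Housekeeper::run_pending_tasks`:
// do the maintenance work under the lock, drop the guard, then notify.
async fn run_and_notify(current_task: Arc<Mutex<()>>, ready: Arc<Event>) {
    {
        let _guard = current_task.lock().await;
        // ... drain the write op channel, apply the cache policies, etc. ...
    } // the guard is dropped here, before any waiter is woken

    // Wake all waiters; they may now take the lock or find room in the channel.
    ready.notify(usize::MAX);
}
```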
