Skip to content

Commit

Permalink
Prevent the busy loop of async schedulers
Browse files Browse the repository at this point in the history
Prevent the async runtime's schedulers from going into infinite busy loops when there
are pending `run_pending_tasks` calls on the `future::Cache`. This is done by
replacing a `RwLock` used in an internal `schedule_write_op` method with an event
notification.
  • Loading branch information
tatsuya6502 committed Apr 10, 2024
1 parent 6ba5445 commit 5996c87
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ default = ["atomic64", "quanta"]
sync = []

# Enable this feature to use `moka::future::Cache`.
future = ["async-lock", "async-trait", "futures-util"]
future = ["async-lock", "async-trait", "event-listener", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
Expand Down Expand Up @@ -64,8 +64,9 @@ triomphe = { version = "0.1.3", default-features = false }
quanta = { version = "0.12.2", optional = true }

# Optional dependencies (future)
async-lock = { version = "2.4", optional = true }
async-lock = { version = "3.3", optional = true }
async-trait = { version = "0.1.58", optional = true }
event-listener = { version = "5.3", optional = true }
futures-util = { version = "0.3.17", optional = true }

# Optional dependencies (logging)
Expand Down
78 changes: 47 additions & 31 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ impl<K, V, S> BaseCache<K, V, S> {
}

#[inline]
pub(crate) fn maintenance_task_lock(&self) -> &RwLock<()> {
&self.inner.maintenance_task_lock
pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> {
&self.inner.write_op_ch_ready_event
}

pub(crate) fn notify_invalidate(
Expand Down Expand Up @@ -618,7 +618,7 @@ where
pub(crate) async fn schedule_write_op(
inner: &Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
maintenance_task_lock: &RwLock<()>,
ch_ready_event: &event_listener::Event<()>,
op: WriteOp<K, V>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
Expand All @@ -638,13 +638,16 @@ where
let mut op = op;
let mut spin_loop_attempts = 0u8;
loop {
// Run the `Inner::do_run_pending_tasks` method if needed.
BaseCache::<K, V, S>::apply_reads_writes_if_needed(
Arc::clone(inner),
ch,
ts,
housekeeper,
)
.await;

// Try to send our op to the write op channel.
match ch.try_send(op) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
Expand All @@ -666,25 +669,22 @@ where
std::hint::spin_loop();
}
} else {
// Wait for a shared reader lock to become available. The exclusive
// writer lock will be already held by another async task that is
// currently calling `do_run_pending_tasks` method via
// `apply_reads_writes_if_needed` method above.
spin_loop_attempts = 0;

// Yield the async runtime scheduler to other async tasks and wait
// for a channel ready event. This event will be sent when one of the
// following conditions is met:
//
// `do_run_pending_tasks` will receive some of the ops from the
// channel and apply them to the data structures for the cache
// policies, so the channel will have some room for the new ops.
// - The `Inner::do_run_pending_tasks` method has removed some ops
// from the write op channel.
// - THe `Housekeeper`'s `run_pending_tasks` or `
// try_,run_pending_tasks` methods has freed the lock on the
// `current_task`.
//
// A shared lock will become available once the async task has
// returned from `do_run_pending_tasks`. We release the lock
// immediately after we acquire it.
let _ = maintenance_task_lock.read().await;
spin_loop_attempts = 0;
ch_ready_event.listen().await;

// We are going to retry. If the write op channel has enough room, we
// will be able to send our op to the channel and we are done. If
// not, we (or somebody else) will become the next exclusive writer
// when we (or somebody) call `apply_reads_writes_if_needed` above.
// We are going to retry. Now the channel may have some space and/or
// one of us is allowed to run `do_run_pending_tasks` method.
}
}
}
Expand Down Expand Up @@ -717,10 +717,10 @@ where

// Retry to schedule the write op.
let ts = cancel_guard.ts;
let lock = self.maintenance_task_lock();
let event = self.write_op_ch_ready_event();
let op = cancel_guard.op.as_ref().cloned().unwrap();
let hk = self.housekeeper.as_ref();
Self::schedule_write_op(&self.inner, &self.write_op_ch, lock, op, ts, hk, false)
Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false)
.await
.expect("Failed to reschedule a write op");

Expand Down Expand Up @@ -1042,7 +1042,7 @@ pub(crate) struct Inner<K, V, S> {
frequency_sketch_enabled: AtomicBool,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
maintenance_task_lock: RwLock<()>,
write_op_ch_ready_event: event_listener::Event,
eviction_policy: EvictionPolicyConfig,
expiration_policy: ExpirationPolicy<K, V>,
valid_after: AtomicInstant,
Expand Down Expand Up @@ -1250,7 +1250,7 @@ where
frequency_sketch_enabled: AtomicBool::default(),
read_op_ch,
write_op_ch,
maintenance_task_lock: RwLock::default(),
write_op_ch_ready_event: event_listener::Event::default(),
eviction_policy: eviction_policy.config,
expiration_policy,
valid_after: AtomicInstant::default(),
Expand Down Expand Up @@ -1412,6 +1412,15 @@ where
self.do_run_pending_tasks(max_repeats).await;
}

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self) {
let listeners = self.write_op_ch_ready_event.total_listeners();
// NOTE: The `notify` method accepts 0, so no need to check if `listeners` is
// greater than 0.
self.write_op_ch_ready_event.notify(listeners);
}

fn now(&self) -> Instant {
self.current_time_from_expiration_clock()
}
Expand All @@ -1429,10 +1438,6 @@ where
}

// Acquire some locks.

// SAFETY: the write lock below should never be starved, because the lock
// strategy of async_lock::RwLock is write-preferring.
let write_op_ch_lock = self.maintenance_task_lock.write().await;
let mut deqs = self.deques.lock().await;
let mut timer_wheel = self.timer_wheel.lock().await;

Expand Down Expand Up @@ -1462,9 +1467,21 @@ where
self.enable_frequency_sketch(&eviction_state.counters).await;
}

let w_len = self.write_op_ch.len();

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel to have enough room, notify them.
let listeners = self.write_op_ch_ready_event.total_listeners();
if listeners > 0 {
let n = listeners.min(WRITE_LOG_SIZE - w_len);
// Notify the `n` listeners. The `notify` method accepts 0, so no
// need to check if `n` is greater than 0.
self.write_op_ch_ready_event.notify(n);
}

calls += 1;
should_process_logs = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
|| self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT;
should_process_logs =
self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || w_len >= WRITE_LOG_FLUSH_POINT;
}

if timer_wheel.is_enabled() {
Expand Down Expand Up @@ -1527,9 +1544,8 @@ where

crossbeam_epoch::pin().flush();

// Ensure some of the locks are held until here.
// Ensure this lock is held until here.
drop(deqs);
drop(write_op_ch_lock);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,12 +1820,12 @@ where
}

let hk = self.base.housekeeper.as_ref();
let lock = self.base.maintenance_task_lock();
let event = self.base.write_op_ch_ready_event();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
event,
op,
ts,
hk,
Expand Down Expand Up @@ -1986,13 +1986,13 @@ where
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let event = self.base.write_op_ch_ready_event();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
event,
op,
now,
hk,
Expand Down
27 changes: 23 additions & 4 deletions src/future/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use futures_util::future::{BoxFuture, Shared};
#[async_trait]
pub(crate) trait InnerSync {
async fn run_pending_tasks(&self, max_sync_repeats: usize);

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self);

fn now(&self) -> Instant;
}

Expand Down Expand Up @@ -75,19 +80,33 @@ impl Housekeeper {
T: InnerSync + Send + Sync + 'static,
{
let mut current_task = self.current_task.lock().await;
self.do_run_pending_tasks(cache, &mut current_task).await;
self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
.await;

drop(current_task);

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
}

pub(crate) async fn try_run_pending_tasks<T>(&self, cache: Arc<T>) -> bool
where
T: InnerSync + Send + Sync + 'static,
{
if let Some(mut current_task) = self.current_task.try_lock() {
self.do_run_pending_tasks(cache, &mut current_task).await;
true
self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
.await;
} else {
false
return false;
}

// The `current_task` lock should be free now.

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
true
}

async fn do_run_pending_tasks<T>(
Expand Down

0 comments on commit 5996c87

Please sign in to comment.