diff --git a/Cargo.toml b/Cargo.toml index 9d283d13..bf2694a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ default = ["atomic64", "quanta"] sync = [] # Enable this feature to use `moka::future::Cache`. -future = ["async-lock", "async-trait", "futures-util"] +future = ["async-lock", "async-trait", "event-listener", "futures-util"] # Enable this feature to activate optional logging from caches. # Currently cache will emit log only when it encounters a panic in user provided @@ -64,8 +64,9 @@ triomphe = { version = "0.1.3", default-features = false } quanta = { version = "0.12.2", optional = true } # Optional dependencies (future) -async-lock = { version = "2.4", optional = true } +async-lock = { version = "3.3", optional = true } async-trait = { version = "0.1.58", optional = true } +event-listener = { version = "5.3", optional = true } futures-util = { version = "0.3.17", optional = true } # Optional dependencies (logging) diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 5450b5b3..3712ce0b 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -121,8 +121,8 @@ impl BaseCache { } #[inline] - pub(crate) fn maintenance_task_lock(&self) -> &RwLock<()> { - &self.inner.maintenance_task_lock + pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> { + &self.inner.write_op_ch_ready_event } pub(crate) fn notify_invalidate( @@ -618,7 +618,7 @@ where pub(crate) async fn schedule_write_op( inner: &Arc, ch: &Sender>, - maintenance_task_lock: &RwLock<()>, + ch_ready_event: &event_listener::Event<()>, op: WriteOp, ts: Instant, housekeeper: Option<&HouseKeeperArc>, @@ -638,6 +638,7 @@ where let mut op = op; let mut spin_loop_attempts = 0u8; loop { + // Run the `Inner::do_run_pending_tasks` method if needed. BaseCache::::apply_reads_writes_if_needed( Arc::clone(inner), ch, @@ -645,6 +646,8 @@ where housekeeper, ) .await; + + // Try to send our op to the write op channel. match ch.try_send(op) { Ok(()) => return Ok(()), Err(TrySendError::Full(op1)) => { @@ -666,25 +669,22 @@ where std::hint::spin_loop(); } } else { - // Wait for a shared reader lock to become available. The exclusive - // writer lock will be already held by another async task that is - // currently calling `do_run_pending_tasks` method via - // `apply_reads_writes_if_needed` method above. + spin_loop_attempts = 0; + + // Yield the async runtime scheduler to other async tasks and wait + // for a channel ready event. This event will be sent when one of the + // following conditions is met: // - // `do_run_pending_tasks` will receive some of the ops from the - // channel and apply them to the data structures for the cache - // policies, so the channel will have some room for the new ops. + // - The `Inner::do_run_pending_tasks` method has removed some ops + // from the write op channel. + // - THe `Housekeeper`'s `run_pending_tasks` or ` + // try_,run_pending_tasks` methods has freed the lock on the + // `current_task`. // - // A shared lock will become available once the async task has - // returned from `do_run_pending_tasks`. We release the lock - // immediately after we acquire it. - let _ = maintenance_task_lock.read().await; - spin_loop_attempts = 0; + ch_ready_event.listen().await; - // We are going to retry. If the write op channel has enough room, we - // will be able to send our op to the channel and we are done. If - // not, we (or somebody else) will become the next exclusive writer - // when we (or somebody) call `apply_reads_writes_if_needed` above. + // We are going to retry. Now the channel may have some space and/or + // one of us is allowed to run `do_run_pending_tasks` method. } } } @@ -717,10 +717,10 @@ where // Retry to schedule the write op. let ts = cancel_guard.ts; - let lock = self.maintenance_task_lock(); + let event = self.write_op_ch_ready_event(); let op = cancel_guard.op.as_ref().cloned().unwrap(); let hk = self.housekeeper.as_ref(); - Self::schedule_write_op(&self.inner, &self.write_op_ch, lock, op, ts, hk, false) + Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false) .await .expect("Failed to reschedule a write op"); @@ -1042,7 +1042,7 @@ pub(crate) struct Inner { frequency_sketch_enabled: AtomicBool, read_op_ch: Receiver>, write_op_ch: Receiver>, - maintenance_task_lock: RwLock<()>, + write_op_ch_ready_event: event_listener::Event, eviction_policy: EvictionPolicyConfig, expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, @@ -1250,7 +1250,7 @@ where frequency_sketch_enabled: AtomicBool::default(), read_op_ch, write_op_ch, - maintenance_task_lock: RwLock::default(), + write_op_ch_ready_event: event_listener::Event::default(), eviction_policy: eviction_policy.config, expiration_policy, valid_after: AtomicInstant::default(), @@ -1412,6 +1412,15 @@ where self.do_run_pending_tasks(max_repeats).await; } + /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method + /// for the write op channel to have enough room. + fn notify_write_op_ch_is_ready(&self) { + let listeners = self.write_op_ch_ready_event.total_listeners(); + // NOTE: The `notify` method accepts 0, so no need to check if `listeners` is + // greater than 0. + self.write_op_ch_ready_event.notify(listeners); + } + fn now(&self) -> Instant { self.current_time_from_expiration_clock() } @@ -1429,10 +1438,6 @@ where } // Acquire some locks. - - // SAFETY: the write lock below should never be starved, because the lock - // strategy of async_lock::RwLock is write-preferring. - let write_op_ch_lock = self.maintenance_task_lock.write().await; let mut deqs = self.deques.lock().await; let mut timer_wheel = self.timer_wheel.lock().await; @@ -1462,9 +1467,21 @@ where self.enable_frequency_sketch(&eviction_state.counters).await; } + let w_len = self.write_op_ch.len(); + + // If there are any async tasks waiting in `BaseCache::schedule_write_op` + // method for the write op channel to have enough room, notify them. + let listeners = self.write_op_ch_ready_event.total_listeners(); + if listeners > 0 { + let n = listeners.min(WRITE_LOG_SIZE - w_len); + // Notify the `n` listeners. The `notify` method accepts 0, so no + // need to check if `n` is greater than 0. + self.write_op_ch_ready_event.notify(n); + } + calls += 1; - should_process_logs = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT - || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT; + should_process_logs = + self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || w_len >= WRITE_LOG_FLUSH_POINT; } if timer_wheel.is_enabled() { @@ -1527,9 +1544,8 @@ where crossbeam_epoch::pin().flush(); - // Ensure some of the locks are held until here. + // Ensure this lock is held until here. drop(deqs); - drop(write_op_ch_lock); } } diff --git a/src/future/cache.rs b/src/future/cache.rs index fd2bc035..40112f88 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1820,12 +1820,12 @@ where } let hk = self.base.housekeeper.as_ref(); - let lock = self.base.maintenance_task_lock(); + let event = self.base.write_op_ch_ready_event(); BaseCache::::schedule_write_op( &self.base.inner, &self.base.write_op_ch, - lock, + event, op, ts, hk, @@ -1986,13 +1986,13 @@ where should_block = self.schedule_write_op_should_block.load(Ordering::Acquire); } - let lock = self.base.maintenance_task_lock(); + let event = self.base.write_op_ch_ready_event(); let hk = self.base.housekeeper.as_ref(); BaseCache::::schedule_write_op( &self.base.inner, &self.base.write_op_ch, - lock, + event, op, now, hk, diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index eb32f0ba..60fc9111 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -27,6 +27,11 @@ use futures_util::future::{BoxFuture, Shared}; #[async_trait] pub(crate) trait InnerSync { async fn run_pending_tasks(&self, max_sync_repeats: usize); + + /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method + /// for the write op channel to have enough room. + fn notify_write_op_ch_is_ready(&self); + fn now(&self) -> Instant; } @@ -75,7 +80,14 @@ impl Housekeeper { T: InnerSync + Send + Sync + 'static, { let mut current_task = self.current_task.lock().await; - self.do_run_pending_tasks(cache, &mut current_task).await; + self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task) + .await; + + drop(current_task); + + // If there are any async tasks waiting in `BaseCache::schedule_write_op` + // method for the write op channel, notify them. + cache.notify_write_op_ch_is_ready(); } pub(crate) async fn try_run_pending_tasks(&self, cache: Arc) -> bool @@ -83,11 +95,18 @@ impl Housekeeper { T: InnerSync + Send + Sync + 'static, { if let Some(mut current_task) = self.current_task.try_lock() { - self.do_run_pending_tasks(cache, &mut current_task).await; - true + self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task) + .await; } else { - false + return false; } + + // The `current_task` lock should be free now. + + // If there are any async tasks waiting in `BaseCache::schedule_write_op` + // method for the write op channel, notify them. + cache.notify_write_op_ch_is_ready(); + true } async fn do_run_pending_tasks(