From c6a4d4b5c226518687b4245481046691c4e74894 Mon Sep 17 00:00:00 2001
From: Tatsuya Kawano
Date: Mon, 15 Apr 2024 22:49:36 +0800
Subject: [PATCH] Make a single call to `run_pending_tasks` to evict as many
 entries as possible from the cache

Ensure the loop in the `do_run_pending_tasks` method is eventually stopped.
---
 src/common.rs                        |  13 ++-
 src/common/concurrent/constants.rs   |   2 +-
 src/common/concurrent/housekeeper.rs |  11 +-
 src/future/base_cache.rs             | 158 +++++++++++++++++----------
 src/future/cache.rs                  |   8 +-
 src/future/housekeeper.rs            |  17 +--
 src/sync/cache.rs                    |   8 +-
 src/sync_base/base_cache.rs          | 102 ++++++++++++-----
 8 files changed, 208 insertions(+), 111 deletions(-)

diff --git a/src/common.rs b/src/common.rs
index 9748e86f..8657693c 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -76,10 +76,10 @@ pub(crate) struct HousekeeperConfig {
 pub(crate) maintenance_task_timeout: Duration,
 /// The maximum repeat count for receiving operation logs from the read and write
 /// log channels. Default: `MAX_LOG_SYNC_REPEATS`.
-ated pub(crate) max_log_sync_repeats: usize,
+ pub(crate) max_log_sync_repeats: u32,
 /// The batch size of entries to be processed by each internal eviction method.
 /// Default: `EVICTION_BATCH_SIZE`.
- pub(crate) eviction_batch_size: usize,
+ pub(crate) eviction_batch_size: u32,
 }

 impl Default for HousekeeperConfig {
@@ -88,7 +88,7 @@ fn default() -> Self {
 Self {
 maintenance_task_timeout: Duration::from_millis(
 DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
 ),
- max_log_sync_repeats: DEFAULT_MAX_LOG_SYNC_REPEATS,
+ max_log_sync_repeats: DEFAULT_MAX_LOG_SYNC_REPEATS as u32,
 eviction_batch_size: DEFAULT_EVICTION_BATCH_SIZE,
 }
 }

@@ -98,14 +98,15 @@ impl HousekeeperConfig {
 #[cfg(test)]
 pub(crate) fn new(
 maintenance_task_timeout: Option<Duration>,
- max_log_sync_repeats: Option<usize>,
- eviction_batch_size: Option<usize>,
+ max_log_sync_repeats: Option<u32>,
+ eviction_batch_size: Option<u32>,
 ) -> Self {
 Self {
 maintenance_task_timeout: maintenance_task_timeout.unwrap_or(Duration::from_millis(
 DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
 )),
- max_log_sync_repeats: max_log_sync_repeats.unwrap_or(DEFAULT_MAX_LOG_SYNC_REPEATS),
+ max_log_sync_repeats: max_log_sync_repeats
+     .unwrap_or(DEFAULT_MAX_LOG_SYNC_REPEATS as u32),
 eviction_batch_size: eviction_batch_size.unwrap_or(DEFAULT_EVICTION_BATCH_SIZE),
 }
 }

diff --git a/src/common/concurrent/constants.rs b/src/common/concurrent/constants.rs
index cc3b094f..629c8d6d 100644
--- a/src/common/concurrent/constants.rs
+++ b/src/common/concurrent/constants.rs
@@ -14,7 +14,7 @@ pub(crate) const WRITE_LOG_CH_SIZE: usize =

 // TODO: Calculate the batch size based on the number of entries in the cache (or an
 // estimated number of entries to evict)
-pub(crate) const DEFAULT_EVICTION_BATCH_SIZE: usize = WRITE_LOG_CH_SIZE;
+pub(crate) const DEFAULT_EVICTION_BATCH_SIZE: u32 = WRITE_LOG_CH_SIZE as u32;

 /// The default timeout duration for the `run_pending_tasks` method.
 pub(crate) const DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS: u64 = 100;
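The `usize` to `u32` change narrows the housekeeper knobs to the range they actually need. As a rough, self-contained sketch of how these knobs fit together after this patch (the constant values below are placeholders, not the crate's real ones):

```rust
use std::time::Duration;

// Placeholder values; the real constants live in `constants.rs`.
const DEFAULT_MAX_LOG_SYNC_REPEATS: usize = 4;
const WRITE_LOG_CH_SIZE: usize = 256;
const DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS: u64 = 100;

struct HousekeeperConfig {
    maintenance_task_timeout: Duration,
    max_log_sync_repeats: u32,
    eviction_batch_size: u32,
}

impl Default for HousekeeperConfig {
    fn default() -> Self {
        Self {
            maintenance_task_timeout: Duration::from_millis(
                DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
            ),
            // `as u32` is lossless here: both constants are small and
            // known at compile time.
            max_log_sync_repeats: DEFAULT_MAX_LOG_SYNC_REPEATS as u32,
            eviction_batch_size: WRITE_LOG_CH_SIZE as u32,
        }
    }
}

fn main() {
    let cfg = HousekeeperConfig::default();
    println!(
        "timeout={:?} repeats={} batch={}",
        cfg.maintenance_task_timeout, cfg.max_log_sync_repeats, cfg.eviction_batch_size
    );
}
```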
diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs
index 706e34ad..11df87f4 100644
--- a/src/common/concurrent/housekeeper.rs
+++ b/src/common/concurrent/housekeeper.rs
@@ -14,12 +14,13 @@ use std::{
 };

 pub(crate) trait InnerSync {
-    /// Runs the pending tasks. Returns `true` if there are more entries to evict.
+    /// Runs the pending tasks. Returns `true` if there are more entries to evict in
+    /// the next run.
 fn run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool;

 fn now(&self) -> Instant;
@@ -43,10 +44,10 @@ pub(crate) struct Housekeeper {
 maintenance_task_timeout: Option<Duration>,
 /// The maximum repeat count for receiving operation logs from the read and write
 /// log channels. Default: `MAX_LOG_SYNC_REPEATS`.
- max_log_sync_repeats: usize,
+ max_log_sync_repeats: u32,
 /// The batch size of entries to be processed by each internal eviction method.
 /// Default: `EVICTION_BATCH_SIZE`.
- eviction_batch_size: usize,
+ eviction_batch_size: u32,
 auto_run_enabled: AtomicBool,
 }

diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs
index 57e8d5d6..4881cd2b 100644
--- a/src/future/base_cache.rs
+++ b/src/future/base_cache.rs
@@ -384,7 +384,7 @@ where
 #[inline]
 pub(crate) async fn apply_reads_writes_if_needed(
- inner: Arc<Inner<K, V, S>>,
+ inner: &Arc<Inner<K, V, S>>,
 ch: &Sender<WriteOp<K, V>>,
 now: Instant,
 housekeeper: Option<&HouseKeeperArc>,
@@ -466,8 +466,7 @@ where
 op: ReadOp<K, V>,
 now: Instant,
 ) -> Result<(), TrySendError<ReadOp<K, V>>> {
- self.apply_reads_if_needed(Arc::clone(&self.inner), now)
-     .await;
+ self.apply_reads_if_needed(&self.inner, now).await;
 let ch = &self.read_op_ch;
 match ch.try_send(op) {
 // Discard the ReadOp when the channel is full.
@@ -644,13 +643,7 @@ where
 let mut spin_loop_attempts = 0u8;
 loop {
 // Run the `Inner::do_run_pending_tasks` method if needed.
- BaseCache::<K, V, S>::apply_reads_writes_if_needed(
-     Arc::clone(inner),
-     ch,
-     ts,
-     housekeeper,
- )
- .await;
+ BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, ts, housekeeper).await;

 // Try to send our op to the write op channel.
 match ch.try_send(op) {
@@ -736,7 +729,7 @@ where
 }

 #[inline]
- async fn apply_reads_if_needed(&self, inner: Arc<Inner<K, V, S>>, now: Instant) {
+ async fn apply_reads_if_needed(&self, inner: &Arc<Inner<K, V, S>>, now: Instant) {
 let len = self.read_op_ch.len();

 if let Some(hk) = &self.housekeeper {
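The signature changes above replace pass-by-value `Arc`s with borrowed `&Arc`s, so the hot read/write paths no longer touch the reference count on every call. A minimal sketch of the pattern (illustrative names, not moka's actual types):

```rust
use std::sync::Arc;

// A stand-in for the cache internals; only the shape matters here.
struct Inner {
    name: String,
}

// Before: taking `Arc<Inner>` forces every caller to bump the reference
// count, even though the callee only reads through the pointer.
fn apply_old(inner: Arc<Inner>) {
    println!("applying ops for {}", inner.name);
}

// After: borrowing `&Arc<Inner>` lets hot paths (such as the retry loop in
// `schedule_write_op`) call repeatedly with no refcount traffic; the callee
// can still `Arc::clone(inner)` if it needs ownership.
fn apply_new(inner: &Arc<Inner>) {
    println!("applying ops for {}", inner.name);
}

fn main() {
    let inner = Arc::new(Inner { name: "cache".into() });
    apply_old(Arc::clone(&inner)); // refcount 1 -> 2 -> 1
    for _ in 0..3 {
        apply_new(&inner); // no refcount change per iteration
    }
}
```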
@@ -926,6 +919,7 @@ impl<'a, K, V> EvictionState<'a, K, V> {
 struct EvictionCounters {
 entry_count: u64,
 weighted_size: u64,
+ eviction_count: u64,
 }

 impl EvictionCounters {
@@ -934,6 +928,7 @@ impl EvictionCounters {
 Self {
 entry_count,
 weighted_size,
+ eviction_count: 0,
 }
 }

@@ -950,6 +945,12 @@ impl EvictionCounters {
 let total = &mut self.weighted_size;
 *total = total.saturating_sub(weight as u64);
 }
+
+ #[inline]
+ fn incr_eviction_count(&mut self) {
+     let count = &mut self.eviction_count;
+     *count = count.saturating_add(1);
+ }
 }

 #[derive(Default)]
@@ -1404,8 +1405,8 @@ where
 async fn run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool {
 self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size)
     .await
@@ -1432,8 +1433,8 @@ where
 async fn do_run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool {
 if self.max_capacity == Some(0) {
     return false;
 }
@@ -1448,46 +1449,50 @@ where
 } else {
     None
 };
- let mut calls = 0;
+ let mut should_process_logs = true;
+ let mut calls = 0u32;
 let current_ec = self.entry_count.load();
 let current_ws = self.weighted_size.load();
 let mut eviction_state =
     EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());

 loop {
-     calls += 1;
+     if should_process_logs {
+         let r_len = self.read_op_ch.len();
+         if r_len > 0 {
+             self.apply_reads(&mut deqs, &mut timer_wheel, r_len).await;
+         }

-     let r_len = self.read_op_ch.len();
-     if r_len > 0 {
-         self.apply_reads(&mut deqs, &mut timer_wheel, r_len).await;
-     }

+         let w_len = self.write_op_ch.len();
+         if w_len > 0 {
+             self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state)
+                 .await;
+         }

-     let w_len = self.write_op_ch.len();
-     if w_len > 0 {
-         self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state)
-             .await;
-     }

+         if self.eviction_policy == EvictionPolicyConfig::TinyLfu
+             && self.should_enable_frequency_sketch(&eviction_state.counters)
+         {
+             self.enable_frequency_sketch(&eviction_state.counters).await;
+         }

-     if self.eviction_policy == EvictionPolicyConfig::TinyLfu
-         && self.should_enable_frequency_sketch(&eviction_state.counters)
-     {
-         self.enable_frequency_sketch(&eviction_state.counters).await;
-     }

+         // If there are any async tasks waiting in `BaseCache::schedule_write_op`
+         // method for the write op channel to have enough room, notify them.
+         let listeners = self.write_op_ch_ready_event.total_listeners();
+         if listeners > 0 {
+             let n = listeners.min(WRITE_LOG_CH_SIZE - self.write_op_ch.len());
+             // Notify the `n` listeners. The `notify` method accepts 0, so no
+             // need to check if `n` is greater than 0.
+             self.write_op_ch_ready_event.notify(n);
+         }

-     // If there are any async tasks waiting in `BaseCache::schedule_write_op`
-     // method for the write op channel to have enough room, notify them.
-     let listeners = self.write_op_ch_ready_event.total_listeners();
-     if listeners > 0 {
-         let n = listeners.min(WRITE_LOG_CH_SIZE - self.write_op_ch.len());
-         // Notify the `n` listeners. The `notify` method accepts 0, so no
-         // need to check if `n` is greater than 0.
-         self.write_op_ch_ready_event.notify(n);
-     }

+         calls += 1;
+     }

     // Set this flag to `false`. The `evict_*` and `invalidate_*` methods
     // below may set it to `true` if there are more entries to evict in the
     // next loop.
     eviction_state.more_entries_to_evict = false;
+     let last_eviction_count = eviction_state.counters.eviction_count;

     // Evict entries if there are any expired entries in the hierarchical
     // timer wheels.
@@ -1542,14 +1547,21 @@

     // Check whether to continue this loop or not.
-     let should_process_logs = calls <= max_log_sync_repeats
+     should_process_logs = calls <= max_log_sync_repeats
         && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
             || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);

-     if !should_process_logs && !eviction_state.more_entries_to_evict {
+     let should_evict_more_entries = eviction_state.more_entries_to_evict
+         // Check if there were any entries evicted in this loop.
+         && (eviction_state.counters.eviction_count - last_eviction_count) > 0;
+
+     // Break the loop if there will be nothing to do in the next loop.
+     if !should_process_logs && !should_evict_more_entries {
         break;
     }

+     // Break the loop if the eviction listener is set and timeout has been
+     // reached.
     if let (Some(to), Some(started)) = (timeout, started_at) {
         let elapsed = self
             .current_time_from_expiration_clock()
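The interplay of `calls`, `should_process_logs`, and the new `eviction_count` delta is what guarantees termination. A self-contained toy model of the exit conditions (all names and numbers here are hypothetical, not moka's real code):

```rust
/// A toy model of `do_run_pending_tasks`'s new exit conditions.
/// `pending_logs` and `evictable` stand in for the real channel lengths
/// and deque contents.
fn run_maintenance(
    mut pending_logs: u32,
    mut evictable: u32,
    max_log_sync_repeats: u32,
    batch_size: u32,
) -> u32 {
    let mut calls = 0u32;
    let mut should_process_logs = true;
    let mut loops = 0u32;

    loop {
        if should_process_logs {
            pending_logs = pending_logs.saturating_sub(batch_size);
            calls += 1;
        }

        // Eviction step: progress is measured by how many entries were
        // actually evicted in this iteration.
        let evicted_now = evictable.min(batch_size);
        evictable -= evicted_now;
        let more_entries_to_evict = evictable > 0;

        // Log processing is capped by the repeat count; eviction must make
        // measurable progress to earn another iteration.
        should_process_logs = calls <= max_log_sync_repeats && pending_logs > 0;
        let should_evict_more = more_entries_to_evict && evicted_now > 0;

        loops += 1;
        // Without the `evicted_now > 0` requirement, a permanently dirty
        // entry could keep `more_entries_to_evict` set and spin forever.
        if !should_process_logs && !should_evict_more {
            return loops;
        }
    }
}

fn main() {
    // Terminates even though there is far more work than one batch.
    println!("loops = {}", run_maintenance(1000, 1000, 4, 32));
}
```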
@@ -1786,6 +1798,7 @@
             .notify_entry_removal(key, &entry, RemovalCause::Size)
             .await;
     }
+     eviction_state.counters.incr_eviction_count();
 }
 entry.entry_info().set_policy_gen(gen);
 return;
@@ -1833,6 +1846,8 @@
         .notify_entry_removal(vic_key, &vic_entry, RemovalCause::Size)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
+
 // And then remove the victim from the deques.
 Self::handle_remove(
     deqs,
@@ -1889,6 +1904,7 @@
         .notify_entry_removal(key, &entry, RemovalCause::Size)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
 }
 }
 }
@@ -2172,6 +2188,7 @@
         .notify_entry_removal(key, &entry, RemovalCause::Expired)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
 Self::handle_remove_without_timer_wheel(
     deqs,
     entry,
@@ -2189,7 +2206,7 @@
 &self,
 deqs: &mut MutexGuard<'_, Deques<K>>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 state: &mut EvictionState<'_, K, V>,
 ) where
 V: Clone,
@@ -2220,7 +2237,7 @@
 cache_region: CacheRegion,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 now: Instant,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2258,8 +2275,15 @@
 // we change `last_modified` and `last_accessed` in `EntryInfo` from
 // `Option<Instant>` to `Instant`.
 Some((key, hash, true, _) | (key, hash, false, None)) => {
+     // `is_dirty` is true or `last_modified` is None. Skip this entry
+     // as it may have been updated by this or another async task, but
+     // its `WriteOp` has not been processed yet.
     let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     // Set `more_to_evict` to `false` so that `run_pending_tasks` will
+     // return early. This helps `schedule_write_op` send the `WriteOp`
+     // to the write op channel.
+     more_to_evict = false;
     continue;
 }
 None => {
@@ -2292,6 +2316,7 @@
         .notify_entry_removal(key, &entry, cause)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
 let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
 Self::handle_remove_with_deques(
     deq_name,
     ao_deq,
     wo_deq,
     timer_wheel,
     entry,
@@ -2304,6 +2329,7 @@
 } else {
     let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     more_to_evict = false;
 }
 }
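The `evict_*` methods above all follow the same shape: scan up to `batch_size` entries from the front of a deque, and bail out when a dirty entry is found so its pending `WriteOp` can be applied first. A simplified sketch of that shape (it stops at the first dirty entry instead of rotating it the way the real `skip_updated_entry_*` helpers do):

```rust
use std::collections::VecDeque;

// Hypothetical entry: `dirty` means its `WriteOp` is still in the channel.
struct Entry {
    key: u32,
    dirty: bool,
}

// Returns (evicted_keys, more_to_evict). A dirty entry at the deque front
// ends the scan early so the pending `WriteOp` can be applied first.
fn evict_batch(deq: &mut VecDeque<Entry>, batch_size: u32) -> (Vec<u32>, bool) {
    let mut evicted = Vec::new();

    for _ in 0..batch_size {
        let dirty = match deq.front() {
            // Nothing left to scan.
            None => return (evicted, false),
            Some(e) => e.dirty,
        };
        if dirty {
            // Give up our claim to "more work" so the maintenance loop can
            // return and let the write op channel drain.
            return (evicted, false);
        }
        evicted.push(deq.pop_front().unwrap().key);
    }
    // Batch limit reached; report whether another run is warranted.
    let more = !deq.is_empty();
    (evicted, more)
}

fn main() {
    let mut deq: VecDeque<Entry> = (0..5).map(|key| Entry { key, dirty: key == 3 }).collect();
    let (evicted, more) = evict_batch(&mut deq, 10);
    println!("evicted={evicted:?}, more_to_evict={more}"); // evicted=[0, 1, 2], false
}
```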
@@ -2358,7 +2384,7 @@
 &self,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 now: Instant,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2395,7 +2421,14 @@
 // we change `last_modified` and `last_accessed` in `EntryInfo` from
 // `Option<Instant>` to `Instant`.
 Some((key, hash, true, _) | (key, hash, false, None)) => {
+     // `is_dirty` is true or `last_modified` is None. Skip this entry
+     // as it may have been updated by this or another async task, but
+     // its `WriteOp` has not been processed yet.
     self.skip_updated_entry_wo(&key, hash, deqs);
+     // Set `more_to_evict` to `false` so that `run_pending_tasks` will
+     // return early. This helps `schedule_write_op` send the `WriteOp`
+     // to the write op channel.
+     more_to_evict = false;
     continue;
 }
 None => {
@@ -2424,9 +2457,11 @@
         .notify_entry_removal(key, &entry, cause)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
 } else {
     self.skip_updated_entry_wo(&key, hash, deqs);
+     more_to_evict = false;
 }
 }
@@ -2440,7 +2475,7 @@
 invalidator: &Invalidator<K, V, S>,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
 V: Clone,
@@ -2502,7 +2537,7 @@
 &self,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 weights_to_evict: u64,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2519,19 +2554,15 @@
 break;
 }

- let maybe_key_hash_ts = deqs
-     .select_mut(CacheRegion::MainProbation)
-     .0
-     .peek_front()
-     .map(|node| {
-         let entry_info = node.element.entry_info();
-         (
-             Arc::clone(node.element.key()),
-             node.element.hash(),
-             entry_info.is_dirty(),
-             entry_info.last_accessed(),
-         )
-     });
+ let maybe_key_hash_ts = deqs.select_mut(CACHE_REGION).0.peek_front().map(|node| {
+     let entry_info = node.element.entry_info();
+     (
+         Arc::clone(node.element.key()),
+         node.element.hash(),
+         entry_info.is_dirty(),
+         entry_info.last_accessed(),
+     )
+ });

 let (key, hash, ts) = match maybe_key_hash_ts {
     Some((key, hash, false, Some(ts))) => (key, hash, ts),
     // TODO: Remove the second pattern `Some((_key, false, None))` once
     // we change `last_modified` and `last_accessed` in `EntryInfo` from
     // `Option<Instant>` to `Instant`.
     Some((key, hash, true, _) | (key, hash, false, None)) => {
+         // `is_dirty` is true or `last_modified` is None. Skip this entry
+         // as it may have been updated by this or another async task, but
+         // its `WriteOp` has not been processed yet.
         let (ao_deq, wo_deq) = deqs.select_mut(CACHE_REGION);
         self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+         // Set `more_to_evict` to `false` so that `run_pending_tasks` will
+         // return early. This helps `schedule_write_op` send the `WriteOp`
+         // to the write op channel.
+         more_to_evict = false;
         continue;
     }
     None => {
@@ -2575,6 +2613,7 @@
         .notify_entry_removal(key, &entry, RemovalCause::Size)
         .await;
 }
+ eviction_state.counters.incr_eviction_count();
 let weight = entry.policy_weight();
 let (deq, write_order_deq) = deqs.select_mut(CacheRegion::MainProbation);
 Self::handle_remove_with_deques(
     deq_name,
     deq,
     write_order_deq,
     timer_wheel,
     entry,
@@ -2589,6 +2628,7 @@
 } else {
     let (ao_deq, wo_deq) = deqs.select_mut(CacheRegion::MainProbation);
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     more_to_evict = false;
 }
 }

diff --git a/src/future/cache.rs b/src/future/cache.rs
index 812e26fb..5bc2fd64 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -5113,8 +5113,8 @@ mod tests {
 const MAX_CAPACITY: u64 = 20;
 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
- const MAX_LOG_SYNC_REPEATS: usize = 1;
- const EVICTION_BATCH_SIZE: usize = 1;
+ const MAX_LOG_SYNC_REPEATS: u32 = 1;
+ const EVICTION_BATCH_SIZE: u32 = 1;

 let hk_conf = HousekeeperConfig::new(
     Some(EVICTION_TIMEOUT),
@@ -5176,8 +5176,8 @@ mod tests {
 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
 const LISTENER_DELAY: Duration = Duration::from_millis(11);
- const MAX_LOG_SYNC_REPEATS: usize = 1;
- const EVICTION_BATCH_SIZE: usize = 1;
+ const MAX_LOG_SYNC_REPEATS: u32 = 1;
+ const EVICTION_BATCH_SIZE: u32 = 1;

 let hk_conf = HousekeeperConfig::new(
     Some(EVICTION_TIMEOUT),

diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs
index d20cad31..49cf334d 100644
--- a/src/future/housekeeper.rs
+++ b/src/future/housekeeper.rs
@@ -24,12 +24,13 @@ use futures_util::future::{BoxFuture, Shared};

 #[async_trait]
 pub(crate) trait InnerSync {
-    /// Runs the pending tasks. Returns `true` if there are more entries to evict.
+    /// Runs the pending tasks. Returns `true` if there are more entries to evict in
+    /// the next run.
 async fn run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool;

 /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
@@ -58,10 +59,10 @@ pub(crate) struct Housekeeper {
 maintenance_task_timeout: Option<Duration>,
 /// The maximum repeat count for receiving operation logs from the read and write
 /// log channels. Default: `MAX_LOG_SYNC_REPEATS`.
- max_log_sync_repeats: usize,
+ max_log_sync_repeats: u32,
 /// The batch size of entries to be processed by each internal eviction method.
 /// Default: `EVICTION_BATCH_SIZE`.
- eviction_batch_size: usize,
+ eviction_batch_size: u32,
 auto_run_enabled: AtomicBool,
 #[cfg(test)]
 pub(crate) start_count: AtomicUsize,
@@ -138,12 +139,14 @@ impl Housekeeper {
 cache.notify_write_op_ch_is_ready();
 }

- pub(crate) async fn try_run_pending_tasks(&self, cache: Arc<T>) -> bool
+ /// Tries to run the pending tasks if the lock is free. Returns `true` if there
+ /// are more entries to evict in the next run.
+ pub(crate) async fn try_run_pending_tasks(&self, cache: &Arc<T>) -> bool
 where
 T: InnerSync + Send + Sync + 'static,
 {
 if let Some(mut current_task) = self.current_task.try_lock() {
-     self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
+     self.do_run_pending_tasks(Arc::clone(cache), &mut current_task)
         .await;
 } else {
     return false;
 }
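`try_run_pending_tasks` only does work when it can take the `current_task` lock, so concurrent callers cannot run maintenance twice at once. The same guard pattern in miniature, using `std::sync::Mutex` instead of the async lock the crate uses:

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the housekeeper's state.
struct Housekeeper {
    current_task: Mutex<()>,
}

impl Housekeeper {
    /// Mirrors the contract above: run maintenance only when no other
    /// caller is already running it. `false` means "did nothing, and no
    /// more entries are known to need eviction".
    fn try_run_pending_tasks(&self, run: impl FnOnce() -> bool) -> bool {
        match self.current_task.try_lock() {
            Ok(_guard) => run(), // lock is held for the duration of the run
            Err(_) => false,     // someone else is already on it
        }
    }
}

fn main() {
    let hk = Housekeeper { current_task: Mutex::new(()) };
    let more = hk.try_run_pending_tasks(|| {
        println!("running pending tasks");
        false // pretend everything was evicted in one call
    });
    println!("more entries to evict: {more}");
}
```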
diff --git a/src/sync/cache.rs b/src/sync/cache.rs
index bd231251..53650850 100644
--- a/src/sync/cache.rs
+++ b/src/sync/cache.rs
@@ -4803,8 +4803,8 @@ mod tests {
 const MAX_CAPACITY: u64 = 20;
 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
- const MAX_LOG_SYNC_REPEATS: usize = 1;
- const EVICTION_BATCH_SIZE: usize = 1;
+ const MAX_LOG_SYNC_REPEATS: u32 = 1;
+ const EVICTION_BATCH_SIZE: u32 = 1;

 let hk_conf = HousekeeperConfig::new(
     Some(EVICTION_TIMEOUT),
@@ -4866,8 +4866,8 @@ mod tests {
 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
 const LISTENER_DELAY: Duration = Duration::from_millis(11);
- const MAX_LOG_SYNC_REPEATS: usize = 1;
- const EVICTION_BATCH_SIZE: usize = 1;
+ const MAX_LOG_SYNC_REPEATS: u32 = 1;
+ const EVICTION_BATCH_SIZE: u32 = 1;

 let hk_conf = HousekeeperConfig::new(
     Some(EVICTION_TIMEOUT),
diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs
index fe6cc9ab..e6b9ec05 100644
--- a/src/sync_base/base_cache.rs
+++ b/src/sync_base/base_cache.rs
@@ -790,6 +790,7 @@ impl<'a, K, V> EvictionState<'a, K, V> {
 struct EvictionCounters {
 entry_count: u64,
 weighted_size: u64,
+ eviction_count: u64,
 }

 impl EvictionCounters {
@@ -798,6 +799,7 @@ impl EvictionCounters {
 Self {
 entry_count,
 weighted_size,
+ eviction_count: 0,
 }
 }

@@ -814,6 +816,12 @@ impl EvictionCounters {
 let total = &mut self.weighted_size;
 *total = total.saturating_sub(weight as u64);
 }
+
+ #[inline]
+ fn incr_eviction_count(&mut self) {
+     let count = &mut self.eviction_count;
+     *count = count.saturating_add(1);
+ }
 }

 #[derive(Default)]
@@ -1242,8 +1250,8 @@ where
 fn run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool {
 self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size)
 }
@@ -1262,8 +1270,8 @@
 fn do_run_pending_tasks(
 &self,
 timeout: Option<Duration>,
- max_log_sync_repeats: usize,
- eviction_batch_size: usize,
+ max_log_sync_repeats: u32,
+ eviction_batch_size: u32,
 ) -> bool {
 if self.max_capacity == Some(0) {
     return false;
 }
@@ -1278,35 +1286,39 @@ where
 } else {
     None
 };
- let mut calls = 0;
+ let mut should_process_logs = true;
+ let mut calls = 0u32;
 let current_ec = self.entry_count.load();
 let current_ws = self.weighted_size.load();
 let mut eviction_state =
     EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());

 loop {
-     calls += 1;
+     if should_process_logs {
+         let r_len = self.read_op_ch.len();
+         if r_len > 0 {
+             self.apply_reads(&mut deqs, &mut timer_wheel, r_len);
+         }

-     let r_len = self.read_op_ch.len();
-     if r_len > 0 {
-         self.apply_reads(&mut deqs, &mut timer_wheel, r_len);
-     }

+         let w_len = self.write_op_ch.len();
+         if w_len > 0 {
+             self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state);
+         }

-     let w_len = self.write_op_ch.len();
-     if w_len > 0 {
-         self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state);
-     }

+         if self.eviction_policy == EvictionPolicyConfig::TinyLfu
+             && self.should_enable_frequency_sketch(&eviction_state.counters)
+         {
+             self.enable_frequency_sketch(&eviction_state.counters);
+         }

-     if self.eviction_policy == EvictionPolicyConfig::TinyLfu
-         && self.should_enable_frequency_sketch(&eviction_state.counters)
-     {
-         self.enable_frequency_sketch(&eviction_state.counters);
-     }

+         calls += 1;
+     }

     // Set this flag to `false`. The `evict_*` and `invalidate_*` methods
     // below may set it to `true` if there are more entries to evict in the
     // next loop.
     eviction_state.more_entries_to_evict = false;
+     let last_eviction_count = eviction_state.counters.eviction_count;

     // Evict entries if there are any expired entries in the hierarchical
     // timer wheels.
@@ -1357,14 +1369,21 @@

     // Check whether to continue this loop or not.
-     let should_process_logs = calls <= max_log_sync_repeats
+     should_process_logs = calls <= max_log_sync_repeats
         && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
             || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);

-     if !should_process_logs && !eviction_state.more_entries_to_evict {
+     let should_evict_more_entries = eviction_state.more_entries_to_evict
+         // Check if there were any entries evicted in this loop.
+         && (eviction_state.counters.eviction_count - last_eviction_count) > 0;
+
+     // Break the loop if there will be nothing to do in the next loop.
+     if !should_process_logs && !should_evict_more_entries {
         break;
     }

+     // Break the loop if the eviction listener is set and timeout has been
+     // reached.
     if let (Some(to), Some(started)) = (timeout, started_at) {
         let elapsed = self
             .current_time_from_expiration_clock()
@@ -1584,6 +1603,7 @@
         let key = Arc::clone(&kh.key);
         eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
     }
+     eviction_state.counters.incr_eviction_count();
 }
 entry.entry_info().set_policy_gen(gen);
 return;
@@ -1629,6 +1649,7 @@
         RemovalCause::Size,
     );
 }
+ eviction_state.counters.incr_eviction_count();
 // And then remove the victim from the deques.
 Self::handle_remove(
     deqs,
@@ -1680,6 +1701,7 @@
 if eviction_state.is_notifier_enabled() {
     eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
 }
+ eviction_state.counters.incr_eviction_count();
 }
 }
 };
@@ -1953,6 +1975,7 @@
     let key = Arc::clone(key);
     eviction_state.notify_entry_removal(key, &entry, RemovalCause::Expired);
 }
+ eviction_state.counters.incr_eviction_count();
 Self::handle_remove_without_timer_wheel(
     deqs,
     entry,
@@ -1971,7 +1994,7 @@
 &self,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 state: &mut EvictionState<'_, K, V>,
 ) where
 V: Clone,
@@ -1998,7 +2021,7 @@
 cache_region: CacheRegion,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 now: Instant,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2037,7 +2060,14 @@
 // we change `last_modified` and `last_accessed` in `EntryInfo` from
 // `Option<Instant>` to `Instant`.
 Some((key, hash, true, _) | (key, hash, false, None)) => {
+     // `is_dirty` is true or `last_modified` is None. Skip this entry
+     // as it may have been updated by this or another thread, but
+     // its `WriteOp` has not been processed yet.
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     // Set `more_to_evict` to `false` so that `run_pending_tasks` will
+     // return early. This helps `schedule_write_op` send the `WriteOp`
+     // to the write op channel.
+     more_to_evict = false;
     continue;
 }
 None => {
@@ -2064,6 +2094,7 @@
 if eviction_state.is_notifier_enabled() {
     eviction_state.notify_entry_removal(key, &entry, cause);
 }
+ eviction_state.counters.incr_eviction_count();
 Self::handle_remove_with_deques(
     deq_name,
     ao_deq,
     wo_deq,
     timer_wheel,
     entry,
 );
 } else {
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     more_to_evict = false;
 }
 }

@@ -2128,7 +2160,7 @@
 &self,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 now: Instant,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2166,6 +2198,11 @@
 // `Option<Instant>` to `Instant`.
 Some((key, hash, true, _) | (key, hash, false, None)) => {
     self.skip_updated_entry_wo(&key, hash, deqs);
+     // If the key belongs to the caller, set `more_to_evict` to
+     // `false` so that `run_pending_tasks` will return early. This
+     // helps `schedule_write_op`, a caller further up the call tree,
+     // send the `WriteOp` to the write op channel.
+     more_to_evict = false;
     continue;
 }
 None => {
@@ -2188,9 +2225,13 @@
 if eviction_state.is_notifier_enabled() {
     eviction_state.notify_entry_removal(key, &entry, cause);
 }
+ eviction_state.counters.incr_eviction_count();
 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
 } else {
     self.skip_updated_entry_wo(&key, hash, deqs);
+     more_to_evict = false;
 }
 }

@@ -2204,7 +2245,7 @@
 invalidator: &Invalidator<K, V, S>,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
 V: Clone,
@@ -2265,7 +2306,7 @@
 &self,
 deqs: &mut Deques<K>,
 timer_wheel: &mut TimerWheel<K>,
- batch_size: usize,
+ batch_size: u32,
 weights_to_evict: u64,
 eviction_state: &mut EvictionState<'_, K, V>,
 ) where
@@ -2299,7 +2340,14 @@
 // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
 // `Instant`.
 Some((key, hash, true, _) | (key, hash, false, None)) => {
+     // `is_dirty` is true or `last_modified` is None. Skip this entry
+     // as it may have been updated by this or another thread, but
+     // its `WriteOp` has not been processed yet.
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     // Set `more_to_evict` to `false` so that `run_pending_tasks` will
+     // return early. This helps `schedule_write_op` send the `WriteOp`
+     // to the write op channel.
+     more_to_evict = false;
     continue;
 }
 None => {
@@ -2328,6 +2376,7 @@
 if eviction_state.is_notifier_enabled() {
     eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
 }
+ eviction_state.counters.incr_eviction_count();
 let weight = entry.policy_weight();
 Self::handle_remove_with_deques(
     deq_name,
     ao_deq,
     wo_deq,
     timer_wheel,
     entry,
 );
 evicted = evicted.saturating_add(weight as u64);
 } else {
     self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
+     more_to_evict = false;
 }
 }
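From the caller's point of view, the observable effect of this patch is that one `run_pending_tasks` call now evicts as much as it can, subject to the maintenance-task timeout. A usage sketch against the public sync API, assuming a crate version that includes this change (timing-sensitive, so the final count is illustrative rather than guaranteed):

```rust
// [dependencies]
// moka = { version = "0.12", features = ["sync"] }
use moka::sync::Cache;
use std::time::Duration;

fn main() {
    let cache: Cache<u32, String> = Cache::builder()
        .max_capacity(10)
        .time_to_live(Duration::from_millis(10))
        .build();

    for i in 0..100 {
        cache.insert(i, i.to_string());
    }
    // Let every entry expire.
    std::thread::sleep(Duration::from_millis(20));

    // Before this patch, a single call could leave expired entries behind;
    // after it, one call drains as many as possible within the timeout.
    cache.run_pending_tasks();
    println!("entries after maintenance: {}", cache.entry_count()); // expected: 0
}
```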