From 297913c0adcb677eb369d520e9de7c80696fbde3 Mon Sep 17 00:00:00 2001
From: Mark Logan <103447440+mystenmark@users.noreply.github.com>
Date: Tue, 12 Nov 2024 21:21:06 -0800
Subject: [PATCH] Fix bug with object_by_id inserts. (#20236)

The previous version of this code was based on a misunderstanding of the
Moka Cache API. It assumed that:

    let entry = cache.entry(k).or_insert_with(|| v);
    entry.is_fresh()

could be used to detect whether there was a race with:

    cache.insert(k, v)

In fact this is not the case! Both threads must use `or_insert_with` in
order to handle the race correctly.

The bug is instantly reproducible with the following code:

    use moka::sync::Cache;
    use std::sync::{Arc, Mutex};

    fn monotonic_update(cache: Arc<Cache<u64, Arc<Mutex<u64>>>>, key: u64, value: u64) {
        let entry = cache
            .entry(key)
            .or_insert_with(|| Arc::new(Mutex::new(value)));
        if !entry.is_fresh() {
            let mut entry = entry.value().lock().unwrap();
            // only update if the new value is greater than the current value
            if value > *entry {
                *entry = value;
            }
        }
    }

    fn blind_write(cache: Arc<Cache<u64, Arc<Mutex<u64>>>>, key: u64, value: u64) {
        cache.insert(key, Arc::new(Mutex::new(value)));
    }

    fn main() {
        for _ in 0..1000 {
            let cache = Arc::new(Cache::new(1000));
            let cache1 = cache.clone();
            let cache2 = cache.clone();

            let handle1 = std::thread::spawn(move || {
                monotonic_update(cache1, 1, 1);
            });
            let handle2 = std::thread::spawn(move || {
                // Correct way to update the value
                // monotonic_update(cache2, 1, 2);

                // Incorrect way to update the value
                blind_write(cache2, 1, 2);
            });

            handle1.join().unwrap();
            handle2.join().unwrap();

            let entry = cache.get(&1).unwrap();
            let value = entry.lock().unwrap();
            assert_eq!(*value, 2);
        }
    }
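For the assertion to fail, thread 2's plain `insert` has to land inside thread 1's `or_insert_with` window. The exact internal ordering shown below is an assumption about moka's implementation rather than documented behavior, but one plausible losing interleaving is:

    THREAD 1 (monotonic_update, value = 1)   | THREAD 2 (blind_write, value = 2)
    -----------------------------------------+----------------------------------
    or_insert_with: key 1 absent, so the     |
    init closure runs                        |
                                             | insert(1, 2) installs 2
    or_insert_with installs 1, replacing 2,  |
    and reports entry.is_fresh() == true     |
    the lock-and-compare branch is skipped;  |
    the write of 2 is silently lost and      |
    assert_eq!(*value, 2) fails              |

Routing both threads through `or_insert_with` (the commented-out `monotonic_update(cache2, 1, 2)` above) closes this window: only one of the racing calls runs its init closure, and every other caller observes `is_fresh() == false` and updates the existing slot under its `Mutex`.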
## Description

Describe the changes or additions included in this PR.

## Test plan

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your
changes, release notes aren't required. For each box you select, include
information after the relevant heading that describes the impact of your
changes that a user might notice and any actions they must take to implement
updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 .../src/execution_cache/cache_types.rs       | 72 +++++++++++++++++
 .../src/execution_cache/writeback_cache.rs   | 78 +++++--------------
 2 files changed, 91 insertions(+), 59 deletions(-)

diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs
index acfd5aea5b1db..3faea764a1f4d 100644
--- a/crates/sui-core/src/execution_cache/cache_types.rs
+++ b/crates/sui-core/src/execution_cache/cache_types.rs
@@ -3,7 +3,11 @@
 
 use std::cmp::Ordering;
 use std::collections::VecDeque;
+use std::hash::Hash;
+use std::sync::Arc;
 
+use moka::sync::Cache as MokaCache;
+use parking_lot::Mutex;
 use sui_types::base_types::SequenceNumber;
 
 /// CachedVersionMap is a map from version to value, with the additional constraints:
@@ -137,6 +141,74 @@ where
     }
 }
 
+// Could just use the Ord trait but I think it would be confusing to overload it
+// in that way.
+pub trait IsNewer {
+    fn is_newer_than(&self, other: &Self) -> bool;
+}
+
+pub struct MonotonicCache<K, V> {
+    cache: MokaCache<K, Arc<Mutex<V>>>,
+}
+
+impl<K, V> MonotonicCache<K, V>
+where
+    K: Hash + Eq + Send + Sync + Copy + 'static,
+    V: IsNewer + Clone + Send + Sync + 'static,
+{
+    pub fn new(cache_size: u64) -> Self {
+        Self {
+            cache: MokaCache::builder().max_capacity(cache_size).build(),
+        }
+    }
+
+    pub fn get(&self, key: &K) -> Option<Arc<Mutex<V>>> {
+        self.cache.get(key)
+    }
+
+    // Update the cache with guaranteed monotonicity. That is, if there are N
+    // calls to this function from N threads, the write with the newest value
+    // will win the race regardless of the order in which the writes occur.
+    //
+    // Caller should log the insert with trace! and increment the appropriate metric.
+    pub fn insert(&self, key: &K, value: V) {
+        // Warning: tricky code!
+        let entry = self
+            .cache
+            .entry(*key)
+            // only one racing insert will call the closure
+            .or_insert_with(|| Arc::new(Mutex::new(value.clone())));
+
+        // We may be racing with another thread that observed an older version of value
+        if !entry.is_fresh() {
+            // !is_fresh means we lost the race, and entry holds the value that was
+            // inserted by the other thread. We need to check if we have a more recent value
+            // than the other reader.
+            let mut entry = entry.value().lock();
+            if value.is_newer_than(&entry) {
+                *entry = value;
+            }
+        }
+    }
+
+    pub fn invalidate(&self, key: &K) {
+        self.cache.invalidate(key);
+    }
+
+    #[cfg(test)]
+    pub fn contains_key(&self, key: &K) -> bool {
+        self.cache.contains_key(key)
+    }
+
+    pub fn invalidate_all(&self) {
+        self.cache.invalidate_all();
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.cache.iter().next().is_none()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
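The intended use of `MonotonicCache` looks like the following minimal sketch. The `VersionedEntry` type and `demo` function are hypothetical, invented for illustration; `MonotonicCache` and `IsNewer` are the items added above:

    use crate::execution_cache::cache_types::{IsNewer, MonotonicCache};

    // Hypothetical value type carrying a version; not part of this patch.
    #[derive(Clone)]
    struct VersionedEntry {
        version: u64,
    }

    impl IsNewer for VersionedEntry {
        fn is_newer_than(&self, other: &Self) -> bool {
            self.version > other.version
        }
    }

    fn demo() {
        let cache: MonotonicCache<u64, VersionedEntry> = MonotonicCache::new(1000);

        // These two inserts may race from different threads; whichever order
        // they land in, the entry with the higher version survives.
        cache.insert(&1, VersionedEntry { version: 2 });
        cache.insert(&1, VersionedEntry { version: 1 });

        // parking_lot's Mutex::lock returns the guard directly (no unwrap).
        assert_eq!(cache.get(&1).unwrap().lock().version, 2);
    }

The `Arc<Mutex<V>>` wrapping is what makes this safe: a caller that loses the creation race mutates the winner's slot in place rather than replacing it, which is the property `cache_latest_object_by_id` relies on in the next file.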
diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs
index f512902e68f01..fd0dbe2ba863e 100644
--- a/crates/sui-core/src/execution_cache/writeback_cache.rs
+++ b/crates/sui-core/src/execution_cache/writeback_cache.rs
@@ -78,7 +78,9 @@ use tracing::{debug, info, instrument, trace, warn};
 
 use super::ExecutionCacheAPI;
 use super::{
-    cache_types::CachedVersionMap, implement_passthrough_traits, object_locks::ObjectLocks,
+    cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
+    implement_passthrough_traits,
+    object_locks::ObjectLocks,
     CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
     ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
 };
@@ -153,6 +155,16 @@ enum LatestObjectCacheEntry {
 }
 
 impl LatestObjectCacheEntry {
+    #[cfg(test)]
+    fn version(&self) -> Option<SequenceNumber> {
+        match self {
+            LatestObjectCacheEntry::Object(version, _) => Some(*version),
+            LatestObjectCacheEntry::NonExistent => None,
+        }
+    }
+}
+
+impl IsNewer for LatestObjectCacheEntry {
     fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
         match (self, other) {
             (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
@@ -162,14 +174,6 @@ impl LatestObjectCacheEntry {
                 v1 > v2
             }
             _ => false,
         }
     }
-
-    #[cfg(test)]
-    fn version(&self) -> Option<SequenceNumber> {
-        match self {
-            LatestObjectCacheEntry::Object(version, _) => Some(*version),
-            LatestObjectCacheEntry::NonExistent => None,
-        }
-    }
 }
 
 type MarkerKey = (EpochId, ObjectID);
@@ -271,7 +275,7 @@ struct CachedCommittedData {
     // We cannot simply insert objects that we read off the disk into `object_cache`,
     // since that may violate the no-missing-versions property.
     // `object_by_id_cache` is also written to on writes so that it is always coherent.
-    object_by_id_cache: MokaCache<ObjectID, Arc<Mutex<LatestObjectCacheEntry>>>,
+    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
 
     // See module level comment for an explanation of caching strategy.
     marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
@@ -295,10 +299,6 @@ impl CachedCommittedData {
             .max_capacity(MAX_CACHE_SIZE)
             .max_capacity(MAX_CACHE_SIZE)
             .build();
-        let object_by_id_cache = MokaCache::builder()
-            .max_capacity(MAX_CACHE_SIZE)
-            .max_capacity(MAX_CACHE_SIZE)
-            .build();
         let marker_cache = MokaCache::builder()
             .max_capacity(MAX_CACHE_SIZE)
             .max_capacity(MAX_CACHE_SIZE)
             .build();
@@ -326,7 +326,7 @@ impl CachedCommittedData {
 
         Self {
             object_cache,
-            object_by_id_cache,
+            object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
             marker_cache,
             transactions,
             transaction_effects,
@@ -347,7 +347,7 @@ impl CachedCommittedData {
         self._transaction_objects.invalidate_all();
 
         assert_empty(&self.object_cache);
-        assert_empty(&self.object_by_id_cache);
+        assert!(&self.object_by_id_cache.is_empty());
         assert_empty(&self.marker_cache);
         assert_empty(&self.transactions);
         assert_empty(&self.transaction_effects);
@@ -486,11 +486,8 @@ impl WritebackCache {
         let mut entry = self.dirty.objects.entry(*object_id).or_default();
 
         self.cached.object_by_id_cache.insert(
-            *object_id,
-            Arc::new(Mutex::new(LatestObjectCacheEntry::Object(
-                version,
-                object.clone(),
-            ))),
+            object_id,
+            LatestObjectCacheEntry::Object(version, object.clone()),
         );
 
         entry.insert(version, object);
@@ -1087,47 +1084,10 @@ impl WritebackCache {
     }
 
     // Updates the latest object id cache with an entry that was read from the db.
-    // Writes bypass this function, because an object write is guaranteed to be the
-    // most recent version (and cannot race with any other writes to that object id)
-    //
-    // If there are racing calls to this function, it is guaranteed that after a call
-    // has returned, reads from that thread will not observe a lower version than the
-    // one they inserted
     fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
         trace!("caching object by id: {:?} {:?}", object_id, object);
         self.metrics.record_cache_write("object_by_id");
-
-        // Warning: tricky code!
-        let entry = self
-            .cached
-            .object_by_id_cache
-            .entry(*object_id)
-            // only one racing insert will call the closure
-            .or_insert_with(|| Arc::new(Mutex::new(object.clone())));
-
-        // We may be racing with another thread that observed an older version of the object
-        if !entry.is_fresh() {
-            // !is_fresh means we lost the race, and entry holds the value that was
-            // inserted by the other thread. We need to check if we have a more recent version
-            // than the other reader.
-            //
-            // This could also mean that the entry was inserted by a transaction write. This
-            // could occur in the following case:
-            //
-            // THREAD 1           | THREAD 2
-            // reads object at v1 |
-            //                    | tx writes object at v2
-            // tries to cache v1  |
-            //
-            // Thread 1 will see that v2 is already in the cache when it tries to cache it,
-            // and will try to update the cache with v1. But the is_newer_than check will fail,
-            // so v2 will remain in the cache
-
-            // Ensure only the latest version is inserted.
-            let mut entry = entry.value().lock();
-            if object.is_newer_than(&entry) {
-                *entry = object;
-            }
-        }
+        self.cached.object_by_id_cache.insert(object_id, object);
     }
 
     fn cache_object_not_found(&self, object_id: &ObjectID) {