Fix bug with object_by_id inserts. (#20236)
The previous version of this code was based on a misunderstanding of the
Moka Cache API. It assumed that:

    let entry = cache.entry(k).or_insert_with(|| v);
    entry.is_fresh()

could be used to detect whether there was a race with

    cache.insert(k, v)

In fact this is not the case! Both threads must use `or_insert_with` in
order to handle the race correctly.

The bug is instantly reproducible with the following code:

    use moka::sync::Cache;
    use std::sync::{Arc, Mutex};

    fn monotonic_update(cache: Arc<Cache<u64, Arc<Mutex<u64>>>>, key: u64, value: u64) {
        let entry = cache
            .entry(key)
            .or_insert_with(|| Arc::new(Mutex::new(value)));

        if !entry.is_fresh() {
            let mut entry = entry.value().lock().unwrap();
            // only update if the new value is greater than the current value
            if value > *entry {
                *entry = value;
            }
        }
    }

fn blind_write(cache: Arc<Cache<u64, Arc<Mutex<u64>>>>, key: u64, value:
u64) {
        cache.insert(key, Arc::new(Mutex::new(value)));
    }

    fn main() {
        for _ in 0..1000 {
            let cache = Arc::new(Cache::new(1000));
            let cache1 = cache.clone();
            let cache2 = cache.clone();

            let handle1 = std::thread::spawn(move || {
                monotonic_update(cache1, 1, 1);
            });
            let handle2 = std::thread::spawn(move || {
                // Correct way to update the value
                // monotonic_update(cache2, 1, 2);

                // Incorrect way to update the value
                blind_write(cache2, 1, 2);
            });

            handle1.join().unwrap();
            handle2.join().unwrap();

            let entry = cache.get(&1).unwrap();
            let value = entry.lock().unwrap();
            assert_eq!(*value, 2);
        }
    }
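
The fix wraps the correct pattern in a new `MonotonicCache` type (added in
`cache_types.rs` below), so that every racing writer goes through
`or_insert_with`. A minimal usage sketch, with a hypothetical `VersionedValue`
type standing in for `LatestObjectCacheEntry`:

    #[derive(Clone)]
    struct VersionedValue {
        version: u64,
        data: String,
    }

    impl IsNewer for VersionedValue {
        fn is_newer_than(&self, other: &Self) -> bool {
            self.version > other.version
        }
    }

    fn example(cache: &MonotonicCache<u64, VersionedValue>) {
        // Racing inserts are safe: whichever value is newest wins,
        // regardless of the order in which the inserts execute.
        cache.insert(&1, VersionedValue { version: 2, data: "new".into() });
        cache.insert(&1, VersionedValue { version: 1, data: "stale".into() });
        // MonotonicCache uses parking_lot::Mutex internally, so lock()
        // returns the guard directly (no unwrap).
        assert_eq!(cache.get(&1).unwrap().lock().version, 2);
    }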

mystenmark authored Nov 13, 2024
1 parent ec6d016 commit 297913c
Showing 2 changed files with 91 additions and 59 deletions.
72 changes: 72 additions & 0 deletions crates/sui-core/src/execution_cache/cache_types.rs
@@ -3,7 +3,11 @@
 
 use std::cmp::Ordering;
 use std::collections::VecDeque;
+use std::hash::Hash;
+use std::sync::Arc;
 
+use moka::sync::Cache as MokaCache;
+use parking_lot::Mutex;
 use sui_types::base_types::SequenceNumber;
 
 /// CachedVersionMap is a map from version to value, with the additional constraints:
@@ -137,6 +141,74 @@ where
     }
 }
 
+// Could just use the Ord trait but I think it would be confusing to overload it
+// in that way.
+pub trait IsNewer {
+    fn is_newer_than(&self, other: &Self) -> bool;
+}
+
+pub struct MonotonicCache<K, V> {
+    cache: MokaCache<K, Arc<Mutex<V>>>,
+}
+
+impl<K, V> MonotonicCache<K, V>
+where
+    K: Hash + Eq + Send + Sync + Copy + 'static,
+    V: IsNewer + Clone + Send + Sync + 'static,
+{
+    pub fn new(cache_size: u64) -> Self {
+        Self {
+            cache: MokaCache::builder().max_capacity(cache_size).build(),
+        }
+    }
+
+    pub fn get(&self, key: &K) -> Option<Arc<Mutex<V>>> {
+        self.cache.get(key)
+    }
+
+    // Update the cache with guaranteed monotonicity. That is, if there are N
+    // calls to this function from N threads, the write with the newest value will
+    // win the race regardless of the order in which the writes occur.
+    //
+    // Caller should log the insert with trace! and increment the appropriate metric.
+    pub fn insert(&self, key: &K, value: V) {
+        // Warning: tricky code!
+        let entry = self
+            .cache
+            .entry(*key)
+            // only one racing insert will call the closure
+            .or_insert_with(|| Arc::new(Mutex::new(value.clone())));
+
+        // We may be racing with another thread that observed an older version of value
+        if !entry.is_fresh() {
+            // !is_fresh means we lost the race, and entry holds the value that was
+            // inserted by the other thread. We need to check if we have a more recent value
+            // than the other thread.
+            let mut entry = entry.value().lock();
+            if value.is_newer_than(&entry) {
+                *entry = value;
+            }
+        }
+    }
+
+    pub fn invalidate(&self, key: &K) {
+        self.cache.invalidate(key);
+    }
+
+    #[cfg(test)]
+    pub fn contains_key(&self, key: &K) -> bool {
+        self.cache.contains_key(key)
+    }
+
+    pub fn invalidate_all(&self) {
+        self.cache.invalidate_all();
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.cache.iter().next().is_none()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
78 changes: 19 additions & 59 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
@@ -78,7 +78,9 @@ use tracing::{debug, info, instrument, trace, warn};
 
 use super::ExecutionCacheAPI;
 use super::{
-    cache_types::CachedVersionMap, implement_passthrough_traits, object_locks::ObjectLocks,
+    cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
+    implement_passthrough_traits,
+    object_locks::ObjectLocks,
     CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
     ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
 };
@@ -153,6 +155,16 @@ enum LatestObjectCacheEntry {
 }
 
 impl LatestObjectCacheEntry {
+    #[cfg(test)]
+    fn version(&self) -> Option<SequenceNumber> {
+        match self {
+            LatestObjectCacheEntry::Object(version, _) => Some(*version),
+            LatestObjectCacheEntry::NonExistent => None,
+        }
+    }
+}
+
+impl IsNewer for LatestObjectCacheEntry {
     fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
         match (self, other) {
             (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
@@ -162,14 +174,6 @@ impl LatestObjectCacheEntry {
             _ => false,
         }
     }
-
-    #[cfg(test)]
-    fn version(&self) -> Option<SequenceNumber> {
-        match self {
-            LatestObjectCacheEntry::Object(version, _) => Some(*version),
-            LatestObjectCacheEntry::NonExistent => None,
-        }
-    }
 }
 
 type MarkerKey = (EpochId, ObjectID);
@@ -271,7 +275,7 @@ struct CachedCommittedData {
     // We cannot simply insert objects that we read off the disk into `object_cache`,
     // since that may violate the no-missing-versions property.
     // `object_by_id_cache` is also written to on writes so that it is always coherent.
-    object_by_id_cache: MokaCache<ObjectID, Arc<Mutex<LatestObjectCacheEntry>>>,
+    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
 
     // See module level comment for an explanation of caching strategy.
     marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
@@ -295,10 +299,6 @@ impl CachedCommittedData {
             .max_capacity(MAX_CACHE_SIZE)
             .max_capacity(MAX_CACHE_SIZE)
             .build();
-        let object_by_id_cache = MokaCache::builder()
-            .max_capacity(MAX_CACHE_SIZE)
-            .max_capacity(MAX_CACHE_SIZE)
-            .build();
         let marker_cache = MokaCache::builder()
             .max_capacity(MAX_CACHE_SIZE)
             .max_capacity(MAX_CACHE_SIZE)
@@ -326,7 +326,7 @@ impl CachedCommittedData {
 
         Self {
             object_cache,
-            object_by_id_cache,
+            object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
             marker_cache,
             transactions,
             transaction_effects,
@@ -347,7 +347,7 @@ impl CachedCommittedData {
         self._transaction_objects.invalidate_all();
 
         assert_empty(&self.object_cache);
-        assert_empty(&self.object_by_id_cache);
+        assert!(&self.object_by_id_cache.is_empty());
         assert_empty(&self.marker_cache);
         assert_empty(&self.transactions);
         assert_empty(&self.transaction_effects);
@@ -486,11 +486,8 @@ impl WritebackCache {
         let mut entry = self.dirty.objects.entry(*object_id).or_default();
 
         self.cached.object_by_id_cache.insert(
-            *object_id,
-            Arc::new(Mutex::new(LatestObjectCacheEntry::Object(
-                version,
-                object.clone(),
-            ))),
+            object_id,
+            LatestObjectCacheEntry::Object(version, object.clone()),
         );
 
         entry.insert(version, object);
@@ -1087,47 +1084,10 @@ impl WritebackCache {
     }
 
     // Updates the latest object id cache with an entry that was read from the db.
     // Writes bypass this function, because an object write is guaranteed to be the
     // most recent version (and cannot race with any other writes to that object id)
-    //
-    // If there are racing calls to this function, it is guaranteed that after a call
-    // has returned, reads from that thread will not observe a lower version than the
-    // one they inserted
     fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
         trace!("caching object by id: {:?} {:?}", object_id, object);
         self.metrics.record_cache_write("object_by_id");
-        // Warning: tricky code!
-        let entry = self
-            .cached
-            .object_by_id_cache
-            .entry(*object_id)
-            // only one racing insert will call the closure
-            .or_insert_with(|| Arc::new(Mutex::new(object.clone())));
-
-        // We may be racing with another thread that observed an older version of the object
-        if !entry.is_fresh() {
-            // !is_fresh means we lost the race, and entry holds the value that was
-            // inserted by the other thread. We need to check if we have a more recent version
-            // than the other reader.
-            //
-            // This could also mean that the entry was inserted by a transaction write. This
-            // could occur in the following case:
-            //
-            // THREAD 1              | THREAD 2
-            // reads object at v1    |
-            //                       | tx writes object at v2
-            // tries to cache v1     |
-            //
-            // Thread 1 will see that v2 is already in the cache when it tries to cache it,
-            // and will try to update the cache with v1. But the is_newer_than check will fail,
-            // so v2 will remain in the cache
-
-            // Ensure only the latest version is inserted.
-            let mut entry = entry.value().lock();
-            if object.is_newer_than(&entry) {
-                *entry = object;
-            }
-        }
+        self.cached.object_by_id_cache.insert(object_id, object);
     }
 
     fn cache_object_not_found(&self, object_id: &ObjectID) {
