Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Remove boolean from put in replacer
Browse files Browse the repository at this point in the history
  • Loading branch information
lanlou1554 committed Apr 1, 2024
1 parent ee7aed2 commit b78f7df
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 26 deletions.
15 changes: 8 additions & 7 deletions storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,18 @@ impl<R: DataStoreReplacer> DataStoreCache for MemDiskStoreCache<R> {
// If bytes_to_disk has nothing, it means the data has been successfully written to memory.
// We have to put it into mem_replacer too.
let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await;
let (status, evicted_keys) = mem_replacer.put(
let replacer_put_status = mem_replacer.put(
remote_location.clone(),
(remote_location.clone(), bytes_mem_written),
);
if !status {
// Insertion fails.
if replacer_put_status.is_none() {
// Put the data to disk cache.
bytes_to_disk = Some(mem_store.clean_data(&remote_location).unwrap().0);
} else {
// If successfully putting it into mem_replacer, we should record the evicted data,
// delete them from mem_store, and put all of them to disk cache.
if let Some(evicted_keys) = evicted_keys {
if let Some(evicted_keys) = replacer_put_status {
evicted_bytes_to_disk = Some(
evicted_keys
.iter()
Expand All @@ -167,9 +168,9 @@ impl<R: DataStoreReplacer> DataStoreCache for MemDiskStoreCache<R> {
.write_data(disk_store_key.clone(), Some(bytes_vec), None)
.await?;
let mut disk_replacer = self.disk_replacer.write().await;
if !disk_replacer
if disk_replacer
.put(remote_location_evicted, (disk_store_key.clone(), data_size))
.0
.is_none()
{
self.disk_store.clean_data(&disk_store_key).await?;
}
Expand All @@ -191,9 +192,9 @@ impl<R: DataStoreReplacer> DataStoreCache for MemDiskStoreCache<R> {
.write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream))
.await?;
let mut disk_replacer = self.disk_replacer.write().await;
if !disk_replacer
if disk_replacer
.put(remote_location, (disk_store_key.clone(), data_size))
.0
.is_none()
{
self.disk_store.clean_data(&disk_store_key).await?;
// TODO: do we need to notify the caller this failure?
Expand Down
12 changes: 4 additions & 8 deletions storage-node/src/cache/policy/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruReplacer<K, V> {
}
}

fn put_value(&mut self, key: K, value: V) -> (bool, Option<Vec<K>>) {
fn put_value(&mut self, key: K, value: V) -> Option<Vec<K>> {
if value.size() > self.max_capacity {
// If the object size is greater than the max capacity, we do not insert the
// object into the cache.
Expand All @@ -46,7 +46,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruReplacer<K, V> {
value.size(),
self.max_capacity
);
return (false, None);
return None;
}
if let Some(cache_value) = self.cache_map.get(&key) {
// If the key already exists, update the cache size.
Expand All @@ -62,11 +62,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruReplacer<K, V> {
self.size -= cache_value.size();
}
}
if !evicted_keys.is_empty() {
(true, Some(evicted_keys))
} else {
(true, None)
}
Some(evicted_keys)
}

fn peek_value(&self, key: &K) -> Option<&V> {
Expand All @@ -83,7 +79,7 @@ impl DataStoreReplacer for LruReplacer<ParpulseDataStoreCacheKey, ParpulseDataSt
&mut self,
key: ParpulseDataStoreCacheKey,
value: ParpulseDataStoreCacheValue,
) -> (bool, Option<Vec<ParpulseDataStoreCacheKey>>) {
) -> Option<Vec<ParpulseDataStoreCacheKey>> {
self.put_value(key, value)
}

Expand Down
14 changes: 5 additions & 9 deletions storage-node/src/cache/policy/lru_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruKReplacer<K, V> {
None
}

fn put_value(&mut self, key: K, value: V) -> (bool, Option<Vec<K>>) {
fn put_value(&mut self, key: K, value: V) -> Option<Vec<K>> {
if value.size() > self.max_capacity {
// If the object size is greater than the max capacity, we do not insert the
// object into the cache.
Expand All @@ -116,7 +116,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruKReplacer<K, V> {
value.size(),
self.max_capacity
);
return (false, None);
return None;
}
let updated_size = value.size();
let mut new_history: VecDeque<Timestamp> = VecDeque::new();
Expand Down Expand Up @@ -148,11 +148,7 @@ impl<K: DataStoreCacheKey, V: DataStoreCacheValue> LruKReplacer<K, V> {
evicted_keys.push(evicted_key);
}
}
if !evicted_keys.is_empty() {
(true, Some(evicted_keys))
} else {
(true, None)
}
Some(evicted_keys)
}

fn peek_value(&self, key: &K) -> Option<&V> {
Expand All @@ -179,7 +175,7 @@ impl DataStoreReplacer for LruKReplacer<ParpulseDataStoreCacheKey, ParpulseDataS
&mut self,
key: ParpulseDataStoreCacheKey,
value: ParpulseDataStoreCacheValue,
) -> (bool, Option<Vec<ParpulseDataStoreCacheKey>>) {
) -> Option<Vec<ParpulseDataStoreCacheKey>> {
self.put_value(key, value)
}

Expand Down Expand Up @@ -236,7 +232,7 @@ mod tests {
let key = "key1".to_string();
let value = "value1".to_string();
assert_eq!(cache.peek(&key), None);
assert!(cache.put(key.clone(), (value.clone(), 1)).0);
assert!(cache.put(key.clone(), (value.clone(), 1)).is_some());
assert_eq!(cache.peek(&key), Some(&(value.clone(), 1)));
assert_eq!(cache.len(), 1);
assert_eq!(cache.size(), 1);
Expand Down
5 changes: 3 additions & 2 deletions storage-node/src/cache/policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ pub trait DataStoreReplacer: Send + Sync {
fn get(&mut self, key: &ParpulseDataStoreCacheKey) -> Option<&ParpulseDataStoreCacheValue>;

/// Puts a value into the cache.
/// Returns success or failure and the evicted key if applicable.
/// Returns `None`: insertion failed.
/// Returns `Some`: insertion successful with a list of keys that are evicted from the cache.
fn put(
&mut self,
key: ParpulseDataStoreCacheKey,
value: ParpulseDataStoreCacheValue,
) -> (bool, Option<Vec<ParpulseDataStoreCacheKey>>);
) -> Option<Vec<ParpulseDataStoreCacheKey>>;

/// Returns a reference to the value in the cache with no side effect on the
/// cache.
Expand Down

0 comments on commit b78f7df

Please sign in to comment.