From b78f7df71e0e37b6918c3a4f31f5bfab5ce4448f Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 1 Apr 2024 12:49:39 -0400 Subject: [PATCH] Remove boolean from put in replacer --- .../data_store_cache/memdisk/cache_manager.rs | 15 ++++++++------- storage-node/src/cache/policy/lru.rs | 12 ++++-------- storage-node/src/cache/policy/lru_k.rs | 14 +++++--------- storage-node/src/cache/policy/mod.rs | 5 +++-- 4 files changed, 20 insertions(+), 26 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs b/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs index 4b52900..1ae96c0 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs @@ -132,17 +132,18 @@ impl DataStoreCache for MemDiskStoreCache { // If bytes_to_disk has nothing, it means the data has been successfully written to memory. // We have to put it into mem_replacer too. let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await; - let (status, evicted_keys) = mem_replacer.put( + let replacer_put_status = mem_replacer.put( remote_location.clone(), (remote_location.clone(), bytes_mem_written), ); - if !status { + // Insertion fails. + if replacer_put_status.is_none() { // Put the data to disk cache. bytes_to_disk = Some(mem_store.clean_data(&remote_location).unwrap().0); } else { // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. - if let Some(evicted_keys) = evicted_keys { + if let Some(evicted_keys) = replacer_put_status { evicted_bytes_to_disk = Some( evicted_keys .iter() @@ -167,9 +168,9 @@ impl DataStoreCache for MemDiskStoreCache { .write_data(disk_store_key.clone(), Some(bytes_vec), None) .await?; let mut disk_replacer = self.disk_replacer.write().await; - if !disk_replacer + if disk_replacer .put(remote_location_evicted, (disk_store_key.clone(), data_size)) - .0 + .is_none() { self.disk_store.clean_data(&disk_store_key).await?; } @@ -191,9 +192,9 @@ impl DataStoreCache for MemDiskStoreCache { .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream)) .await?; let mut disk_replacer = self.disk_replacer.write().await; - if !disk_replacer + if disk_replacer .put(remote_location, (disk_store_key.clone(), data_size)) - .0 + .is_none() { self.disk_store.clean_data(&disk_store_key).await?; // TODO: do we need to notify the caller this failure? diff --git a/storage-node/src/cache/policy/lru.rs b/storage-node/src/cache/policy/lru.rs index 33d7d9d..37a5888 100644 --- a/storage-node/src/cache/policy/lru.rs +++ b/storage-node/src/cache/policy/lru.rs @@ -33,7 +33,7 @@ impl LruReplacer { } } - fn put_value(&mut self, key: K, value: V) -> (bool, Option>) { + fn put_value(&mut self, key: K, value: V) -> Option> { if value.size() > self.max_capacity { // If the object size is greater than the max capacity, we do not insert the // object into the cache. @@ -46,7 +46,7 @@ impl LruReplacer { value.size(), self.max_capacity ); - return (false, None); + return None; } if let Some(cache_value) = self.cache_map.get(&key) { // If the key already exists, update the cache size. @@ -62,11 +62,7 @@ impl LruReplacer { self.size -= cache_value.size(); } } - if !evicted_keys.is_empty() { - (true, Some(evicted_keys)) - } else { - (true, None) - } + Some(evicted_keys) } fn peek_value(&self, key: &K) -> Option<&V> { @@ -83,7 +79,7 @@ impl DataStoreReplacer for LruReplacer (bool, Option>) { + ) -> Option> { self.put_value(key, value) } diff --git a/storage-node/src/cache/policy/lru_k.rs b/storage-node/src/cache/policy/lru_k.rs index 7f74fbd..8c1c47e 100644 --- a/storage-node/src/cache/policy/lru_k.rs +++ b/storage-node/src/cache/policy/lru_k.rs @@ -104,7 +104,7 @@ impl LruKReplacer { None } - fn put_value(&mut self, key: K, value: V) -> (bool, Option>) { + fn put_value(&mut self, key: K, value: V) -> Option> { if value.size() > self.max_capacity { // If the object size is greater than the max capacity, we do not insert the // object into the cache. @@ -116,7 +116,7 @@ impl LruKReplacer { value.size(), self.max_capacity ); - return (false, None); + return None; } let updated_size = value.size(); let mut new_history: VecDeque = VecDeque::new(); @@ -148,11 +148,7 @@ impl LruKReplacer { evicted_keys.push(evicted_key); } } - if !evicted_keys.is_empty() { - (true, Some(evicted_keys)) - } else { - (true, None) - } + Some(evicted_keys) } fn peek_value(&self, key: &K) -> Option<&V> { @@ -179,7 +175,7 @@ impl DataStoreReplacer for LruKReplacer (bool, Option>) { + ) -> Option> { self.put_value(key, value) } @@ -236,7 +232,7 @@ mod tests { let key = "key1".to_string(); let value = "value1".to_string(); assert_eq!(cache.peek(&key), None); - assert!(cache.put(key.clone(), (value.clone(), 1)).0); + assert!(cache.put(key.clone(), (value.clone(), 1)).is_some()); assert_eq!(cache.peek(&key), Some(&(value.clone(), 1))); assert_eq!(cache.len(), 1); assert_eq!(cache.size(), 1); diff --git a/storage-node/src/cache/policy/mod.rs b/storage-node/src/cache/policy/mod.rs index dc28e60..1080d64 100644 --- a/storage-node/src/cache/policy/mod.rs +++ b/storage-node/src/cache/policy/mod.rs @@ -56,12 +56,13 @@ pub trait DataStoreReplacer: Send + Sync { fn get(&mut self, key: &ParpulseDataStoreCacheKey) -> Option<&ParpulseDataStoreCacheValue>; /// Puts a value into the cache. - /// Returns success or failure and the evicted key if applicable. + /// Returns `None`: insertion failed. + /// Returns `Some`: insertion successful with a list of keys that are evicted from the cache. fn put( &mut self, key: ParpulseDataStoreCacheKey, value: ParpulseDataStoreCacheValue, - ) -> (bool, Option>); + ) -> Option>; /// Returns a reference to the value in the cache with no side effect on the /// cache.