From b6441d9fe0112ceb6a2c1fc44a93e5972d2914ee Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 17:41:10 -0400 Subject: [PATCH 01/23] Finish initial version --- storage-node/Cargo.toml | 1 + .../memdisk/data_store/disk.rs | 1 + .../memdisk/data_store/memory.rs | 3 +- .../src/cache/data_store_cache/memdisk/mod.rs | 222 ++++++++++++++++-- .../src/cache/data_store_cache/mod.rs | 4 +- storage-node/src/server.rs | 8 +- storage-node/src/storage_manager.rs | 2 +- 7 files changed, 205 insertions(+), 36 deletions(-) diff --git a/storage-node/Cargo.toml b/storage-node/Cargo.toml index 2a33f01..f94c2fd 100644 --- a/storage-node/Cargo.toml +++ b/storage-node/Cargo.toml @@ -24,3 +24,4 @@ rand = "0.8" tokio-stream = "0.1" log = "0.4" env_logger = "0.11" +dashmap = "5.5.3" diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index 2688377..7beefbf 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -77,6 +77,7 @@ impl DiskStore { bytes_vec: Option>, stream: Option, ) -> ParpulseResult { + println!("write_data: key={}", key); // NOTE(Yuanxin): Shall we spawn a task to write the data to disk? let bytes_written = self .disk_manager diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs index 21f492c..056b9d9 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs @@ -1,6 +1,7 @@ -use std::collections::HashMap; +use std::{collections::HashMap, hash::Hash}; use bytes::Bytes; +use dashmap::DashMap; use tokio::sync::mpsc::Receiver; use crate::error::ParpulseResult; diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 6810912..108398e 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -1,8 +1,11 @@ pub mod data_store; +use futures::{future::TryFutureExt, join}; +use std::{collections::HashMap, sync::Arc}; + use futures::stream::StreamExt; use log::warn; -use tokio::sync::RwLock; +use tokio::sync::{Notify, RwLock}; use crate::{ cache::{ @@ -58,14 +61,24 @@ impl ReplacerValue for MemDiskStoreReplacerValue { } } +#[derive(Clone)] +enum Status { + Incompleted, + Completed, +} + pub struct MemDiskStoreCache< R: DataStoreReplacer, > { disk_store: DiskStore, - mem_store: Option, + mem_store: Option>, /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size) disk_replacer: RwLock, mem_replacer: Option>, + // MemDiskStoreReplacerKey -> remote_location + // status: 0 -> incompleted; 1 -> completed; 2 -> failed + // TODO(lanlou): we should clean this hashmap. 
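A hedged reconstruction of what each status_of_keys entry carries in this first version, inferred from how the map is used later in this commit (the alias names below are illustrative, not part of the patch): the write status, the byte count recorded so far, and the Notify that concurrent requests for the same key wait on.

    // Sketch only: per-key bookkeeping as used by put_data_to_cache below.
    // A key stays Incompleted while the first request streams the object in;
    // it is flipped to Completed with its final size, or removed on failure so
    // that one of the waiters can retry the download.
    type StatusEntry = ((Status, usize), std::sync::Arc<tokio::sync::Notify>);
    type StatusOfKeys =
        std::collections::HashMap<MemDiskStoreReplacerKey, StatusEntry>;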
+ status_of_keys: RwLock)>>, } /// This method creates a `MemDiskStoreCache` instance, and memory can be disabled by setting @@ -98,9 +111,10 @@ impl> let mem_store = MemStore::new(mem_max_file_size); MemDiskStoreCache { disk_store, - mem_store: Some(mem_store), + mem_store: Some(RwLock::new(mem_store)), disk_replacer: RwLock::new(disk_replacer), mem_replacer: Some(RwLock::new(mem_replacer.unwrap())), + status_of_keys: RwLock::new(HashMap::new()), } } else { MemDiskStoreCache { @@ -108,6 +122,7 @@ impl> mem_store: None, disk_replacer: RwLock::new(disk_replacer), mem_replacer: None, + status_of_keys: RwLock::new(HashMap::new()), } } } @@ -118,7 +133,7 @@ impl> D for MemDiskStoreCache { async fn get_data_from_cache( - &mut self, + &self, remote_location: String, ) -> ParpulseResult>>> { // TODO: Refine the lock. @@ -126,7 +141,7 @@ impl> D let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await; if let Some(replacer_value) = mem_replacer.get(&remote_location) { let data_store_key = replacer_value.as_value(); - match mem_store.read_data(data_store_key) { + match mem_store.read().await.read_data(data_store_key) { Ok(Some(rx)) => return Ok(Some(rx)), Ok(None) => { return Err(ParpulseError::Internal( @@ -153,11 +168,51 @@ impl> D } async fn put_data_to_cache( - &mut self, + &self, remote_location: String, _data_size: Option, mut data_stream: StorageReaderStream, ) -> ParpulseResult { + { + let mut existed = false; + + // If in_progress of remote_location thread fails, it will clean the data from this hash map. + + loop { + let mut status = Status::Incompleted; + let mut size: usize = 0; + let mut notify = Arc::new(Notify::new()); + if let Some(((status_ref, size_ref), notify_ref)) = + self.status_of_keys.read().await.get(&remote_location) + { + existed = true; + status = status_ref.clone(); + size = *size_ref; + notify = notify_ref.clone(); + } else { + break; + } + match status { + Status::Incompleted => { + notify.notified().await; + } + Status::Completed => { + return Ok(size); + } + } + } + if existed { + // Another in progress of remote_location thread fails + return Err(ParpulseError::Internal( + "Put_data_to_cache fails".to_string(), + )); + } + self.status_of_keys.write().await.insert( + remote_location.clone(), + ((Status::Incompleted, 0), Arc::new(Notify::new())), + ); + } + // TODO: Refine the lock. // TODO(lanlou): Also write the data to network. let mut bytes_to_disk = None; @@ -166,14 +221,16 @@ impl> D None; // 1. If the mem_store is enabled, first try to write the data to memory. // Note: Only file which size < mem_max_file_size can be written to memory. - if let Some(mem_store) = &mut self.mem_store { + if let Some(mem_store) = &self.mem_store { loop { match data_stream.next().await { Some(Ok(bytes)) => { bytes_mem_written += bytes.len(); // TODO: Need to write the data to network. - if let Some((bytes_vec, _)) = - mem_store.write_data(remote_location.clone(), bytes) + if let Some((bytes_vec, _)) = mem_store + .write() + .await + .write_data(remote_location.clone(), bytes) { // If write_data returns something, it means the file size is too large // to fit in the memory. We should put it to disk cache. @@ -181,7 +238,12 @@ impl> D break; } } - Some(Err(e)) => return Err(e), + Some(Err(e)) => { + // TODO(lanlou): Every time it returns an error, I need to manually add this clean code... + // How to improve? + self.status_of_keys.write().await.remove(&remote_location); + return Err(e); + } None => break, } } @@ -197,20 +259,26 @@ impl> D // Insertion fails. 
if replacer_put_status.is_none() { // Put the data to disk cache. - bytes_to_disk = Some(mem_store.clean_data(&remote_location).unwrap().0); + bytes_to_disk = Some( + mem_store + .write() + .await + .clean_data(&remote_location) + .unwrap() + .0, + ); } else { // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. if let Some(evicted_keys) = replacer_put_status { - evicted_bytes_to_disk = Some( - evicted_keys - .iter() - .map(|key| { - let (bytes_vec, data_size) = mem_store.clean_data(key).unwrap(); - (key.clone(), (bytes_vec, data_size)) - }) - .collect(), - ); + let mut evicted_bytes_to_disk_inner = Vec::new(); + for evicted_key in evicted_keys { + evicted_bytes_to_disk_inner.push(( + evicted_key.clone(), + mem_store.write().await.clean_data(&evicted_key).unwrap(), + )); + } + evicted_bytes_to_disk = Some(evicted_bytes_to_disk_inner); } } } @@ -233,13 +301,27 @@ impl> D ) .is_none() { - self.disk_store.clean_data(&disk_store_key).await?; + if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { + warn!( + "Failed to clean data ({}) from disk store: {}", + disk_store_key, e + ); + } + warn!( + "Failed to put evicted data ({}) to disk replacer.", + disk_store_key + ); } } } // 3. If the data is successfully written to memory, directly return. if self.mem_store.is_some() && bytes_to_disk.is_none() { + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); + *status = Status::Completed; + *size = bytes_mem_written; + notify.notify_waiters(); return Ok(bytes_mem_written); } @@ -248,21 +330,41 @@ impl> D let disk_store_key = self.disk_store.data_store_key(&remote_location); // TODO: Write the data store cache first w/ `incompleted` state and update the state // after finishing writing into the data store. - let data_size = self + let data_size_wrap = self .disk_store .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream)) - .await?; + .await; + if let Err(e) = data_size_wrap { + self.status_of_keys.write().await.remove(&remote_location); + return Err(e); + } + let data_size = data_size_wrap.unwrap(); let mut disk_replacer = self.disk_replacer.write().await; if disk_replacer .put( - remote_location, + remote_location.clone(), MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), ) .is_none() { - self.disk_store.clean_data(&disk_store_key).await?; - // TODO: do we need to notify the caller this failure? + if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { + // TODO: do we need to notify the caller this failure? 
+ warn!( + "Failed to clean data ({}) from disk store: {}", + disk_store_key, e + ); + } + self.status_of_keys.write().await.remove(&remote_location); + return Err(ParpulseError::Internal( + "Failed to put data to disk replacer.".to_string(), + )); } + + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); + *status = Status::Completed; + *size = bytes_mem_written; + notify.notify_waiters(); Ok(data_size) } } @@ -428,4 +530,72 @@ mod tests { } assert_eq!(written_data_size, 113629); } + + #[tokio::test] + async fn test_same_request_simultaneously_mem() { + let tmp = tempfile::tempdir().unwrap(); + let disk_base_path = tmp.path().to_owned(); + let cache = MemDiskStoreCache::new( + LruReplacer::new(1024 * 512), + disk_base_path.to_str().unwrap().to_string(), + Some(LruReplacer::new(120000)), + Some(120000), + ); + let bucket = "tests-parquet".to_string(); + let keys = vec!["userdata1.parquet".to_string()]; + let remote_location = bucket.clone() + &keys[0]; + let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; + let reader2 = MockS3Reader::new(bucket.clone(), keys).await; + + let put_data_fut_1 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader.into_stream().await.unwrap(), + ); + let put_data_fut_2 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader2.into_stream().await.unwrap(), + ); + + let res = join!(put_data_fut_1, put_data_fut_2); + assert!(res.0.is_ok()); + assert!(res.1.is_ok()); + assert!(res.0.unwrap() == 113629); + assert!(res.1.unwrap() == 113629); + } + + #[tokio::test] + async fn test_same_request_simultaneously_disk() { + let tmp = tempfile::tempdir().unwrap(); + let disk_base_path = tmp.path().to_owned(); + let cache = MemDiskStoreCache::new( + LruReplacer::new(1024 * 512), + disk_base_path.to_str().unwrap().to_string(), + Some(LruReplacer::new(20)), + Some(20), + ); + let bucket = "tests-parquet".to_string(); + let keys = vec!["userdata2.parquet".to_string()]; + let remote_location = bucket.clone() + &keys[0]; + let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; + let reader2 = MockS3Reader::new(bucket.clone(), keys).await; + + let put_data_fut_1 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader.into_stream().await.unwrap(), + ); + let put_data_fut_2 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader2.into_stream().await.unwrap(), + ); + + let res = join!(put_data_fut_1, put_data_fut_2); + assert!(res.0.is_ok()); + assert!(res.1.is_ok()); + assert!(res.0.unwrap() == 112193); + assert!(res.1.unwrap() == 112193); + } } diff --git a/storage-node/src/cache/data_store_cache/mod.rs b/storage-node/src/cache/data_store_cache/mod.rs index 7c83252..8522243 100644 --- a/storage-node/src/cache/data_store_cache/mod.rs +++ b/storage-node/src/cache/data_store_cache/mod.rs @@ -9,7 +9,7 @@ pub mod memdisk; #[async_trait] pub trait DataStoreCache { async fn get_data_from_cache( - &mut self, + &self, remote_location: String, ) -> ParpulseResult>>>; @@ -18,7 +18,7 @@ pub trait DataStoreCache { /// If the data_size is not provided, the cache implementation should try to determine the size of /// the data. 
async fn put_data_to_cache( - &mut self, + &self, remote_location: String, data_size: Option, data_stream: StorageReaderStream, diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index d01b1ab..3c9dd57 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -21,7 +21,7 @@ pub async fn storage_node_serve() -> ParpulseResult<()> { let data_store_cache = MemDiskStoreCache::new(cache, CACHE_BASE_PATH.to_string(), None, None); let is_mem_disk_cache = true; // TODO: try to use more fine-grained lock instead of locking the whole storage_manager - let storage_manager = Arc::new(Mutex::new(StorageManager::new(data_store_cache))); + let storage_manager = Arc::new(StorageManager::new(data_store_cache)); let route = warp::path!("file") .and(warp::path::end()) @@ -47,11 +47,7 @@ pub async fn storage_node_serve() -> ParpulseResult<()> { } else { RequestParams::S3((bucket, vec![keys])) }; - let result = storage_manager - .lock() - .await - .get_data(request, is_mem_disk_cache) - .await; + let result = storage_manager.get_data(request, is_mem_disk_cache).await; let data_rx = result.unwrap(); let stream = ReceiverStream::new(data_rx); diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 4888963..fbd2262 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -25,7 +25,7 @@ impl StorageManager { } pub async fn get_data( - &mut self, + &self, request: RequestParams, is_mem_disk_cache: bool, ) -> ParpulseResult>> { From 4e8df7efb5dbd30500bcef700df00f76585c891a Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 18:52:49 -0400 Subject: [PATCH 02/23] fix clippy --- storage-node/Cargo.toml | 1 - .../memdisk/data_store/memory.rs | 3 +-- .../src/cache/data_store_cache/memdisk/mod.rs | 26 ++++++++++--------- storage-node/src/server.rs | 1 - storage-node/src/storage_manager.rs | 6 ++--- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/storage-node/Cargo.toml b/storage-node/Cargo.toml index f94c2fd..2a33f01 100644 --- a/storage-node/Cargo.toml +++ b/storage-node/Cargo.toml @@ -24,4 +24,3 @@ rand = "0.8" tokio-stream = "0.1" log = "0.4" env_logger = "0.11" -dashmap = "5.5.3" diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs index 056b9d9..21f492c 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs @@ -1,7 +1,6 @@ -use std::{collections::HashMap, hash::Hash}; +use std::collections::HashMap; use bytes::Bytes; -use dashmap::DashMap; use tokio::sync::mpsc::Receiver; use crate::error::ParpulseResult; diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 108398e..9ff5916 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -1,6 +1,5 @@ pub mod data_store; -use futures::{future::TryFutureExt, join}; use std::{collections::HashMap, sync::Arc}; use futures::stream::StreamExt; @@ -31,6 +30,8 @@ pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512; /// [`MemDiskStoreReplacerKey`] is a path to the remote object store. pub type MemDiskStoreReplacerKey = String; +type StatusKeyHashMap = HashMap)>; + pub struct MemDiskStoreReplacerValue { /// The path to the data store. For mem store, it should be data's s3 path. 
For disk /// store, it should be cached data's disk path. @@ -78,7 +79,7 @@ pub struct MemDiskStoreCache< // MemDiskStoreReplacerKey -> remote_location // status: 0 -> incompleted; 1 -> completed; 2 -> failed // TODO(lanlou): we should clean this hashmap. - status_of_keys: RwLock)>>, + status_of_keys: RwLock, } /// This method creates a `MemDiskStoreCache` instance, and memory can be disabled by setting @@ -179,9 +180,9 @@ impl> D // If in_progress of remote_location thread fails, it will clean the data from this hash map. loop { - let mut status = Status::Incompleted; - let mut size: usize = 0; - let mut notify = Arc::new(Notify::new()); + let status; + let size; + let notify; if let Some(((status_ref, size_ref), notify_ref)) = self.status_of_keys.read().await.get(&remote_location) { @@ -375,6 +376,7 @@ mod tests { cache::replacer::lru::LruReplacer, storage_reader::{s3_diskmock::MockS3Reader, AsyncStorageReader}, }; + use futures::join; use super::*; #[tokio::test] @@ -388,7 +390,7 @@ mod tests { // the future? There is no chance for data1 to be put back to memory again currently. let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); - let mut cache = MemDiskStoreCache::new( + let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(950)), @@ -434,7 +436,7 @@ mod tests { async fn test_put_get_disk_only() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); - let mut cache = MemDiskStoreCache::new( + let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), None, @@ -488,7 +490,7 @@ mod tests { // 4. get the large file let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); - let mut cache = MemDiskStoreCache::new( + let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(950)), @@ -561,8 +563,8 @@ mod tests { let res = join!(put_data_fut_1, put_data_fut_2); assert!(res.0.is_ok()); assert!(res.1.is_ok()); - assert!(res.0.unwrap() == 113629); - assert!(res.1.unwrap() == 113629); + assert_eq!(res.0.unwrap(), 113629); + assert_eq!(res.1.unwrap(), 113629); } #[tokio::test] @@ -595,7 +597,7 @@ mod tests { let res = join!(put_data_fut_1, put_data_fut_2); assert!(res.0.is_ok()); assert!(res.1.is_ok()); - assert!(res.0.unwrap() == 112193); - assert!(res.1.unwrap() == 112193); + assert_eq!(res.0.unwrap(), 112193); + assert_eq!(res.1.unwrap(), 112193); } } diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index 3c9dd57..2fe0c3e 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -1,4 +1,3 @@ -use futures::lock::Mutex; use log::{info, warn}; use std::sync::Arc; use storage_common::{RequestParams, S3Request}; diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index fbd2262..72d51b5 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -120,7 +120,7 @@ mod tests { let data_store_cache = MemDiskStoreCache::new(cache, cache_base_path.display().to_string(), None, None); - let mut storage_manager = StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(data_store_cache); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; @@ -176,7 +176,7 @@ mod tests { Some(mem_cache), Some(950), ); - let mut storage_manager = 
StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(data_store_cache); let request_path_small_bucket = "tests-text".to_string(); let request_path_small_keys = vec!["what-can-i-hold-you-with".to_string()]; @@ -235,7 +235,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let mut storage_manager = StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(data_store_cache); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; From 1ea4f53a30289ba6340f70caf61eae415cb03ae3 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 19:00:23 -0400 Subject: [PATCH 03/23] improve details --- .../memdisk/data_store/disk.rs | 1 - .../src/cache/data_store_cache/memdisk/mod.rs | 32 +++++++++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index 7beefbf..2688377 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -77,7 +77,6 @@ impl DiskStore { bytes_vec: Option>, stream: Option, ) -> ParpulseResult { - println!("write_data: key={}", key); // NOTE(Yuanxin): Shall we spawn a task to write the data to disk? let bytes_written = self .disk_manager diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 9ff5916..b798047 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -175,14 +175,10 @@ impl> D mut data_stream: StorageReaderStream, ) -> ParpulseResult { { - let mut existed = false; - // If in_progress of remote_location thread fails, it will clean the data from this hash map. 
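The loop that follows re-checks status_of_keys and parks on a tokio Notify while another request is still downloading the same key. For readers unfamiliar with Notify, here is a minimal, self-contained sketch of that wait/notify handshake (names and timing are illustrative, not from the patch). Note that notify_waiters() only wakes tasks that are already parked on notified(); the loop in the patch therefore re-reads the status map each time it wakes.

    use std::sync::Arc;
    use tokio::sync::Notify;
    use tokio::time::{sleep, Duration};

    #[tokio::main]
    async fn main() {
        let notify = Arc::new(Notify::new());

        // Plays the role of the first request that is still writing the data.
        let writer = {
            let notify = notify.clone();
            tokio::spawn(async move {
                sleep(Duration::from_millis(50)).await; // simulate the download
                // The status would be flipped to Completed here, then:
                notify.notify_waiters();
            })
        };

        // Plays the role of a second request that saw Status::Incompleted.
        notify.notified().await;
        // Back in the real code we would re-read status_of_keys and return the size.
        writer.await.unwrap();
    }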
- + let mut existed = false; loop { - let status; - let size; - let notify; + let (status, size, notify); if let Some(((status_ref, size_ref), notify_ref)) = self.status_of_keys.read().await.get(&remote_location) { @@ -547,7 +543,8 @@ mod tests { let keys = vec!["userdata1.parquet".to_string()]; let remote_location = bucket.clone() + &keys[0]; let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; - let reader2 = MockS3Reader::new(bucket.clone(), keys).await; + let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await; + let reader3 = MockS3Reader::new(bucket.clone(), keys).await; let put_data_fut_1 = cache.put_data_to_cache( remote_location.clone(), @@ -559,12 +556,19 @@ mod tests { None, reader2.into_stream().await.unwrap(), ); + let put_data_fut_3 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader3.into_stream().await.unwrap(), + ); - let res = join!(put_data_fut_1, put_data_fut_2); + let res = join!(put_data_fut_1, put_data_fut_2, put_data_fut_3); assert!(res.0.is_ok()); assert!(res.1.is_ok()); + assert!(res.2.is_ok()); assert_eq!(res.0.unwrap(), 113629); assert_eq!(res.1.unwrap(), 113629); + assert_eq!(res.2.unwrap(), 113629); } #[tokio::test] @@ -581,7 +585,8 @@ mod tests { let keys = vec!["userdata2.parquet".to_string()]; let remote_location = bucket.clone() + &keys[0]; let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; - let reader2 = MockS3Reader::new(bucket.clone(), keys).await; + let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await; + let reader3 = MockS3Reader::new(bucket.clone(), keys).await; let put_data_fut_1 = cache.put_data_to_cache( remote_location.clone(), @@ -593,11 +598,18 @@ mod tests { None, reader2.into_stream().await.unwrap(), ); + let put_data_fut_3 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader3.into_stream().await.unwrap(), + ); - let res = join!(put_data_fut_1, put_data_fut_2); + let res = join!(put_data_fut_1, put_data_fut_2, put_data_fut_3); assert!(res.0.is_ok()); assert!(res.1.is_ok()); + assert!(res.2.is_ok()); assert_eq!(res.0.unwrap(), 112193); assert_eq!(res.1.unwrap(), 112193); + assert_eq!(res.2.unwrap(), 112193); } } From 0ac4a699e7b339094819849e07a4c26d89b84a38 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 19:22:07 -0400 Subject: [PATCH 04/23] add tests in storage_manager --- storage-node/src/storage_manager.rs | 65 +++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 72d51b5..4ac2447 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -90,6 +90,7 @@ pub trait ParpulseReaderIterator: Iterator> { #[cfg(test)] mod tests { + use futures::join; use std::time::Instant; use crate::cache::{data_store_cache::memdisk::MemDiskStoreCache, replacer::lru::LruReplacer}; @@ -272,4 +273,68 @@ mod tests { ); assert!(delta_time_hit_disk > delta_time_hit_mem); } + + #[tokio::test] + async fn test_same_request_simultaneously_mem() { + let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + let storage_manager = StorageManager::new(data_store_cache); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = 
vec!["userdata1.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); + let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); + let get_data_fut_3 = storage_manager.get_data(request_data1.clone(), true); + let result = join!(get_data_fut_1, get_data_fut_2, get_data_fut_3); + assert!(result.0.is_ok()); + assert_eq!(consume_receiver(result.0.unwrap()).await, 113629); + assert!(result.1.is_ok()); + assert_eq!(consume_receiver(result.1.unwrap()).await, 113629); + assert!(result.2.is_ok()); + assert_eq!(consume_receiver(result.2.unwrap()).await, 113629); + } + + #[tokio::test] + async fn test_same_request_simultaneously_disk() { + let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(10); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + let storage_manager = StorageManager::new(data_store_cache); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata2.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); + let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); + let get_data_fut_3 = storage_manager.get_data(request_data1.clone(), true); + let result = join!(get_data_fut_1, get_data_fut_2, get_data_fut_3); + assert!(result.0.is_ok()); + assert_eq!(consume_receiver(result.0.unwrap()).await, 112193); + assert!(result.1.is_ok()); + assert_eq!(consume_receiver(result.1.unwrap()).await, 112193); + assert!(result.2.is_ok()); + assert_eq!(consume_receiver(result.2.unwrap()).await, 112193); + } } From dafaf5e0733c6d8f61e7b4f1ae4d0228f6024cb1 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 19:23:24 -0400 Subject: [PATCH 05/23] refine test name --- storage-node/src/cache/data_store_cache/memdisk/mod.rs | 4 ++-- storage-node/src/storage_manager.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index b798047..286d0dc 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -530,7 +530,7 @@ mod tests { } #[tokio::test] - async fn test_same_request_simultaneously_mem() { + async fn test_same_requests_simultaneously_mem() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); let cache = MemDiskStoreCache::new( @@ -572,7 +572,7 @@ mod tests { } #[tokio::test] - async fn test_same_request_simultaneously_disk() { + async fn test_same_requests_simultaneously_disk() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); let cache = MemDiskStoreCache::new( diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 4ac2447..e9ac9f6 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -275,7 +275,7 @@ mod tests { } #[tokio::test] - async fn test_same_request_simultaneously_mem() { + async fn test_same_requests_simultaneously_mem() { let disk_cache = LruReplacer::new(1000000); let 
mem_cache = LruReplacer::new(120000); @@ -307,7 +307,7 @@ mod tests { } #[tokio::test] - async fn test_same_request_simultaneously_disk() { + async fn test_same_requests_simultaneously_disk() { let disk_cache = LruReplacer::new(1000000); let mem_cache = LruReplacer::new(10); From e2c51ab2a6a180788c65caa68b37d6758b8aa93f Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 19:31:10 -0400 Subject: [PATCH 06/23] change lock for replacer to mutex --- .../src/cache/data_store_cache/memdisk/mod.rs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 286d0dc..8c951fc 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use futures::stream::StreamExt; use log::warn; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::{Mutex, Notify, RwLock}; use crate::{ cache::{ @@ -74,8 +74,8 @@ pub struct MemDiskStoreCache< disk_store: DiskStore, mem_store: Option>, /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size) - disk_replacer: RwLock, - mem_replacer: Option>, + disk_replacer: Mutex, + mem_replacer: Option>, // MemDiskStoreReplacerKey -> remote_location // status: 0 -> incompleted; 1 -> completed; 2 -> failed // TODO(lanlou): we should clean this hashmap. @@ -113,15 +113,15 @@ impl> MemDiskStoreCache { disk_store, mem_store: Some(RwLock::new(mem_store)), - disk_replacer: RwLock::new(disk_replacer), - mem_replacer: Some(RwLock::new(mem_replacer.unwrap())), + disk_replacer: Mutex::new(disk_replacer), + mem_replacer: Some(Mutex::new(mem_replacer.unwrap())), status_of_keys: RwLock::new(HashMap::new()), } } else { MemDiskStoreCache { disk_store, mem_store: None, - disk_replacer: RwLock::new(disk_replacer), + disk_replacer: Mutex::new(disk_replacer), mem_replacer: None, status_of_keys: RwLock::new(HashMap::new()), } @@ -139,7 +139,7 @@ impl> D ) -> ParpulseResult>>> { // TODO: Refine the lock. if let Some(mem_store) = &self.mem_store { - let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await; + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; if let Some(replacer_value) = mem_replacer.get(&remote_location) { let data_store_key = replacer_value.as_value(); match mem_store.read().await.read_data(data_store_key) { @@ -154,7 +154,7 @@ impl> D } } - let mut disk_replacer = self.disk_replacer.write().await; + let mut disk_replacer = self.disk_replacer.lock().await; match disk_replacer.get(&remote_location) { Some(replacer_value) => { let data_store_key = replacer_value.as_value(); @@ -248,7 +248,7 @@ impl> D if bytes_to_disk.is_none() { // If bytes_to_disk has nothing, it means the data has been successfully written to memory. // We have to put it into mem_replacer too. 
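One plausible reason for this commit's switch from RwLock to Mutex around the replacers (the patch itself does not state it): even a cache lookup reorders the LRU recency list, so the replacer's get() takes &mut self and every caller needs exclusive access anyway; a plain Mutex expresses that more directly than always taking the RwLock write guard. A minimal sketch, assuming the trait's generic parameters and get() signature as used elsewhere in this file:

    use crate::cache::data_store_cache::memdisk::{
        MemDiskStoreReplacerKey, MemDiskStoreReplacerValue,
    };
    use crate::cache::replacer::DataStoreReplacer;

    // Sketch only: even the read path needs &mut access to the replacer.
    async fn is_cached<R>(
        replacer: &tokio::sync::Mutex<R>,
        key: &MemDiskStoreReplacerKey,
    ) -> bool
    where
        R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>,
    {
        let mut guard = replacer.lock().await; // exclusive, like RwLock::write()
        guard.get(key).is_some()               // get() bumps recency, hence &mut
    }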
- let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await; + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; let replacer_put_status = mem_replacer.put( remote_location.clone(), MemDiskStoreReplacerValue::new(remote_location.clone(), bytes_mem_written), @@ -290,7 +290,7 @@ impl> D self.disk_store .write_data(disk_store_key.clone(), Some(bytes_vec), None) .await?; - let mut disk_replacer = self.disk_replacer.write().await; + let mut disk_replacer = self.disk_replacer.lock().await; if disk_replacer .put( remote_location_evicted, @@ -336,7 +336,7 @@ impl> D return Err(e); } let data_size = data_size_wrap.unwrap(); - let mut disk_replacer = self.disk_replacer.write().await; + let mut disk_replacer = self.disk_replacer.lock().await; if disk_replacer .put( remote_location.clone(), From 628990fe40f61458cbb0e570e5174f054213b4b1 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sat, 27 Apr 2024 20:37:34 -0400 Subject: [PATCH 07/23] fix bugs --- storage-node/src/cache/data_store_cache/memdisk/mod.rs | 6 +++--- storage-node/src/storage_manager.rs | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 8c951fc..903979c 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -360,7 +360,7 @@ impl> D let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; - *size = bytes_mem_written; + *size = data_size; notify.notify_waiters(); Ok(data_size) } @@ -578,8 +578,8 @@ mod tests { let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), - Some(LruReplacer::new(20)), - Some(20), + None, + None, ); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata2.parquet".to_string()]; diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index e9ac9f6..5635e53 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -309,7 +309,6 @@ mod tests { #[tokio::test] async fn test_same_requests_simultaneously_disk() { let disk_cache = LruReplacer::new(1000000); - let mem_cache = LruReplacer::new(10); let tmp = tempfile::tempdir().unwrap(); let disk_cache_base_path = tmp.path().to_owned(); @@ -317,8 +316,8 @@ mod tests { let data_store_cache = MemDiskStoreCache::new( disk_cache, disk_cache_base_path.display().to_string(), - Some(mem_cache), - Some(120000), + None, + None, ); let storage_manager = StorageManager::new(data_store_cache); From 86e23e37ded8ad6ab426d6267a6d73898b4677a0 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 01:06:22 -0400 Subject: [PATCH 08/23] feat: add pin/unpin for replacer to pass test_storage_manager_parallel_2 --- .../src/cache/data_store_cache/memdisk/mod.rs | 125 ++++++++++++------ storage-node/src/cache/replacer/lru.rs | 87 ++++++++++-- storage-node/src/cache/replacer/lru_k.rs | 80 +++++++++-- storage-node/src/cache/replacer/mod.rs | 4 + storage-node/src/storage_manager.rs | 62 ++++++--- 5 files changed, 281 insertions(+), 77 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 903979c..84113ca 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ 
b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -30,7 +30,9 @@ pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512; /// [`MemDiskStoreReplacerKey`] is a path to the remote object store. pub type MemDiskStoreReplacerKey = String; -type StatusKeyHashMap = HashMap)>; +// Status -> completed/incompleted; usize -> file_size +// Notify -> notify; usize -> notify_waiter_count +type StatusKeyHashMap = HashMap)>)>; pub struct MemDiskStoreReplacerValue { /// The path to the data store. For mem store, it should be data's s3 path. For disk @@ -143,7 +145,15 @@ impl> D if let Some(replacer_value) = mem_replacer.get(&remote_location) { let data_store_key = replacer_value.as_value(); match mem_store.read().await.read_data(data_store_key) { - Ok(Some(rx)) => return Ok(Some(rx)), + Ok(Some(rx)) => { + if !mem_replacer.unpin(&remote_location) { + warn!( + "Failed to unpin the key ({}) in memory replacer.", + remote_location + ); + } + return Ok(Some(rx)); + } Ok(None) => { return Err(ParpulseError::Internal( "Memory replacer and memory store keys are inconsistent.".to_string(), @@ -159,6 +169,7 @@ impl> D Some(replacer_value) => { let data_store_key = replacer_value.as_value(); if let Some(data) = self.disk_store.read_data(data_store_key).await? { + disk_replacer.unpin(&remote_location); Ok(Some(data)) } else { unreachable!() @@ -191,7 +202,8 @@ impl> D } match status { Status::Incompleted => { - notify.notified().await; + *notify.1.lock().await += 1; + notify.0.notified().await; } Status::Completed => { return Ok(size); @@ -206,7 +218,10 @@ impl> D } self.status_of_keys.write().await.insert( remote_location.clone(), - ((Status::Incompleted, 0), Arc::new(Notify::new())), + ( + (Status::Incompleted, 0), + Arc::new((Notify::new(), Mutex::new(0))), + ), ); } @@ -265,6 +280,8 @@ impl> D .0, ); } else { + // TODO(lanlou): currently the pin/unpin relies on after putting, it will **always** get! + mem_replacer.pin(&remote_location); // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. if let Some(evicted_keys) = replacer_put_status { @@ -318,7 +335,15 @@ impl> D let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; *size = bytes_mem_written; - notify.notify_waiters(); + for _ in 0..*notify.1.lock().await { + self.mem_replacer + .as_ref() + .unwrap() + .lock() + .await + .pin(&remote_location); + } + notify.0.notify_waiters(); return Ok(bytes_mem_written); } @@ -336,32 +361,37 @@ impl> D return Err(e); } let data_size = data_size_wrap.unwrap(); - let mut disk_replacer = self.disk_replacer.lock().await; - if disk_replacer - .put( - remote_location.clone(), - MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), - ) - .is_none() { - if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { - // TODO: do we need to notify the caller this failure? - warn!( - "Failed to clean data ({}) from disk store: {}", - disk_store_key, e - ); + let mut disk_replacer = self.disk_replacer.lock().await; + if disk_replacer + .put( + remote_location.clone(), + MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), + ) + .is_none() + { + if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { + // TODO: do we need to notify the caller this failure? 
+ warn!( + "Failed to clean data ({}) from disk store: {}", + disk_store_key, e + ); + } + self.status_of_keys.write().await.remove(&remote_location); + return Err(ParpulseError::Internal( + "Failed to put data to disk replacer.".to_string(), + )); } - self.status_of_keys.write().await.remove(&remote_location); - return Err(ParpulseError::Internal( - "Failed to put data to disk replacer.".to_string(), - )); + disk_replacer.pin(&remote_location); } - let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; *size = data_size; - notify.notify_waiters(); + for _ in 0..*notify.1.lock().await { + self.disk_replacer.lock().await.pin(&remote_location); + } + notify.0.notify_waiters(); Ok(data_size) } } @@ -530,17 +560,17 @@ mod tests { } #[tokio::test] - async fn test_same_requests_simultaneously_mem() { + async fn test_same_requests_simultaneously_disk() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), - Some(LruReplacer::new(120000)), - Some(120000), + None, + None, ); let bucket = "tests-parquet".to_string(); - let keys = vec!["userdata1.parquet".to_string()]; + let keys = vec!["userdata2.parquet".to_string()]; let remote_location = bucket.clone() + &keys[0]; let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await; @@ -566,27 +596,30 @@ mod tests { assert!(res.0.is_ok()); assert!(res.1.is_ok()); assert!(res.2.is_ok()); - assert_eq!(res.0.unwrap(), 113629); - assert_eq!(res.1.unwrap(), 113629); - assert_eq!(res.2.unwrap(), 113629); + assert_eq!(res.0.unwrap(), 112193); + assert_eq!(res.1.unwrap(), 112193); + assert_eq!(res.2.unwrap(), 112193); } #[tokio::test] - async fn test_same_requests_simultaneously_disk() { + async fn test_same_requests_simultaneously_mix() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); let cache = MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), - None, - None, + Some(LruReplacer::new(120000)), + Some(120000), ); let bucket = "tests-parquet".to_string(); - let keys = vec!["userdata2.parquet".to_string()]; + let keys = vec!["userdata1.parquet".to_string()]; + let keys2 = vec!["userdata2.parquet".to_string()]; let remote_location = bucket.clone() + &keys[0]; + let remote_location2 = bucket.clone() + &keys2[0]; let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await; let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await; - let reader3 = MockS3Reader::new(bucket.clone(), keys).await; + let reader3 = MockS3Reader::new(bucket.clone(), keys2).await; + let reader4 = MockS3Reader::new(bucket.clone(), keys).await; let put_data_fut_1 = cache.put_data_to_cache( remote_location.clone(), @@ -599,17 +632,29 @@ mod tests { reader2.into_stream().await.unwrap(), ); let put_data_fut_3 = cache.put_data_to_cache( - remote_location.clone(), + remote_location2.clone(), None, reader3.into_stream().await.unwrap(), ); + let put_data_fut_4 = cache.put_data_to_cache( + remote_location.clone(), + None, + reader4.into_stream().await.unwrap(), + ); - let res = join!(put_data_fut_1, put_data_fut_2, put_data_fut_3); + let res = join!( + put_data_fut_1, + put_data_fut_2, + put_data_fut_3, + put_data_fut_4 + ); assert!(res.0.is_ok()); 
assert!(res.1.is_ok()); assert!(res.2.is_ok()); - assert_eq!(res.0.unwrap(), 112193); - assert_eq!(res.1.unwrap(), 112193); + assert!(res.3.is_ok()); + assert_eq!(res.0.unwrap(), 113629); + assert_eq!(res.1.unwrap(), 113629); assert_eq!(res.2.unwrap(), 112193); + assert_eq!(res.3.unwrap(), 113629); } } diff --git a/storage-node/src/cache/replacer/lru.rs b/storage-node/src/cache/replacer/lru.rs index 03cc4d4..dc6fafd 100644 --- a/storage-node/src/cache/replacer/lru.rs +++ b/storage-node/src/cache/replacer/lru.rs @@ -1,3 +1,5 @@ +use std::ops::Deref; + use hashlink::linked_hash_map; use hashlink::LinkedHashMap; use log::{debug, warn}; @@ -10,7 +12,8 @@ use super::ReplacerValue; /// objects. The replacer will start evicting if a new object comes that makes /// the replacer's size exceeds its max capacity, from the oldest to the newest. pub struct LruReplacer { - cache_map: LinkedHashMap, + // usize is pin count + cache_map: LinkedHashMap, max_capacity: usize, size: usize, } @@ -28,7 +31,7 @@ impl LruReplacer { match self.cache_map.raw_entry_mut().from_key(key) { linked_hash_map::RawEntryMut::Occupied(mut entry) => { entry.to_back(); - Some(entry.into_mut()) + Some(&entry.into_mut().0) } linked_hash_map::RawEntryMut::Vacant(_) => None, } @@ -50,23 +53,61 @@ impl LruReplacer { } if let Some(cache_value) = self.cache_map.get(&key) { // If the key already exists, update the replacer size. - self.size -= cache_value.size(); + self.size -= cache_value.0.size(); } - self.size += value.size(); - self.cache_map.insert(key.clone(), value); let mut evicted_keys = Vec::new(); - while self.size > self.max_capacity { + while (self.size + value.size()) > self.max_capacity { + // Previous code guarantees that there is at least 1 element in the cache_map, so we + // can safely unwrap here. + println!( + "-------- To Evicting Key: {:?} --------", + self.cache_map.front().unwrap().1 .1 + ); + if self.cache_map.front().unwrap().1 .1 > 0 { + // TODO(lanlou): Actually we should look next to evict, but for simplicity, just + // return None here, it is temporarily okay since we will not pin an element for + // long time now. 
+ return None; + } if let Some((key, cache_value)) = self.cache_map.pop_front() { debug!("-------- Evicting Key: {:?} --------", key); evicted_keys.push(key); - self.size -= cache_value.size(); + self.size -= cache_value.0.size(); } } + self.size += value.size(); + self.cache_map.insert(key.clone(), (value, 0)); Some(evicted_keys) } + fn pin_key(&mut self, key: &K) -> bool { + match self.cache_map.get_mut(key) { + Some((_, pin_count)) => { + *pin_count += 1; + true + } + None => false, + } + } + + fn unpin_key(&mut self, key: &K) -> bool { + match self.cache_map.get_mut(key) { + Some((_, pin_count)) => { + if *pin_count.deref() == 0 { + return false; + } + *pin_count -= 1; + true + } + None => false, + } + } + fn peek_value(&self, key: &K) -> Option<&V> { - self.cache_map.get(key) + match self.cache_map.get(key) { + Some((value, _)) => Some(value), + None => None, + } } } @@ -79,6 +120,14 @@ impl DataStoreReplacer for LruReplacer bool { + self.pin_key(key) + } + + fn unpin(&mut self, key: &K) -> bool { + self.unpin_key(key) + } + fn peek(&self, key: &K) -> Option<&V> { self.peek_value(key) } @@ -192,4 +241,26 @@ mod tests { ); assert_eq!(replacer.get(&("key2".to_string())), None); } + + #[test] + fn test_evict_pinned_key() { + let mut replacer = + LruReplacer::::new(10); + replacer.put("key1".to_string(), ("value1".to_string(), 9)); + assert!(replacer.pin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_none()); + assert_eq!(replacer.size(), 9); + assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.unpin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_none()); + assert!(replacer.unpin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_some()); + assert_eq!(replacer.size(), 2); + } } diff --git a/storage-node/src/cache/replacer/lru_k.rs b/storage-node/src/cache/replacer/lru_k.rs index ebd66c4..174a555 100644 --- a/storage-node/src/cache/replacer/lru_k.rs +++ b/storage-node/src/cache/replacer/lru_k.rs @@ -18,6 +18,7 @@ type Timestamp = i32; struct LruKNode { value: V, history: VecDeque, + pin_count: usize, } /// Represents an LRU-K replacer. 
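The next hunk makes LRU-K victim selection pin-aware: a node may only be evicted if its pin_count is zero, and among the unpinned nodes the one with the largest backward k-distance wins, ties broken by the earlier k-th timestamp (in the usual LRU-K fashion, keys with fewer than k recorded accesses count as infinitely distant). A small sketch of just that predicate (the helper name is illustrative, not the patch's code):

    // Returns true if the candidate should replace the current best victim.
    fn better_victim(
        k_dist: i32,   // backward k-distance of the candidate
        kth_ts: i32,   // candidate's k-th most recent access timestamp
        best_dist: i32,
        best_ts: i32,
        pinned: bool,
    ) -> bool {
        !pinned && (k_dist > best_dist || (k_dist == best_dist && kth_ts < best_ts))
    }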
@@ -65,8 +66,9 @@ impl LruKReplacer { } else { self.curr_timestamp - kth_timestamp }; - if (k_dist > max_k_dist) - || (k_dist == max_k_dist && kth_timestamp < &earliest_timestamp) + if ((k_dist > max_k_dist) + || (k_dist == max_k_dist && kth_timestamp < &earliest_timestamp)) + && node.pin_count == 0 { found = true; max_k_dist = k_dist; @@ -129,27 +131,49 @@ impl LruKReplacer { new_history.push_back(self.curr_timestamp); self.curr_timestamp += 1; } + let mut evicted_keys = Vec::new(); + while (self.size + updated_size) > self.max_capacity { + let key_to_evict = self.evict(&key); + if key_to_evict.is_none() { + return None; + } + if let Some(evicted_key) = key_to_evict { + evicted_keys.push(evicted_key); + } + } self.cache_map.insert( key.clone(), LruKNode { value, history: new_history, + pin_count: 0, }, ); self.size += updated_size; - let mut evicted_keys = Vec::new(); - while self.size > self.max_capacity { - let key_to_evict = self.evict(&key); - debug_assert!( - key_to_evict.is_some(), - "key {:?} should have been evicted when replacer size is greater than max capacity", - key - ); - if let Some(evicted_key) = key_to_evict { - evicted_keys.push(evicted_key); + Some(evicted_keys) + } + + fn pin_value(&mut self, key: &K) -> bool { + match self.cache_map.get_mut(key) { + Some(node) => { + node.pin_count += 1; + true } + None => false, + } + } + + fn unpin_value(&mut self, key: &K) -> bool { + match self.cache_map.get_mut(key) { + Some(node) => { + if node.pin_count == 0 { + return false; + } + node.pin_count -= 1; + true + } + None => false, } - Some(evicted_keys) } fn peek_value(&self, key: &K) -> Option<&V> { @@ -176,6 +200,14 @@ impl DataStoreReplacer for LruKReplacer< self.put_value(key, value) } + fn pin(&mut self, key: &K) -> bool { + self.pin_value(key) + } + + fn unpin(&mut self, key: &K) -> bool { + self.unpin_value(key) + } + fn peek(&self, key: &K) -> Option<&V> { self.peek_value(key) } @@ -319,4 +351,26 @@ mod tests { ); assert_eq!(replacer.get(&("key2".to_string())), None); } + + #[test] + fn test_evict_pinned_key() { + let mut replacer = + LruKReplacer::::new(10, 2); + replacer.put("key1".to_string(), ("value1".to_string(), 9)); + assert!(replacer.pin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_none()); + assert_eq!(replacer.size(), 9); + assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.unpin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_none()); + assert!(replacer.unpin(&"key1".to_string())); + assert!(replacer + .put("key2".to_string(), ("value2".to_string(), 2)) + .is_some()); + assert_eq!(replacer.size(), 2); + } } diff --git a/storage-node/src/cache/replacer/mod.rs b/storage-node/src/cache/replacer/mod.rs index c09a064..61c6636 100644 --- a/storage-node/src/cache/replacer/mod.rs +++ b/storage-node/src/cache/replacer/mod.rs @@ -37,6 +37,10 @@ pub trait DataStoreReplacer: Send + Sync { /// Returns `Some`: insertion successful with a list of keys that are evicted from the cache. fn put(&mut self, key: K, value: V) -> Option>; + fn pin(&mut self, key: &K) -> bool; + + fn unpin(&mut self, key: &K) -> bool; + /// Returns a reference to the value in the replacer with no side effect on the /// replacer. 
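A hedged summary of the pin/unpin contract these two new trait methods add, inferred from how memdisk/mod.rs uses them in this commit rather than from any doc comment in the patch: a pinned entry must not be selected for eviction; the writer pins a key right after a successful put (once for itself and once per request found waiting in status_of_keys), and the read path unpins after the data has been handed out, so the pin count drains back to zero.

    // Illustrative call sequence only (simplified; error handling omitted):
    // writer:  replacer.put(key.clone(), value);  // may evict *unpinned* entries
    //          replacer.pin(&key);                // protect it for the imminent gets
    // reader:  replacer.get(&key);                // serve a waiting request
    //          replacer.unpin(&key);              // eviction becomes possible again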
fn peek(&self, key: &K) -> Option<&V>; diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 5635e53..d1c0571 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -74,9 +74,11 @@ impl StorageManager { let data_rx = self .data_store_cache .get_data_from_cache(cache_key.clone()) - .await? - .unwrap(); - Ok(data_rx) + .await?; + if data_rx.is_none() { + panic!("Data should be in the cache now. {}", cache_key.clone()); + } + Ok(data_rx.unwrap()) } } } @@ -275,9 +277,8 @@ mod tests { } #[tokio::test] - async fn test_same_requests_simultaneously_mem() { + async fn test_storage_manager_parallel_1() { let disk_cache = LruReplacer::new(1000000); - let mem_cache = LruReplacer::new(120000); let tmp = tempfile::tempdir().unwrap(); let disk_cache_base_path = tmp.path().to_owned(); @@ -285,8 +286,8 @@ mod tests { let data_store_cache = MemDiskStoreCache::new( disk_cache, disk_cache_base_path.display().to_string(), - Some(mem_cache), - Some(120000), + None, + None, ); let storage_manager = StorageManager::new(data_store_cache); @@ -294,21 +295,34 @@ mod tests { let request_path_keys1 = vec!["userdata1.parquet".to_string()]; let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata2.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_3 = storage_manager.get_data(request_data1.clone(), true); - let result = join!(get_data_fut_1, get_data_fut_2, get_data_fut_3); + let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true); + let get_data_fut_4 = storage_manager.get_data(request_data1.clone(), true); + let result = join!( + get_data_fut_1, + get_data_fut_2, + get_data_fut_3, + get_data_fut_4 + ); assert!(result.0.is_ok()); assert_eq!(consume_receiver(result.0.unwrap()).await, 113629); assert!(result.1.is_ok()); assert_eq!(consume_receiver(result.1.unwrap()).await, 113629); assert!(result.2.is_ok()); - assert_eq!(consume_receiver(result.2.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.2.unwrap()).await, 112193); + assert!(result.3.is_ok()); + assert_eq!(consume_receiver(result.3.unwrap()).await, 113629); } #[tokio::test] - async fn test_same_requests_simultaneously_disk() { + async fn test_storage_manager_parallel_2() { let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(120000); let tmp = tempfile::tempdir().unwrap(); let disk_cache_base_path = tmp.path().to_owned(); @@ -316,8 +330,8 @@ mod tests { let data_store_cache = MemDiskStoreCache::new( disk_cache, disk_cache_base_path.display().to_string(), - None, - None, + Some(mem_cache), + Some(120000), ); let storage_manager = StorageManager::new(data_store_cache); @@ -325,15 +339,31 @@ mod tests { let request_path_keys1 = vec!["userdata2.parquet".to_string()]; let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); let get_data_fut_2 = 
storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_3 = storage_manager.get_data(request_data1.clone(), true); - let result = join!(get_data_fut_1, get_data_fut_2, get_data_fut_3); + let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true); + let get_data_fut_4 = storage_manager.get_data(request_data2.clone(), true); + let get_data_fut_5 = storage_manager.get_data(request_data1.clone(), true); + let result = join!( + get_data_fut_1, + get_data_fut_2, + get_data_fut_3, + get_data_fut_4, + get_data_fut_5 + ); assert!(result.0.is_ok()); assert_eq!(consume_receiver(result.0.unwrap()).await, 112193); assert!(result.1.is_ok()); assert_eq!(consume_receiver(result.1.unwrap()).await, 112193); assert!(result.2.is_ok()); - assert_eq!(consume_receiver(result.2.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.2.unwrap()).await, 113629); + assert!(result.3.is_ok()); + assert_eq!(consume_receiver(result.3.unwrap()).await, 113629); + assert!(result.4.is_ok()); + assert_eq!(consume_receiver(result.4.unwrap()).await, 112193); } } From 9c19d66ec08eb7c049207c97f8c561a34ad8dd20 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 01:44:33 -0400 Subject: [PATCH 09/23] fix bugs --- storage-node/src/cache/data_store_cache/memdisk/mod.rs | 8 ++++++-- storage-node/src/cache/replacer/lru.rs | 4 ---- storage-node/src/cache/replacer/lru_k.rs | 5 ++--- storage-node/src/disk/disk_manager.rs | 1 + 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 84113ca..6b2d13e 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -146,6 +146,7 @@ impl> D let data_store_key = replacer_value.as_value(); match mem_store.read().await.read_data(data_store_key) { Ok(Some(rx)) => { + // TODO(lanlou): actually we should unpin after all the data are consumed... if !mem_replacer.unpin(&remote_location) { warn!( "Failed to unpin the key ({}) in memory replacer.", @@ -388,8 +389,11 @@ impl> D let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; *size = data_size; - for _ in 0..*notify.1.lock().await { - self.disk_replacer.lock().await.pin(&remote_location); + { + let mut disk_replacer = self.disk_replacer.lock().await; + for _ in 0..*notify.1.lock().await { + disk_replacer.pin(&remote_location); + } } notify.0.notify_waiters(); Ok(data_size) diff --git a/storage-node/src/cache/replacer/lru.rs b/storage-node/src/cache/replacer/lru.rs index dc6fafd..6ef4ff0 100644 --- a/storage-node/src/cache/replacer/lru.rs +++ b/storage-node/src/cache/replacer/lru.rs @@ -59,10 +59,6 @@ impl LruReplacer { while (self.size + value.size()) > self.max_capacity { // Previous code guarantees that there is at least 1 element in the cache_map, so we // can safely unwrap here. 
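To make the pinned-front check in this eviction loop concrete, here is the sequence that test_evict_pinned_key in this patch exercises, written out with its numbers:

    // capacity = 10; key1 inserted with size 9, then pinned.
    // put(key2, size 2): 9 + 2 > 10, but the LRU front (key1) is pinned,
    //                    so put() returns None and size() stays 9.
    // after unpin(key1) brings its pin count back to 0:
    // put(key2, size 2): key1 is evicted, the put succeeds, size() becomes 2.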
- println!( - "-------- To Evicting Key: {:?} --------", - self.cache_map.front().unwrap().1 .1 - ); if self.cache_map.front().unwrap().1 .1 > 0 { // TODO(lanlou): Actually we should look next to evict, but for simplicity, just // return None here, it is temporarily okay since we will not pin an element for diff --git a/storage-node/src/cache/replacer/lru_k.rs b/storage-node/src/cache/replacer/lru_k.rs index 174a555..640f4c8 100644 --- a/storage-node/src/cache/replacer/lru_k.rs +++ b/storage-node/src/cache/replacer/lru_k.rs @@ -134,9 +134,8 @@ impl LruKReplacer { let mut evicted_keys = Vec::new(); while (self.size + updated_size) > self.max_capacity { let key_to_evict = self.evict(&key); - if key_to_evict.is_none() { - return None; - } + // If key_to_evict is none, return none + key_to_evict.as_ref()?; if let Some(evicted_key) = key_to_evict { evicted_keys.push(evicted_key); } diff --git a/storage-node/src/disk/disk_manager.rs b/storage-node/src/disk/disk_manager.rs index ce4d596..355faa0 100644 --- a/storage-node/src/disk/disk_manager.rs +++ b/storage-node/src/disk/disk_manager.rs @@ -120,6 +120,7 @@ impl DiskManager { if let Some(mut stream) = stream { let bytes_cur = stream.next().await; if bytes_cur.is_none() { + file.flush().await?; return Ok(bytes_written); } let mut bytes_cur = bytes_cur.unwrap()?; From 5331f0792742fe7cdb0287025e906b293691b497 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 10:41:32 -0400 Subject: [PATCH 10/23] improve pin function --- .../src/cache/data_store_cache/memdisk/mod.rs | 25 ++++++++----------- storage-node/src/cache/replacer/lru.rs | 12 ++++----- storage-node/src/cache/replacer/lru_k.rs | 12 ++++----- storage-node/src/cache/replacer/mod.rs | 2 +- 4 files changed, 23 insertions(+), 28 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 6b2d13e..210a8fa 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -282,7 +282,7 @@ impl> D ); } else { // TODO(lanlou): currently the pin/unpin relies on after putting, it will **always** get! - mem_replacer.pin(&remote_location); + mem_replacer.pin(&remote_location, 1); // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. 
if let Some(evicted_keys) = replacer_put_status { @@ -336,13 +336,9 @@ impl> D let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; *size = bytes_mem_written; - for _ in 0..*notify.1.lock().await { - self.mem_replacer - .as_ref() - .unwrap() - .lock() - .await - .pin(&remote_location); + { + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; + mem_replacer.pin(&remote_location, *notify.1.lock().await); } notify.0.notify_waiters(); return Ok(bytes_mem_written); @@ -383,18 +379,17 @@ impl> D "Failed to put data to disk replacer.".to_string(), )); } - disk_replacer.pin(&remote_location); + disk_replacer.pin(&remote_location, 1); } let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); *status = Status::Completed; *size = data_size; - { - let mut disk_replacer = self.disk_replacer.lock().await; - for _ in 0..*notify.1.lock().await { - disk_replacer.pin(&remote_location); - } - } + self.disk_replacer + .lock() + .await + .pin(&remote_location, *notify.1.lock().await); + // FIXME: disk_replacer lock should be released here notify.0.notify_waiters(); Ok(data_size) } diff --git a/storage-node/src/cache/replacer/lru.rs b/storage-node/src/cache/replacer/lru.rs index 6ef4ff0..d5e58c1 100644 --- a/storage-node/src/cache/replacer/lru.rs +++ b/storage-node/src/cache/replacer/lru.rs @@ -76,10 +76,10 @@ impl LruReplacer { Some(evicted_keys) } - fn pin_key(&mut self, key: &K) -> bool { + fn pin_key(&mut self, key: &K, count: usize) -> bool { match self.cache_map.get_mut(key) { Some((_, pin_count)) => { - *pin_count += 1; + *pin_count += count; true } None => false, @@ -116,8 +116,8 @@ impl DataStoreReplacer for LruReplacer bool { - self.pin_key(key) + fn pin(&mut self, key: &K, count: usize) -> bool { + self.pin_key(key, count) } fn unpin(&mut self, key: &K) -> bool { @@ -243,12 +243,12 @@ mod tests { let mut replacer = LruReplacer::::new(10); replacer.put("key1".to_string(), ("value1".to_string(), 9)); - assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.pin(&"key1".to_string(), 1)); assert!(replacer .put("key2".to_string(), ("value2".to_string(), 2)) .is_none()); assert_eq!(replacer.size(), 9); - assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.pin(&"key1".to_string(), 1)); assert!(replacer.unpin(&"key1".to_string())); assert!(replacer .put("key2".to_string(), ("value2".to_string(), 2)) diff --git a/storage-node/src/cache/replacer/lru_k.rs b/storage-node/src/cache/replacer/lru_k.rs index 640f4c8..610a966 100644 --- a/storage-node/src/cache/replacer/lru_k.rs +++ b/storage-node/src/cache/replacer/lru_k.rs @@ -152,10 +152,10 @@ impl LruKReplacer { Some(evicted_keys) } - fn pin_value(&mut self, key: &K) -> bool { + fn pin_value(&mut self, key: &K, count: usize) -> bool { match self.cache_map.get_mut(key) { Some(node) => { - node.pin_count += 1; + node.pin_count += count; true } None => false, @@ -199,8 +199,8 @@ impl DataStoreReplacer for LruKReplacer< self.put_value(key, value) } - fn pin(&mut self, key: &K) -> bool { - self.pin_value(key) + fn pin(&mut self, key: &K, count: usize) -> bool { + self.pin_value(key, count) } fn unpin(&mut self, key: &K) -> bool { @@ -356,12 +356,12 @@ mod tests { let mut replacer = LruKReplacer::::new(10, 2); replacer.put("key1".to_string(), ("value1".to_string(), 9)); - assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.pin(&"key1".to_string(), 1)); assert!(replacer 
.put("key2".to_string(), ("value2".to_string(), 2)) .is_none()); assert_eq!(replacer.size(), 9); - assert!(replacer.pin(&"key1".to_string())); + assert!(replacer.pin(&"key1".to_string(), 1)); assert!(replacer.unpin(&"key1".to_string())); assert!(replacer .put("key2".to_string(), ("value2".to_string(), 2)) diff --git a/storage-node/src/cache/replacer/mod.rs b/storage-node/src/cache/replacer/mod.rs index 61c6636..bdf9176 100644 --- a/storage-node/src/cache/replacer/mod.rs +++ b/storage-node/src/cache/replacer/mod.rs @@ -37,7 +37,7 @@ pub trait DataStoreReplacer: Send + Sync { /// Returns `Some`: insertion successful with a list of keys that are evicted from the cache. fn put(&mut self, key: K, value: V) -> Option>; - fn pin(&mut self, key: &K) -> bool; + fn pin(&mut self, key: &K, count: usize) -> bool; fn unpin(&mut self, key: &K) -> bool; From 31015f434d4e0ea69c1ab8a47505402eabbece07 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 12:19:55 -0400 Subject: [PATCH 11/23] fix: unpin after all the data is consumed & fix tests for server --- .../memdisk/data_store/disk.rs | 29 ++++++--- .../memdisk/data_store/memory.rs | 32 ++++++++-- .../src/cache/data_store_cache/memdisk/mod.rs | 62 +++++++++++-------- storage-node/src/cache/replacer/lru.rs | 4 +- storage-node/src/server.rs | 19 ++---- storage-node/src/storage_manager.rs | 37 +++++++++++ 6 files changed, 126 insertions(+), 57 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index 4006152..f1e3955 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -1,11 +1,17 @@ -use std::fs; +use std::{fs, sync::Arc}; use bytes::Bytes; use futures::StreamExt; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc::Receiver, Mutex}; use crate::{ - disk::disk_manager::DiskManager, error::ParpulseResult, storage_reader::StorageReaderStream, + cache::{ + data_store_cache::memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue}, + replacer::DataStoreReplacer, + }, + disk::disk_manager::DiskManager, + error::ParpulseResult, + storage_reader::StorageReaderStream, }; /// TODO(lanlou): make them configurable. @@ -22,7 +28,10 @@ pub struct DiskStore { impl Drop for DiskStore { fn drop(&mut self) { - fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed"); + if fs::metadata(&self.base_path).is_ok() { + println!("Removing cache files: {:?}", self.base_path); + fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed"); + } } } @@ -32,7 +41,6 @@ impl DiskStore { if !final_base_path.ends_with('/') { final_base_path += "/"; } - Self { disk_manager, base_path: final_base_path, @@ -43,10 +51,14 @@ impl DiskStore { impl DiskStore { /// Reads data from the disk store. The method returns a stream of data read from the disk /// store. - pub async fn read_data( + pub async fn read_data( &self, key: &str, - ) -> ParpulseResult>>> { + disk_replacer: Arc>, + ) -> ParpulseResult>>> + where + R: DataStoreReplacer + 'static, + { // TODO(lanlou): we later may consider the remaining space to decide the buffer size let mut buffer_size = self.disk_manager.file_size(key).await? as usize; if buffer_size > MAX_DISK_READER_BUFFER_SIZE { @@ -55,6 +67,7 @@ impl DiskStore { // FIXME: Shall we consider the situation where the data is not found? 
let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?; let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE); + let key_str = key.to_string().clone(); tokio::spawn(async move { loop { match disk_stream.next().await { @@ -67,6 +80,8 @@ impl DiskStore { None => break, } } + // TODO(lanlou): when second read, so there is no need to unpin, how to improve? + disk_replacer.lock().await.unpin(&key_str); }); Ok(Some(rx)) } diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs index 21f492c..25c292e 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/memory.rs @@ -1,9 +1,15 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc::Receiver, Mutex}; -use crate::error::ParpulseResult; +use crate::{ + cache::{ + data_store_cache::memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue}, + replacer::DataStoreReplacer, + }, + error::ParpulseResult, +}; const DEFAULT_MEM_CHANNEL_BUFFER_SIZE: usize = 1024; @@ -21,17 +27,27 @@ impl MemStore { } } - pub fn read_data(&self, key: &str) -> ParpulseResult>>> { + pub fn read_data( + &self, + key: &str, + mem_replacer: Arc>, + ) -> ParpulseResult>>> + where + R: DataStoreReplacer + 'static, + { let key_value = self.data.get(key); if key_value.is_none() { return Ok(None); } let data_vec = key_value.unwrap().0.clone(); let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_MEM_CHANNEL_BUFFER_SIZE); + let key_str = key.to_string().clone(); tokio::spawn(async move { for data in data_vec.iter() { tx.send(Ok(data.clone())).await.unwrap(); } + // TODO(lanlou): when second read, so there is no need to unpin, how to improve? 
+ mem_replacer.lock().await.unpin(&key_str); }); Ok(Some(rx)) } @@ -61,6 +77,8 @@ impl MemStore { #[cfg(test)] mod tests { + use crate::cache::replacer::lru::LruReplacer; + use super::*; #[test] @@ -90,7 +108,8 @@ mod tests { assert_eq!(res3.as_ref().unwrap().0[1], bytes2_cp); assert_eq!(res3.as_ref().unwrap().0[2], bytes3_cp); - let read_res = mem_store.read_data(key.as_str()); + let dummy_replacer = Arc::new(Mutex::new(LruReplacer::new(0))); + let read_res = mem_store.read_data(key.as_str(), dummy_replacer); assert!(read_res.is_ok()); assert!(read_res.unwrap().is_none()); } @@ -104,7 +123,8 @@ mod tests { let bytes_cp = bytes.clone(); let res = mem_store.write_data(key.clone(), bytes); assert!(res.is_none()); - let read_res = mem_store.read_data(key.as_str()); + let dummy_replacer = Arc::new(Mutex::new(LruReplacer::new(0))); + let read_res = mem_store.read_data(key.as_str(), dummy_replacer); assert!(read_res.is_ok()); let mut rx = read_res.unwrap().unwrap(); let mut bytes_vec = Vec::new(); diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 210a8fa..9fa38fb 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -71,13 +71,13 @@ enum Status { } pub struct MemDiskStoreCache< - R: DataStoreReplacer, + R: 'static + DataStoreReplacer, > { disk_store: DiskStore, mem_store: Option>, /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size) - disk_replacer: Mutex, - mem_replacer: Option>, + disk_replacer: Arc>, + mem_replacer: Option>>, // MemDiskStoreReplacerKey -> remote_location // status: 0 -> incompleted; 1 -> completed; 2 -> failed // TODO(lanlou): we should clean this hashmap. @@ -115,15 +115,15 @@ impl> MemDiskStoreCache { disk_store, mem_store: Some(RwLock::new(mem_store)), - disk_replacer: Mutex::new(disk_replacer), - mem_replacer: Some(Mutex::new(mem_replacer.unwrap())), + disk_replacer: Arc::new(Mutex::new(disk_replacer)), + mem_replacer: Some(Arc::new(Mutex::new(mem_replacer.unwrap()))), status_of_keys: RwLock::new(HashMap::new()), } } else { MemDiskStoreCache { disk_store, mem_store: None, - disk_replacer: Mutex::new(disk_replacer), + disk_replacer: Arc::new(Mutex::new(disk_replacer)), mem_replacer: None, status_of_keys: RwLock::new(HashMap::new()), } @@ -141,18 +141,20 @@ impl> D ) -> ParpulseResult>>> { // TODO: Refine the lock. if let Some(mem_store) = &self.mem_store { - let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; - if let Some(replacer_value) = mem_replacer.get(&remote_location) { - let data_store_key = replacer_value.as_value(); - match mem_store.read().await.read_data(data_store_key) { + let mut data_store_key = None; + { + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; + if let Some(replacer_value) = mem_replacer.get(&remote_location) { + data_store_key = Some(replacer_value.as_value().clone()); + } + } + if let Some(data_store_key) = data_store_key { + match mem_store + .read() + .await + .read_data(&data_store_key, self.mem_replacer.as_ref().unwrap().clone()) + { Ok(Some(rx)) => { - // TODO(lanlou): actually we should unpin after all the data are consumed... 
- if !mem_replacer.unpin(&remote_location) { - warn!( - "Failed to unpin the key ({}) in memory replacer.", - remote_location - ); - } return Ok(Some(rx)); } Ok(None) => { @@ -165,18 +167,24 @@ impl> D } } - let mut disk_replacer = self.disk_replacer.lock().await; - match disk_replacer.get(&remote_location) { - Some(replacer_value) => { - let data_store_key = replacer_value.as_value(); - if let Some(data) = self.disk_store.read_data(data_store_key).await? { - disk_replacer.unpin(&remote_location); - Ok(Some(data)) - } else { - unreachable!() + let data_store_key; + { + let mut disk_replacer = self.disk_replacer.lock().await; + match disk_replacer.get(&remote_location) { + Some(replacer_value) => { + data_store_key = replacer_value.as_value().clone(); } + None => return Ok(None), } - None => Ok(None), + } + if let Some(data) = self + .disk_store + .read_data(&data_store_key, self.disk_replacer.clone()) + .await? + { + Ok(Some(data)) + } else { + unreachable!(); } } diff --git a/storage-node/src/cache/replacer/lru.rs b/storage-node/src/cache/replacer/lru.rs index d5e58c1..ad3c8d4 100644 --- a/storage-node/src/cache/replacer/lru.rs +++ b/storage-node/src/cache/replacer/lru.rs @@ -1,5 +1,3 @@ -use std::ops::Deref; - use hashlink::linked_hash_map; use hashlink::LinkedHashMap; use log::{debug, warn}; @@ -89,7 +87,7 @@ impl LruReplacer { fn unpin_key(&mut self, key: &K) -> bool { match self.cache_map.get_mut(key) { Some((_, pin_count)) => { - if *pin_count.deref() == 0 { + if *pin_count == 0 { return false; } *pin_count -= 1; diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index ee88554..c86182f 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -102,7 +102,7 @@ mod tests { /// WARNING: Put userdata1.parquet in the storage-node/tests/parquet directory before running this test. 
     #[tokio::test]
-    async fn test_download_file() {
+    async fn test_server() {
         let original_file_path = "tests/parquet/userdata1.parquet";
         // Start the server
@@ -113,6 +113,7 @@ mod tests {
         // Give the server some time to start
         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 
+        // Test1: test_download_file
         let url =
             "http://localhost:3030/file?bucket=tests-parquet&keys=userdata1.parquet&is_test=true";
         let client = Client::new();
@@ -140,22 +141,12 @@ mod tests {
         // Check if file sizes are equal
         assert_eq!(
             fs::metadata(original_file_path).unwrap().len(),
-            fs::metadata(file_path).unwrap().len()
+            fs::metadata(file_path.clone()).unwrap().len()
         );
 
-        server_handle.abort();
-    }
-
-    #[tokio::test]
-    async fn test_file_not_exist() {
-        // Start the server
-        let server_handle = tokio::spawn(async move {
-            storage_node_serve("127.0.0.1", 3030).await.unwrap();
-        });
-
-        // Give the server some time to start
-        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+        assert_eq!(fs::metadata(file_path).unwrap().len(), 113629);
 
+        // Test2: test_file_not_exist
         let url =
             "http://localhost:3030/file?bucket=tests-parquet&keys=not_exist.parquet&is_test=true";
         let client = Client::new();
diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs
index d1c0571..95dce3f 100644
--- a/storage-node/src/storage_manager.rs
+++ b/storage-node/src/storage_manager.rs
@@ -343,6 +343,8 @@ mod tests {
         let request_path_keys2 = vec!["userdata1.parquet".to_string()];
         let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2));
 
+        let mut start_time = Instant::now();
+
         let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true);
         let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true);
         let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true);
@@ -365,5 +367,40 @@ mod tests {
         assert_eq!(consume_receiver(result.3.unwrap()).await, 113629);
         assert!(result.4.is_ok());
         assert_eq!(consume_receiver(result.4.unwrap()).await, 112193);
+
+        let delta_time_miss = Instant::now() - start_time;
+
+        start_time = Instant::now();
+
+        let get_data_fut_1 = storage_manager.get_data(request_data2.clone(), true);
+        let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true);
+        let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true);
+        let get_data_fut_4 = storage_manager.get_data(request_data1.clone(), true);
+        let get_data_fut_5 = storage_manager.get_data(request_data1.clone(), true);
+        let result = join!(
+            get_data_fut_1,
+            get_data_fut_2,
+            get_data_fut_3,
+            get_data_fut_4,
+            get_data_fut_5
+        );
+        assert!(result.0.is_ok());
+        assert_eq!(consume_receiver(result.0.unwrap()).await, 113629);
+        assert!(result.1.is_ok());
+        assert_eq!(consume_receiver(result.1.unwrap()).await, 112193);
+        assert!(result.2.is_ok());
+        assert_eq!(consume_receiver(result.2.unwrap()).await, 113629);
+        assert!(result.3.is_ok());
+        assert_eq!(consume_receiver(result.3.unwrap()).await, 112193);
+        assert!(result.4.is_ok());
+        assert_eq!(consume_receiver(result.4.unwrap()).await, 112193);
+
+        let delta_time_hit = Instant::now() - start_time;
+
+        println!(
+            "For parallel test 2, delta time miss: {:?}, delta time hit: {:?}",
+            delta_time_miss, delta_time_hit
+        );
+        assert!(delta_time_miss > delta_time_hit);
     }
 }

From d2b1044b91cd43695a154dfb74beb85a3275eccb Mon Sep 17 00:00:00 2001
From: Lan Lou
Date: Sun, 28 Apr 2024 17:08:37 -0400
Subject: [PATCH 12/23] apply comment suggestions

---
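Note on the diff below: the concurrency tests switch from joining futures on a single task to spawning real tasks against an Arc-wrapped cache, so the in-flight deduplication inside put_data_to_cache is actually exercised across threads. That deduplication can be reduced to a small single-flight helper; the sketch below is illustrative only (StatusMap, wait_or_claim and mark_completed are invented names, not part of this crate) and assumes nothing beyond tokio's RwLock and Notify.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Notify, RwLock};

// key -> (fill finished?, wakeup handle shared with every waiter)
type StatusMap = RwLock<HashMap<String, (bool, Arc<Notify>)>>;

/// Returns true if the caller should fetch and cache the data itself,
/// false if another task finished the fill while we waited.
async fn wait_or_claim(status: &StatusMap, key: &str) -> bool {
    loop {
        let guard = status.read().await;
        match guard.get(key).cloned() {
            Some((true, _)) => return false,
            Some((false, notify)) => {
                // Create the Notified future while the read lock is still held,
                // so a notify_waiters() fired right after release is not missed.
                let notified = notify.notified();
                drop(guard);
                notified.await;
            }
            None => {
                drop(guard);
                let mut guard = status.write().await;
                if !guard.contains_key(key) {
                    guard.insert(key.to_string(), (false, Arc::new(Notify::new())));
                    return true;
                }
                // Lost the insert race: loop again and wait on the winner's Notify.
            }
        }
    }
}

/// Called by the task that performed the fill once the data is in the cache.
async fn mark_completed(status: &StatusMap, key: &str) {
    let mut guard = status.write().await;
    if let Some((completed, notify)) = guard.get_mut(key) {
        *completed = true;
        notify.notify_waiters(); // wake every task parked in wait_or_claim
    }
}

The important detail, which this patch series also has to get right, is creating the Notified future before the lock protecting the status map is released; otherwise a wakeup issued in that window is lost and the waiter sleeps forever.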
.../src/cache/data_store_cache/memdisk/mod.rs | 137 +++++++++++------- storage-node/src/storage_manager.rs | 118 +++++++++++---- 2 files changed, 175 insertions(+), 80 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 9fa38fb..108e420 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -211,6 +211,7 @@ impl> D } match status { Status::Incompleted => { + // TODO(lanlou): Make these 2 code atomic!! *notify.1.lock().await += 1; notify.0.notified().await; } @@ -295,10 +296,11 @@ impl> D // delete them from mem_store, and put all of them to disk cache. if let Some(evicted_keys) = replacer_put_status { let mut evicted_bytes_to_disk_inner = Vec::new(); + let mut mem_store = mem_store.write().await; for evicted_key in evicted_keys { evicted_bytes_to_disk_inner.push(( evicted_key.clone(), - mem_store.write().await.clean_data(&evicted_key).unwrap(), + mem_store.clean_data(&evicted_key).unwrap(), )); } evicted_bytes_to_disk = Some(evicted_bytes_to_disk_inner); @@ -355,8 +357,6 @@ impl> D // 4. If the data is not written to memory_cache successfully, then cache it to disk. // Need to write the data to network for the current key. let disk_store_key = self.disk_store.data_store_key(&remote_location); - // TODO: Write the data store cache first w/ `incompleted` state and update the state - // after finishing writing into the data store. let data_size_wrap = self .disk_store .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream)) @@ -570,12 +570,12 @@ mod tests { async fn test_same_requests_simultaneously_disk() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); - let cache = MemDiskStoreCache::new( + let cache = Arc::new(MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), None, None, - ); + )); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata2.parquet".to_string()]; let remote_location = bucket.clone() + &keys[0]; @@ -583,41 +583,57 @@ mod tests { let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await; let reader3 = MockS3Reader::new(bucket.clone(), keys).await; - let put_data_fut_1 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader.into_stream().await.unwrap(), - ); - let put_data_fut_2 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader2.into_stream().await.unwrap(), - ); - let put_data_fut_3 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader3.into_stream().await.unwrap(), - ); + let cache_1 = cache.clone(); + let remote_location_1 = remote_location.clone(); + let put_data_fut_1 = tokio::spawn(async move { + cache_1 + .put_data_to_cache(remote_location_1, None, reader.into_stream().await.unwrap()) + .await + }); + + let cache_2 = cache.clone(); + let remote_location_2 = remote_location.clone(); + let put_data_fut_2 = tokio::spawn(async move { + cache_2 + .put_data_to_cache( + remote_location_2, + None, + reader2.into_stream().await.unwrap(), + ) + .await + }); + + let cache_3 = cache.clone(); + let remote_location_3 = remote_location.clone(); + let put_data_fut_3 = tokio::spawn(async move { + cache_3 + .put_data_to_cache( + remote_location_3, + None, + reader3.into_stream().await.unwrap(), + ) + .await + }); let res = join!(put_data_fut_1, put_data_fut_2, put_data_fut_3); assert!(res.0.is_ok()); assert!(res.1.is_ok()); assert!(res.2.is_ok()); - 
assert_eq!(res.0.unwrap(), 112193); - assert_eq!(res.1.unwrap(), 112193); - assert_eq!(res.2.unwrap(), 112193); + assert_eq!(res.0.unwrap().unwrap(), 112193); + assert_eq!(res.1.unwrap().unwrap(), 112193); + assert_eq!(res.2.unwrap().unwrap(), 112193); } #[tokio::test] async fn test_same_requests_simultaneously_mix() { let tmp = tempfile::tempdir().unwrap(); let disk_base_path = tmp.path().to_owned(); - let cache = MemDiskStoreCache::new( + let cache = Arc::new(MemDiskStoreCache::new( LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(120000)), Some(120000), - ); + )); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; let keys2 = vec!["userdata2.parquet".to_string()]; @@ -628,26 +644,49 @@ mod tests { let reader3 = MockS3Reader::new(bucket.clone(), keys2).await; let reader4 = MockS3Reader::new(bucket.clone(), keys).await; - let put_data_fut_1 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader.into_stream().await.unwrap(), - ); - let put_data_fut_2 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader2.into_stream().await.unwrap(), - ); - let put_data_fut_3 = cache.put_data_to_cache( - remote_location2.clone(), - None, - reader3.into_stream().await.unwrap(), - ); - let put_data_fut_4 = cache.put_data_to_cache( - remote_location.clone(), - None, - reader4.into_stream().await.unwrap(), - ); + let cache_1 = cache.clone(); + let remote_location_1 = remote_location.clone(); + let put_data_fut_1 = tokio::spawn(async move { + cache_1 + .put_data_to_cache(remote_location_1, None, reader.into_stream().await.unwrap()) + .await + }); + + let cache_2 = cache.clone(); + let remote_location_2 = remote_location.clone(); + let put_data_fut_2 = tokio::spawn(async move { + cache_2 + .put_data_to_cache( + remote_location_2, + None, + reader2.into_stream().await.unwrap(), + ) + .await + }); + + let cache_3 = cache.clone(); + let remote_location_3 = remote_location2.clone(); + let put_data_fut_3 = tokio::spawn(async move { + cache_3 + .put_data_to_cache( + remote_location_3, + None, + reader3.into_stream().await.unwrap(), + ) + .await + }); + + let cache_4 = cache.clone(); + let remote_location_4 = remote_location.clone(); + let put_data_fut_4 = tokio::spawn(async move { + cache_4 + .put_data_to_cache( + remote_location_4, + None, + reader4.into_stream().await.unwrap(), + ) + .await + }); let res = join!( put_data_fut_1, @@ -659,9 +698,9 @@ mod tests { assert!(res.1.is_ok()); assert!(res.2.is_ok()); assert!(res.3.is_ok()); - assert_eq!(res.0.unwrap(), 113629); - assert_eq!(res.1.unwrap(), 113629); - assert_eq!(res.2.unwrap(), 112193); - assert_eq!(res.3.unwrap(), 113629); + assert_eq!(res.0.unwrap().unwrap(), 113629); + assert_eq!(res.1.unwrap().unwrap(), 113629); + assert_eq!(res.2.unwrap().unwrap(), 112193); + assert_eq!(res.3.unwrap().unwrap(), 113629); } } diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 95dce3f..c8b8537 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -93,7 +93,7 @@ pub trait ParpulseReaderIterator: Iterator> { #[cfg(test)] mod tests { use futures::join; - use std::time::Instant; + use std::{sync::Arc, time::Instant}; use crate::cache::{data_store_cache::memdisk::MemDiskStoreCache, replacer::lru::LruReplacer}; @@ -289,7 +289,7 @@ mod tests { None, None, ); - let storage_manager = StorageManager::new(data_store_cache); + let storage_manager = 
Arc::new(StorageManager::new(data_store_cache)); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -299,10 +299,26 @@ mod tests { let request_path_keys2 = vec!["userdata2.parquet".to_string()]; let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); - let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true); - let get_data_fut_4 = storage_manager.get_data(request_data1.clone(), true); + let storage_manager_1 = storage_manager.clone(); + let request_data1_1 = request_data1.clone(); + let get_data_fut_1 = + tokio::spawn(async move { storage_manager_1.get_data(request_data1_1, true).await }); + + let storage_manager_2 = storage_manager.clone(); + let request_data1_2 = request_data1.clone(); + let get_data_fut_2 = + tokio::spawn(async move { storage_manager_2.get_data(request_data1_2, true).await }); + + let storage_manager_3 = storage_manager.clone(); + let request_data2_3 = request_data2.clone(); + let get_data_fut_3 = + tokio::spawn(async move { storage_manager_3.get_data(request_data2_3, true).await }); + + let storage_manager_4 = storage_manager.clone(); + let request_data1_4 = request_data1.clone(); + let get_data_fut_4 = + tokio::spawn(async move { storage_manager_4.get_data(request_data1_4, true).await }); + let result = join!( get_data_fut_1, get_data_fut_2, @@ -310,13 +326,13 @@ mod tests { get_data_fut_4 ); assert!(result.0.is_ok()); - assert_eq!(consume_receiver(result.0.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.0.unwrap().unwrap()).await, 113629); assert!(result.1.is_ok()); - assert_eq!(consume_receiver(result.1.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.1.unwrap().unwrap()).await, 113629); assert!(result.2.is_ok()); - assert_eq!(consume_receiver(result.2.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.2.unwrap().unwrap()).await, 112193); assert!(result.3.is_ok()); - assert_eq!(consume_receiver(result.3.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.3.unwrap().unwrap()).await, 113629); } #[tokio::test] @@ -333,7 +349,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = StorageManager::new(data_store_cache); + let storage_manager = Arc::new(StorageManager::new(data_store_cache)); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; @@ -345,11 +361,31 @@ mod tests { let mut start_time = Instant::now(); - let get_data_fut_1 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true); - let get_data_fut_4 = storage_manager.get_data(request_data2.clone(), true); - let get_data_fut_5 = storage_manager.get_data(request_data1.clone(), true); + let storage_manager_1 = storage_manager.clone(); + let request_data1_1 = request_data1.clone(); + let get_data_fut_1 = + tokio::spawn(async move { storage_manager_1.get_data(request_data1_1, true).await }); + + let storage_manager_2 = storage_manager.clone(); + let request_data1_2 = request_data1.clone(); + let get_data_fut_2 = + tokio::spawn(async move { storage_manager_2.get_data(request_data1_2, true).await }); + + let storage_manager_3 = storage_manager.clone(); + 
let request_data2_3 = request_data2.clone(); + let get_data_fut_3 = + tokio::spawn(async move { storage_manager_3.get_data(request_data2_3, true).await }); + + let storage_manager_4 = storage_manager.clone(); + let request_data2_4 = request_data2.clone(); + let get_data_fut_4 = + tokio::spawn(async move { storage_manager_4.get_data(request_data2_4, true).await }); + + let storage_manager_5 = storage_manager.clone(); + let request_data1_5 = request_data1.clone(); + let get_data_fut_5 = + tokio::spawn(async move { storage_manager_5.get_data(request_data1_5, true).await }); + let result = join!( get_data_fut_1, get_data_fut_2, @@ -358,25 +394,45 @@ mod tests { get_data_fut_5 ); assert!(result.0.is_ok()); - assert_eq!(consume_receiver(result.0.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.0.unwrap().unwrap()).await, 112193); assert!(result.1.is_ok()); - assert_eq!(consume_receiver(result.1.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.1.unwrap().unwrap()).await, 112193); assert!(result.2.is_ok()); - assert_eq!(consume_receiver(result.2.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.2.unwrap().unwrap()).await, 113629); assert!(result.3.is_ok()); - assert_eq!(consume_receiver(result.3.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.3.unwrap().unwrap()).await, 113629); assert!(result.4.is_ok()); - assert_eq!(consume_receiver(result.4.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.4.unwrap().unwrap()).await, 112193); let delta_time_miss = Instant::now() - start_time; start_time = Instant::now(); - let get_data_fut_1 = storage_manager.get_data(request_data2.clone(), true); - let get_data_fut_2 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_3 = storage_manager.get_data(request_data2.clone(), true); - let get_data_fut_4 = storage_manager.get_data(request_data1.clone(), true); - let get_data_fut_5 = storage_manager.get_data(request_data1.clone(), true); + let storage_manager_1 = storage_manager.clone(); + let request_data2_1 = request_data2.clone(); + let get_data_fut_1 = + tokio::spawn(async move { storage_manager_1.get_data(request_data2_1, true).await }); + + let storage_manager_2 = storage_manager.clone(); + let request_data1_2 = request_data1.clone(); + let get_data_fut_2 = + tokio::spawn(async move { storage_manager_2.get_data(request_data1_2, true).await }); + + let storage_manager_3 = storage_manager.clone(); + let request_data2_3 = request_data2.clone(); + let get_data_fut_3 = + tokio::spawn(async move { storage_manager_3.get_data(request_data2_3, true).await }); + + let storage_manager_4 = storage_manager.clone(); + let request_data1_4 = request_data1.clone(); + let get_data_fut_4 = + tokio::spawn(async move { storage_manager_4.get_data(request_data1_4, true).await }); + + let storage_manager_5 = storage_manager.clone(); + let request_data1_5 = request_data1.clone(); + let get_data_fut_5 = + tokio::spawn(async move { storage_manager_5.get_data(request_data1_5, true).await }); + let result = join!( get_data_fut_1, get_data_fut_2, @@ -385,15 +441,15 @@ mod tests { get_data_fut_5 ); assert!(result.0.is_ok()); - assert_eq!(consume_receiver(result.0.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.0.unwrap().unwrap()).await, 113629); assert!(result.1.is_ok()); - assert_eq!(consume_receiver(result.1.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.1.unwrap().unwrap()).await, 112193); assert!(result.2.is_ok()); - 
assert_eq!(consume_receiver(result.2.unwrap()).await, 113629); + assert_eq!(consume_receiver(result.2.unwrap().unwrap()).await, 113629); assert!(result.3.is_ok()); - assert_eq!(consume_receiver(result.3.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.3.unwrap().unwrap()).await, 112193); assert!(result.4.is_ok()); - assert_eq!(consume_receiver(result.4.unwrap()).await, 112193); + assert_eq!(consume_receiver(result.4.unwrap().unwrap()).await, 112193); let delta_time_hit = Instant::now() - start_time; From 87c0d6a5520cc756eb4f45f614e6d030221a088d Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 18:31:21 -0400 Subject: [PATCH 13/23] apply comment suggestions --- .../src/cache/data_store_cache/memdisk/mod.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 108e420..351ffe6 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -30,8 +30,9 @@ pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512; /// [`MemDiskStoreReplacerKey`] is a path to the remote object store. pub type MemDiskStoreReplacerKey = String; -// Status -> completed/incompleted; usize -> file_size -// Notify -> notify; usize -> notify_waiter_count +/// Status -> completed/incompleted; usize -> file_size +/// Notify -> notify; usize -> notify_waiter_count +/// FIXME: make (Notify, Mutex) a struct to make it more readable. type StatusKeyHashMap = HashMap)>)>; pub struct MemDiskStoreReplacerValue { @@ -159,7 +160,7 @@ impl> D } Ok(None) => { return Err(ParpulseError::Internal( - "Memory replacer and memory store keys are inconsistent.".to_string(), + "Memory replacer and memory store is inconsistent.".to_string(), )) } Err(e) => return Err(e), @@ -184,7 +185,9 @@ impl> D { Ok(Some(data)) } else { - unreachable!(); + return Err(ParpulseError::Internal( + "Disk replacer and disk store is inconsistent.".to_string(), + )); } } @@ -249,6 +252,7 @@ impl> D Some(Ok(bytes)) => { bytes_mem_written += bytes.len(); // TODO: Need to write the data to network. + // TODO(lanlou): we need benchmark future, to put this lock outside the loop. if let Some((bytes_vec, _)) = mem_store .write() .await From 7e174ec19ce01febd76c0b4374a29fadaba37cb8 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 28 Apr 2024 19:02:04 -0400 Subject: [PATCH 14/23] fix: status may change when releasing lock --- .../src/cache/data_store_cache/memdisk/mod.rs | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 351ffe6..e12f8a0 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -201,27 +201,28 @@ impl> D // If in_progress of remote_location thread fails, it will clean the data from this hash map. 
let mut existed = false; loop { - let (status, size, notify); - if let Some(((status_ref, size_ref), notify_ref)) = - self.status_of_keys.read().await.get(&remote_location) - { + let status_of_keys = self.status_of_keys.read().await; + if let Some(((status, size), notify_ref)) = status_of_keys.get(&remote_location) { + let notify; existed = true; - status = status_ref.clone(); - size = *size_ref; - notify = notify_ref.clone(); + match status { + Status::Incompleted => { + notify = notify_ref.clone(); + *notify_ref.1.lock().await += 1; + // The Notified future is guaranteed to receive wakeups from notify_waiters() + // as soon as it has been created, even if it has not yet been polled. + let notified = notify.0.notified(); + drop(status_of_keys); + notified.await; + } + Status::Completed => { + return Ok(*size); + } + } } else { + // FIXME: status_of_keys lock should be released after break break; } - match status { - Status::Incompleted => { - // TODO(lanlou): Make these 2 code atomic!! - *notify.1.lock().await += 1; - notify.0.notified().await; - } - Status::Completed => { - return Ok(size); - } - } } if existed { // Another in progress of remote_location thread fails From 285eab8d83ce4ff612be60acc2df32810b3a8068 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 12:16:05 -0400 Subject: [PATCH 15/23] improve notify_waiters --- .../src/cache/data_store_cache/memdisk/mod.rs | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index e12f8a0..0d2eb96 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -347,14 +347,20 @@ impl> D // 3. If the data is successfully written to memory, directly return. 
if self.mem_store.is_some() && bytes_to_disk.is_none() { - let mut status_of_keys = self.status_of_keys.write().await; - let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); - *status = Status::Completed; - *size = bytes_mem_written; + let notify; { - let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; - mem_replacer.pin(&remote_location, *notify.1.lock().await); + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify_ref) = + status_of_keys.get_mut(&remote_location).unwrap(); + *status = Status::Completed; + *size = bytes_mem_written; + { + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; + mem_replacer.pin(&remote_location, *notify_ref.1.lock().await); + } + notify = notify_ref.clone(); } + // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock notify.0.notify_waiters(); return Ok(bytes_mem_written); } @@ -394,15 +400,18 @@ impl> D } disk_replacer.pin(&remote_location, 1); } - let mut status_of_keys = self.status_of_keys.write().await; - let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap(); - *status = Status::Completed; - *size = data_size; - self.disk_replacer - .lock() - .await - .pin(&remote_location, *notify.1.lock().await); - // FIXME: disk_replacer lock should be released here + let notify; + { + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify_ref) = status_of_keys.get_mut(&remote_location).unwrap(); + *status = Status::Completed; + *size = data_size; + self.disk_replacer + .lock() + .await + .pin(&remote_location, *notify_ref.1.lock().await); + notify = notify_ref.clone(); + } notify.0.notify_waiters(); Ok(data_size) } From 8a55d991f4c45708a35cb69651b498eccb8412cf Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 12:27:44 -0400 Subject: [PATCH 16/23] fix: lru will find next unpin to evict --- storage-node/src/cache/replacer/lru.rs | 40 +++++++++++++++++++------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/storage-node/src/cache/replacer/lru.rs b/storage-node/src/cache/replacer/lru.rs index ad3c8d4..e1df5f7 100644 --- a/storage-node/src/cache/replacer/lru.rs +++ b/storage-node/src/cache/replacer/lru.rs @@ -54,21 +54,33 @@ impl LruReplacer { self.size -= cache_value.0.size(); } let mut evicted_keys = Vec::new(); - while (self.size + value.size()) > self.max_capacity { - // Previous code guarantees that there is at least 1 element in the cache_map, so we - // can safely unwrap here. - if self.cache_map.front().unwrap().1 .1 > 0 { - // TODO(lanlou): Actually we should look next to evict, but for simplicity, just - // return None here, it is temporarily okay since we will not pin an element for - // long time now. - return None; + let mut iter = self.cache_map.iter(); + let mut current_size = self.size; + while (current_size + value.size()) > self.max_capacity { + match iter.next() { + Some((key, (value, pin_count))) => { + if *pin_count > 0 { + // If the key is pinned, we do not evict the key. 
+ continue; + } + evicted_keys.push(key.clone()); + current_size -= value.size(); + } + None => { + return None; + } } - if let Some((key, cache_value)) = self.cache_map.pop_front() { + } + + for key in &evicted_keys { + if let Some(cache_value) = self.cache_map.remove(key) { debug!("-------- Evicting Key: {:?} --------", key); - evicted_keys.push(key); self.size -= cache_value.0.size(); + } else { + return None; } } + self.size += value.size(); self.cache_map.insert(key.clone(), (value, 0)); Some(evicted_keys) @@ -256,5 +268,13 @@ mod tests { .put("key2".to_string(), ("value2".to_string(), 2)) .is_some()); assert_eq!(replacer.size(), 2); + assert!(replacer.pin(&"key2".to_string(), 1)); + replacer.put("key3".to_string(), ("value3".to_string(), 8)); + assert_eq!(replacer.size(), 10); + replacer.put("key4".to_string(), ("value4".to_string(), 7)); + assert_eq!(replacer.size(), 9); + assert!(replacer.get(&"key2".to_string()).is_some()); + assert!(replacer.get(&"key4".to_string()).is_some()); + assert!(replacer.get(&"key3".to_string()).is_none()); } } From cb6a70161376ea12681773db595b1be3a07516ae Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 12:33:25 -0400 Subject: [PATCH 17/23] modify sqlite interface --- storage-node/src/cache/data_store_cache/sqlite/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/sqlite/mod.rs b/storage-node/src/cache/data_store_cache/sqlite/mod.rs index fcc6f56..e12a1ba 100644 --- a/storage-node/src/cache/data_store_cache/sqlite/mod.rs +++ b/storage-node/src/cache/data_store_cache/sqlite/mod.rs @@ -103,7 +103,7 @@ impl> Dat for SqliteStoreCache { async fn get_data_from_cache( - &mut self, + &self, remote_location: String, ) -> ParpulseResult>>> { let mut replacer = self.replacer.lock().await; @@ -136,7 +136,7 @@ impl> Dat } async fn put_data_to_cache( - &mut self, + &self, remote_location: String, data_size: Option, mut data_stream: StorageReaderStream, @@ -187,7 +187,7 @@ mod tests { let sqlite_base_path = tmp.path().to_owned().join(Path::new("sqlite_test.db")); let replacer = LruReplacer::new(1024); let buffer_size = 100; - let mut cache = SqliteStoreCache::new( + let cache = SqliteStoreCache::new( replacer, sqlite_base_path.to_str().unwrap().to_string(), Some(buffer_size), From e7f1349fe3cbf0ff7319738ecd96b92b06dc0619 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 13:31:07 -0400 Subject: [PATCH 18/23] test: imporve evict_pinned_key test for lruk --- storage-node/src/cache/replacer/lru_k.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/storage-node/src/cache/replacer/lru_k.rs b/storage-node/src/cache/replacer/lru_k.rs index 610a966..b6d3789 100644 --- a/storage-node/src/cache/replacer/lru_k.rs +++ b/storage-node/src/cache/replacer/lru_k.rs @@ -371,5 +371,13 @@ mod tests { .put("key2".to_string(), ("value2".to_string(), 2)) .is_some()); assert_eq!(replacer.size(), 2); + assert!(replacer.pin(&"key2".to_string(), 1)); + replacer.put("key3".to_string(), ("value3".to_string(), 8)); + assert_eq!(replacer.size(), 10); + replacer.put("key4".to_string(), ("value4".to_string(), 7)); + assert_eq!(replacer.size(), 9); + assert!(replacer.get(&"key2".to_string()).is_some()); + assert!(replacer.get(&"key4".to_string()).is_some()); + assert!(replacer.get(&"key3".to_string()).is_none()); } } From ba6739a82c5931708d7abebaf43fe6604f7346bc Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 15:49:18 -0400 Subject: [PATCH 19/23] feat: fanout cache --- 
storage-node/src/server.rs | 25 ++++++-- storage-node/src/storage_manager.rs | 90 ++++++++++++++++++++++++----- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index c86182f..fcb87a9 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -12,17 +12,32 @@ use crate::{ }; const CACHE_BASE_PATH: &str = "cache/"; +const DATA_STORE_CACHE_NUMBER: usize = 6; pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> { // Should at least be able to store one 100MB file in the cache. - let dummy_size = 100 * 1024 * 1024; // TODO: Read the type of the cache from config. - let cache = LruReplacer::new(dummy_size); - // TODO: cache_base_path should be from config - let data_store_cache = MemDiskStoreCache::new(cache, CACHE_BASE_PATH.to_string(), None, None); + let dummy_size_per_disk_cache = 100 * 1024 * 1024; + let dummy_size_per_mem_cache = 100 * 1024; + let dummy_mem_max_file_cache = 10 * 1024; + + let mut data_store_caches = Vec::new(); + + for i in 0..DATA_STORE_CACHE_NUMBER { + let disk_replacer = LruReplacer::new(dummy_size_per_disk_cache); + let mem_replace = LruReplacer::new(dummy_size_per_mem_cache); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + CACHE_BASE_PATH, + Some(mem_replace), + Some(dummy_mem_max_file_cache), + ); + data_store_caches.push(data_store_cache); + } + let is_mem_disk_cache = true; // TODO: try to use more fine-grained lock instead of locking the whole storage_manager - let storage_manager = Arc::new(StorageManager::new(data_store_cache)); + let storage_manager = Arc::new(StorageManager::new(data_store_caches)); let route = warp::path!("file") .and(warp::path::end()) diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index c8b8537..18e9ef5 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -3,8 +3,11 @@ use crate::{ error::ParpulseResult, storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader}, }; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; use bytes::Bytes; +use log::debug; use storage_common::RequestParams; use tokio::sync::mpsc::Receiver; @@ -16,12 +19,18 @@ use tokio::sync::mpsc::Receiver; pub struct StorageManager { /// We don't use lock here because `data_store_cache` itself should handle the concurrency. - data_store_cache: C, + data_store_caches: Vec, +} + +fn calculate_hash(t: &T) -> u64 { + let mut s = DefaultHasher::new(); + t.hash(&mut s); + s.finish() } impl StorageManager { - pub fn new(data_store_cache: C) -> Self { - Self { data_store_cache } + pub fn new(data_store_caches: Vec) -> Self { + Self { data_store_caches } } pub async fn get_data( @@ -43,8 +52,15 @@ impl StorageManager { // FIXME: Cache key should be . Might refactor the underlying S3 // reader as one S3 key for one reader. 
let cache_key = format!("{}-{}", bucket, keys.join(",")); - let data_rx = self - .data_store_cache + let cache_index = calculate_hash(&cache_key) as usize % self.data_store_caches.len(); + let data_store_cache = self.data_store_caches.get(cache_index).unwrap(); + + debug!( + "For cache key: {}, the corresponding data_store_cache index {}", + cache_key, cache_index + ); + + let data_rx = data_store_cache .get_data_from_cache(cache_key.clone()) .await?; if let Some(data_rx) = data_rx { @@ -67,12 +83,11 @@ impl StorageManager { }; (reader.into_stream().await?, data_size) }; - self.data_store_cache + data_store_cache .put_data_to_cache(cache_key.clone(), data_size, stream) .await?; // TODO (kunle): Push down the response writer rather than calling get_data_from_cache again. - let data_rx = self - .data_store_cache + let data_rx = data_store_cache .get_data_from_cache(cache_key.clone()) .await?; if data_rx.is_none() { @@ -123,7 +138,7 @@ mod tests { let data_store_cache = MemDiskStoreCache::new(cache, cache_base_path.display().to_string(), None, None); - let storage_manager = StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(vec![data_store_cache]); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; @@ -179,7 +194,7 @@ mod tests { Some(mem_cache), Some(950), ); - let storage_manager = StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(vec![data_store_cache]); let request_path_small_bucket = "tests-text".to_string(); let request_path_small_keys = vec!["what-can-i-hold-you-with".to_string()]; @@ -238,7 +253,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = StorageManager::new(data_store_cache); + let storage_manager = StorageManager::new(vec![data_store_cache]); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -289,7 +304,7 @@ mod tests { None, None, ); - let storage_manager = Arc::new(StorageManager::new(data_store_cache)); + let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -349,7 +364,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = Arc::new(StorageManager::new(data_store_cache)); + let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; @@ -459,4 +474,53 @@ mod tests { ); assert!(delta_time_miss > delta_time_hit); } + + #[tokio::test] + async fn test_fanout_cache() { + let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + + let disk_cache2 = LruReplacer::new(1000000); + let mem_cache2 = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache2 = MemDiskStoreCache::new( + disk_cache2, + disk_cache_base_path.display().to_string(), + Some(mem_cache2), + Some(120000), + ); + let storage_manager = Arc::new(StorageManager::new(vec![ + data_store_cache, + data_store_cache2, + ])); + + let request_path_bucket1 = 
"tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata1.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let result = storage_manager.get_data(request_data1.clone(), true).await; + assert!(result.is_ok()); + assert_eq!(consume_receiver(result.unwrap()).await, 113629); + + let request_path_bucket2 = "tests-text".to_string(); + let request_path_keys2 = vec!["what-can-i-hold-you-with".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let result = storage_manager.get_data(request_data2.clone(), true).await; + assert!(result.is_ok()); + assert_eq!(consume_receiver(result.unwrap()).await, 930); + } } From e3e5de5c47a64726f6617e98f2e5228a22fa177c Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 15:52:22 -0400 Subject: [PATCH 20/23] fix: println->debug --- .../src/cache/data_store_cache/memdisk/data_store/disk.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index f1e3955..3b4603d 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -2,6 +2,7 @@ use std::{fs, sync::Arc}; use bytes::Bytes; use futures::StreamExt; +use log::debug; use tokio::sync::{mpsc::Receiver, Mutex}; use crate::{ @@ -29,7 +30,7 @@ pub struct DiskStore { impl Drop for DiskStore { fn drop(&mut self) { if fs::metadata(&self.base_path).is_ok() { - println!("Removing cache files: {:?}", self.base_path); + debug!("Removing cache files: {:?}", self.base_path); fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed"); } } From 86e163b6840d9dfa6fadaf9a3223878a56b67b35 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 16:57:51 -0400 Subject: [PATCH 21/23] fix: move hash functions to common --- storage-node/Cargo.toml | 1 + storage-node/src/common/hash.rs | 13 +++++ storage-node/src/common/mod.rs | 1 + storage-node/src/lib.rs | 1 + storage-node/src/storage_manager.rs | 83 +++++++++++++++-------------- 5 files changed, 58 insertions(+), 41 deletions(-) create mode 100644 storage-node/src/common/hash.rs create mode 100644 storage-node/src/common/mod.rs diff --git a/storage-node/Cargo.toml b/storage-node/Cargo.toml index 11ac71c..6571908 100644 --- a/storage-node/Cargo.toml +++ b/storage-node/Cargo.toml @@ -25,3 +25,4 @@ tokio-stream = "0.1" rusqlite = { version = "0.31", features = ["blob"] } log = "0.4" env_logger = "0.11" +crc32fast = "1.4.0" diff --git a/storage-node/src/common/hash.rs b/storage-node/src/common/hash.rs new file mode 100644 index 0000000..cafd2fd --- /dev/null +++ b/storage-node/src/common/hash.rs @@ -0,0 +1,13 @@ +use std::hash::Hasher; + +pub fn calculate_hash_default(data: &[u8]) -> usize { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + hasher.write(data); + hasher.finish() as usize +} + +pub fn calculate_hash_crc32fast(data: &[u8]) -> usize { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(data); + hasher.finalize() as usize +} diff --git a/storage-node/src/common/mod.rs b/storage-node/src/common/mod.rs new file mode 100644 index 0000000..ec5d33c --- /dev/null +++ b/storage-node/src/common/mod.rs @@ -0,0 +1 @@ +pub mod hash; diff --git a/storage-node/src/lib.rs b/storage-node/src/lib.rs index c1cd0fa..bea08fd 100644 --- 
a/storage-node/src/lib.rs +++ b/storage-node/src/lib.rs @@ -1,6 +1,7 @@ #![allow(clippy::new_without_default)] pub mod cache; +pub mod common; pub mod disk; pub mod error; pub mod server; diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 18e9ef5..8da98aa 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -1,10 +1,9 @@ use crate::{ cache::data_store_cache::DataStoreCache, + common::hash::calculate_hash_crc32fast, error::ParpulseResult, storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader}, }; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use bytes::Bytes; use log::debug; @@ -22,11 +21,11 @@ pub struct StorageManager { data_store_caches: Vec, } -fn calculate_hash(t: &T) -> u64 { - let mut s = DefaultHasher::new(); - t.hash(&mut s); - s.finish() -} +// fn calculate_hash(t: &T) -> u64 { +// let mut s = DefaultHasher::new(); +// t.hash(&mut s); +// s.finish() +// } impl StorageManager { pub fn new(data_store_caches: Vec) -> Self { @@ -52,7 +51,8 @@ impl StorageManager { // FIXME: Cache key should be . Might refactor the underlying S3 // reader as one S3 key for one reader. let cache_key = format!("{}-{}", bucket, keys.join(",")); - let cache_index = calculate_hash(&cache_key) as usize % self.data_store_caches.len(); + let hash = calculate_hash_crc32fast(cache_key.as_bytes()); + let cache_index = hash as usize % self.data_store_caches.len(); let data_store_cache = self.data_store_caches.get(cache_index).unwrap(); debug!( @@ -477,35 +477,24 @@ mod tests { #[tokio::test] async fn test_fanout_cache() { - let disk_cache = LruReplacer::new(1000000); - let mem_cache = LruReplacer::new(120000); - - let tmp = tempfile::tempdir().unwrap(); - let disk_cache_base_path = tmp.path().to_owned(); - - let data_store_cache = MemDiskStoreCache::new( - disk_cache, - disk_cache_base_path.display().to_string(), - Some(mem_cache), - Some(120000), - ); - - let disk_cache2 = LruReplacer::new(1000000); - let mem_cache2 = LruReplacer::new(120000); - - let tmp = tempfile::tempdir().unwrap(); - let disk_cache_base_path = tmp.path().to_owned(); - - let data_store_cache2 = MemDiskStoreCache::new( - disk_cache2, - disk_cache_base_path.display().to_string(), - Some(mem_cache2), - Some(120000), - ); - let storage_manager = Arc::new(StorageManager::new(vec![ - data_store_cache, - data_store_cache2, - ])); + let data_store_cache_num = 6; + let mut data_store_caches = Vec::new(); + for _ in 0..data_store_cache_num { + let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManager::new(data_store_caches)); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -514,13 +503,25 @@ mod tests { let result = storage_manager.get_data(request_data1.clone(), true).await; assert!(result.is_ok()); assert_eq!(consume_receiver(result.unwrap()).await, 113629); - - let request_path_bucket2 = "tests-text".to_string(); - let request_path_keys2 = vec!["what-can-i-hold-you-with".to_string()]; + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = 
vec!["userdata2.parquet".to_string()]; let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); - let result = storage_manager.get_data(request_data2.clone(), true).await; assert!(result.is_ok()); + assert_eq!(consume_receiver(result.unwrap()).await, 112193); + + let request_path_bucket3 = "tests-text".to_string(); + let request_path_keys3: Vec = vec!["what-can-i-hold-you-with".to_string()]; + let request_data3 = RequestParams::MockS3((request_path_bucket3, request_path_keys3)); + let result = storage_manager.get_data(request_data3.clone(), true).await; + assert!(result.is_ok()); assert_eq!(consume_receiver(result.unwrap()).await, 930); + + let request_path_bucket4 = "tests-parquet".to_string(); + let request_path_keys4: Vec = vec!["small_random_data.parquet".to_string()]; + let request_data4 = RequestParams::MockS3((request_path_bucket4, request_path_keys4)); + let result = storage_manager.get_data(request_data4.clone(), true).await; + assert!(result.is_ok()); + assert_eq!(consume_receiver(result.unwrap()).await, 2013); } } From d15a1d44592e1c2235df3d9e701f955c4feed84b Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 17:00:27 -0400 Subject: [PATCH 22/23] fix clippy --- storage-node/src/storage_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 8da98aa..f78315d 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -52,7 +52,7 @@ impl StorageManager { // reader as one S3 key for one reader. let cache_key = format!("{}-{}", bucket, keys.join(",")); let hash = calculate_hash_crc32fast(cache_key.as_bytes()); - let cache_index = hash as usize % self.data_store_caches.len(); + let cache_index = hash % self.data_store_caches.len(); let data_store_cache = self.data_store_caches.get(cache_index).unwrap(); debug!( From 11fd2dd339598282ec3c26a946046cbdea73e3e3 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 29 Apr 2024 17:10:53 -0400 Subject: [PATCH 23/23] test: add parallel test for fanout cache --- storage-node/src/storage_manager.rs | 93 +++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index f78315d..43e176d 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -524,4 +524,97 @@ mod tests { assert!(result.is_ok()); assert_eq!(consume_receiver(result.unwrap()).await, 2013); } + + #[tokio::test] + async fn test_fanout_cach_parallel() { + let data_store_cache_num = 6; + let mut data_store_caches = Vec::new(); + for _ in 0..data_store_cache_num { + let disk_cache = LruReplacer::new(1000000); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManager::new(data_store_caches)); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata2.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = 
RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let request_path_bucket3 = "tests-text".to_string(); + let request_path_keys3 = vec!["what-can-i-hold-you-with".to_string()]; + let request_data3 = RequestParams::MockS3((request_path_bucket3, request_path_keys3)); + + let storage_manager_1 = storage_manager.clone(); + let request_data1_1 = request_data1.clone(); + let get_data_fut_1 = + tokio::spawn(async move { storage_manager_1.get_data(request_data1_1, true).await }); + + let storage_manager_2 = storage_manager.clone(); + let request_data1_2 = request_data1.clone(); + let get_data_fut_2 = + tokio::spawn(async move { storage_manager_2.get_data(request_data1_2, true).await }); + + let storage_manager_3 = storage_manager.clone(); + let request_data3_3 = request_data3.clone(); + let get_data_fut_3 = + tokio::spawn(async move { storage_manager_3.get_data(request_data3_3, true).await }); + + let storage_manager_4 = storage_manager.clone(); + let request_data2_4 = request_data2.clone(); + let get_data_fut_4 = + tokio::spawn(async move { storage_manager_4.get_data(request_data2_4, true).await }); + + let storage_manager_5 = storage_manager.clone(); + let request_data2_5 = request_data2.clone(); + let get_data_fut_5 = + tokio::spawn(async move { storage_manager_5.get_data(request_data2_5, true).await }); + + let storage_manager_6 = storage_manager.clone(); + let request_data1_6 = request_data1.clone(); + let get_data_fut_6 = + tokio::spawn(async move { storage_manager_6.get_data(request_data1_6, true).await }); + + let storage_manager_7 = storage_manager.clone(); + let request_data3_7 = request_data3.clone(); + let get_data_fut_7 = + tokio::spawn(async move { storage_manager_7.get_data(request_data3_7, true).await }); + + let result = join!( + get_data_fut_1, + get_data_fut_2, + get_data_fut_3, + get_data_fut_4, + get_data_fut_5, + get_data_fut_6, + get_data_fut_7 + ); + assert!(result.0.is_ok()); + assert_eq!(consume_receiver(result.0.unwrap().unwrap()).await, 112193); + assert!(result.1.is_ok()); + assert_eq!(consume_receiver(result.1.unwrap().unwrap()).await, 112193); + assert!(result.2.is_ok()); + assert_eq!(consume_receiver(result.2.unwrap().unwrap()).await, 930); + assert!(result.3.is_ok()); + assert_eq!(consume_receiver(result.3.unwrap().unwrap()).await, 113629); + assert!(result.4.is_ok()); + assert_eq!(consume_receiver(result.4.unwrap().unwrap()).await, 113629); + assert!(result.5.is_ok()); + assert_eq!(consume_receiver(result.5.unwrap().unwrap()).await, 112193); + assert!(result.6.is_ok()); + assert_eq!(consume_receiver(result.6.unwrap().unwrap()).await, 930); + } }
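
One invariant ties the replacer changes in this series together: an entry acquires one pin for each reader that registered for it while the fill was in flight, and each reader drops its pin only after fully draining its stream, so eviction never reclaims bytes that are still being served. A stripped-down sketch of that bookkeeping, with invented names (PinTable is not part of this crate):

use std::collections::HashMap;

// key -> (entry size in bytes, number of readers still consuming it)
struct PinTable {
    entries: HashMap<String, (usize, usize)>,
}

impl PinTable {
    fn insert(&mut self, key: String, size: usize) {
        self.entries.insert(key, (size, 0));
    }

    // Pin `count` readers at once, e.g. every waiter that queued up while the
    // data was being fetched.
    fn pin(&mut self, key: &str, count: usize) -> bool {
        match self.entries.get_mut(key) {
            Some((_, pins)) => {
                *pins += count;
                true
            }
            None => false,
        }
    }

    // Called once per reader after its stream is fully consumed.
    fn unpin(&mut self, key: &str) -> bool {
        match self.entries.get_mut(key) {
            Some((_, pins)) if *pins > 0 => {
                *pins -= 1;
                true
            }
            _ => false,
        }
    }

    // The eviction loop skips anything that still has readers attached.
    fn evictable(&self, key: &str) -> bool {
        matches!(self.entries.get(key), Some((_, 0)))
    }
}

fn main() {
    let mut table = PinTable { entries: HashMap::new() };
    table.insert("tests-parquet-userdata1.parquet".to_string(), 113_629);
    table.pin("tests-parquet-userdata1.parquet", 3); // three queued readers
    table.unpin("tests-parquet-userdata1.parquet");
    assert!(!table.evictable("tests-parquet-userdata1.parquet")); // two readers left
}

The pin(&key, count) signature introduced above plays the same role as PinTable::pin here: the task that finished the fill pins once for every queued waiter in a single call instead of looping over the replacer lock.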