This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: add fanout cache #60

Merged
merged 26 commits on Apr 29, 2024
Changes from all commits
26 commits
b6441d9
Finish initial version
lanlou1554 Apr 27, 2024
4e8df7e
fix clippy
lanlou1554 Apr 27, 2024
1ea4f53
improve details
lanlou1554 Apr 27, 2024
0ac4a69
add tests in storage_manager
lanlou1554 Apr 27, 2024
dafaf5e
refine test name
lanlou1554 Apr 27, 2024
e2c51ab
change lock for replacer to mutex
lanlou1554 Apr 27, 2024
628990f
fix bugs
lanlou1554 Apr 28, 2024
3e09316
Resolve conflicts from main
lanlou1554 Apr 28, 2024
86e23e3
feat: add pin/unpin for replacer to pass test_storage_manager_parallel_2
lanlou1554 Apr 28, 2024
9c19d66
fix bugs
lanlou1554 Apr 28, 2024
5331f07
improve pin function
lanlou1554 Apr 28, 2024
31015f4
fix: unpin after all the data is consumed & fix tests for server
lanlou1554 Apr 28, 2024
d2b1044
apply comment suggestions
lanlou1554 Apr 28, 2024
87c0d6a
apply comment suggestions
lanlou1554 Apr 28, 2024
7e174ec
fix: status may change when releasing lock
lanlou1554 Apr 28, 2024
285eab8
improve notify_waiters
lanlou1554 Apr 29, 2024
8a55d99
fix: lru will find next unpin to evict
lanlou1554 Apr 29, 2024
fd82bd9
Merge remote-tracking branch 'origin' into ll/unlock_disk_manager
lanlou1554 Apr 29, 2024
cb6a701
modify sqlite interface
lanlou1554 Apr 29, 2024
e7f1349
test: imporve evict_pinned_key test for lruk
lanlou1554 Apr 29, 2024
ba6739a
feat: fanout cache
lanlou1554 Apr 29, 2024
e3e5de5
fix: println->debug
lanlou1554 Apr 29, 2024
4fc27c6
merge main and solve conflict
lanlou1554 Apr 29, 2024
86e163b
fix: move hash functions to common
lanlou1554 Apr 29, 2024
d15a1d4
fix clippy
lanlou1554 Apr 29, 2024
11fd2dd
test: add parallel test for fanout cache
lanlou1554 Apr 29, 2024
1 change: 1 addition & 0 deletions storage-node/Cargo.toml
@@ -25,3 +25,4 @@ tokio-stream = "0.1"
rusqlite = { version = "0.31", features = ["blob"] }
log = "0.4"
env_logger = "0.11"
crc32fast = "1.4.0"
@@ -2,6 +2,7 @@ use std::{fs, sync::Arc};

use bytes::Bytes;
use futures::StreamExt;
use log::debug;
use tokio::sync::{mpsc::Receiver, Mutex};

use crate::{
@@ -29,7 +30,7 @@ pub struct DiskStore {
impl Drop for DiskStore {
fn drop(&mut self) {
if fs::metadata(&self.base_path).is_ok() {
println!("Removing cache files: {:?}", self.base_path);
debug!("Removing cache files: {:?}", self.base_path);
fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed");
}
}
13 changes: 13 additions & 0 deletions storage-node/src/common/hash.rs
@@ -0,0 +1,13 @@
use std::hash::Hasher;

pub fn calculate_hash_default(data: &[u8]) -> usize {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
hasher.write(data);
hasher.finish() as usize
}

pub fn calculate_hash_crc32fast(data: &[u8]) -> usize {
let mut hasher = crc32fast::Hasher::new();
hasher.update(data);
hasher.finalize() as usize
}
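
For reference (not part of this diff), a minimal, self-contained sketch of how a hash helper like the ones above can map a cache key to a shard index. The shard count of 6 is illustrative and mirrors DATA_STORE_CACHE_NUMBER in server.rs below; storage_manager.rs uses the crc32fast variant for the real lookup.

use std::hash::Hasher;

// Same logic as calculate_hash_default above: hash raw bytes with std's DefaultHasher.
fn calculate_hash_default(data: &[u8]) -> usize {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    hasher.write(data);
    hasher.finish() as usize
}

fn main() {
    // Illustrative shard count; server.rs below uses DATA_STORE_CACHE_NUMBER = 6.
    let shard_count = 6;
    let cache_key = "tests-parquet-userdata1.parquet";
    let index = calculate_hash_default(cache_key.as_bytes()) % shard_count;
    assert!(index < shard_count);
    println!("cache key {:?} maps to data_store_cache index {}", cache_key, index);
}
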
1 change: 1 addition & 0 deletions storage-node/src/common/mod.rs
@@ -0,0 +1 @@
pub mod hash;
1 change: 1 addition & 0 deletions storage-node/src/lib.rs
@@ -1,6 +1,7 @@
#![allow(clippy::new_without_default)]

pub mod cache;
pub mod common;
pub mod disk;
pub mod error;
pub mod server;
25 changes: 20 additions & 5 deletions storage-node/src/server.rs
@@ -12,17 +12,32 @@ use crate::{
};

const CACHE_BASE_PATH: &str = "cache/";
const DATA_STORE_CACHE_NUMBER: usize = 6;

pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> {
// Should at least be able to store one 100MB file in the cache.
let dummy_size = 100 * 1024 * 1024;
// TODO: Read the type of the cache from config.
let cache = LruReplacer::new(dummy_size);
// TODO: cache_base_path should be from config
let data_store_cache = MemDiskStoreCache::new(cache, CACHE_BASE_PATH.to_string(), None, None);
let dummy_size_per_disk_cache = 100 * 1024 * 1024;
let dummy_size_per_mem_cache = 100 * 1024;
let dummy_mem_max_file_cache = 10 * 1024;

let mut data_store_caches = Vec::new();

for i in 0..DATA_STORE_CACHE_NUMBER {
let disk_replacer = LruReplacer::new(dummy_size_per_disk_cache);
let mem_replace = LruReplacer::new(dummy_size_per_mem_cache);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + CACHE_BASE_PATH,
Some(mem_replace),
Some(dummy_mem_max_file_cache),
);
data_store_caches.push(data_store_cache);
}

let is_mem_disk_cache = true;
// TODO: try to use more fine-grained lock instead of locking the whole storage_manager
let storage_manager = Arc::new(StorageManager::new(data_store_cache));
let storage_manager = Arc::new(StorageManager::new(data_store_caches));

let route = warp::path!("file")
.and(warp::path::end())
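
A quick sketch (not part of the diff, purely illustrative) of the per-shard cache directories the server.rs loop above produces as written, using the same constants:

fn main() {
    const CACHE_BASE_PATH: &str = "cache/";
    const DATA_STORE_CACHE_NUMBER: usize = 6;
    // Each data store cache gets its own base directory, as in the server.rs loop above.
    let paths: Vec<String> = (0..DATA_STORE_CACHE_NUMBER)
        .map(|i| i.to_string() + CACHE_BASE_PATH)
        .collect();
    // Yields "0cache/", "1cache/", ..., "5cache/".
    assert_eq!(paths[0], "0cache/");
    assert_eq!(paths[5], "5cache/");
    println!("{:?}", paths);
}
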
184 changes: 171 additions & 13 deletions storage-node/src/storage_manager.rs
@@ -1,10 +1,12 @@
use crate::{
cache::data_store_cache::DataStoreCache,
common::hash::calculate_hash_crc32fast,
error::ParpulseResult,
storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader},
};

use bytes::Bytes;
use log::debug;
use storage_common::RequestParams;
use tokio::sync::mpsc::Receiver;

@@ -16,12 +18,18 @@ use tokio::sync::mpsc::Receiver;

pub struct StorageManager<C: DataStoreCache> {
/// We don't use lock here because `data_store_cache` itself should handle the concurrency.
data_store_cache: C,
data_store_caches: Vec<C>,
}

// fn calculate_hash<T: Hash>(t: &T) -> u64 {
// let mut s = DefaultHasher::new();
// t.hash(&mut s);
// s.finish()
// }

impl<C: DataStoreCache> StorageManager<C> {
pub fn new(data_store_cache: C) -> Self {
Self { data_store_cache }
pub fn new(data_store_caches: Vec<C>) -> Self {
Self { data_store_caches }
}

pub async fn get_data(
@@ -43,8 +51,16 @@ impl<C: DataStoreCache> StorageManager<C> {
// FIXME: Cache key should be <bucket + key>. Might refactor the underlying S3
// reader as one S3 key for one reader.
let cache_key = format!("{}-{}", bucket, keys.join(","));
let data_rx = self
.data_store_cache
let hash = calculate_hash_crc32fast(cache_key.as_bytes());
let cache_index = hash % self.data_store_caches.len();
let data_store_cache = self.data_store_caches.get(cache_index).unwrap();

debug!(
"For cache key: {}, the corresponding data_store_cache index {}",
cache_key, cache_index
);

let data_rx = data_store_cache
.get_data_from_cache(cache_key.clone())
.await?;
if let Some(data_rx) = data_rx {
@@ -67,12 +83,11 @@ };
};
(reader.into_stream().await?, data_size)
};
self.data_store_cache
data_store_cache
.put_data_to_cache(cache_key.clone(), data_size, stream)
.await?;
// TODO (kunle): Push down the response writer rather than calling get_data_from_cache again.
let data_rx = self
.data_store_cache
let data_rx = data_store_cache
.get_data_from_cache(cache_key.clone())
.await?;
if data_rx.is_none() {
@@ -123,7 +138,7 @@ mod tests {

let data_store_cache =
MemDiskStoreCache::new(cache, cache_base_path.display().to_string(), None, None);
let storage_manager = StorageManager::new(data_store_cache);
let storage_manager = StorageManager::new(vec![data_store_cache]);

let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
@@ -179,7 +194,7 @@
Some(mem_cache),
Some(950),
);
let storage_manager = StorageManager::new(data_store_cache);
let storage_manager = StorageManager::new(vec![data_store_cache]);

let request_path_small_bucket = "tests-text".to_string();
let request_path_small_keys = vec!["what-can-i-hold-you-with".to_string()];
@@ -238,7 +253,7 @@
Some(mem_cache),
Some(120000),
);
let storage_manager = StorageManager::new(data_store_cache);
let storage_manager = StorageManager::new(vec![data_store_cache]);

let request_path_bucket1 = "tests-parquet".to_string();
let request_path_keys1 = vec!["userdata1.parquet".to_string()];
@@ -289,7 +304,7 @@
None,
None,
);
let storage_manager = Arc::new(StorageManager::new(data_store_cache));
let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache]));

let request_path_bucket1 = "tests-parquet".to_string();
let request_path_keys1 = vec!["userdata1.parquet".to_string()];
@@ -349,7 +364,7 @@
Some(mem_cache),
Some(120000),
);
let storage_manager = Arc::new(StorageManager::new(data_store_cache));
let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache]));

let request_path_bucket1 = "tests-parquet".to_string();
let request_path_keys1 = vec!["userdata2.parquet".to_string()];
@@ -459,4 +474,147 @@
);
assert!(delta_time_miss > delta_time_hit);
}

#[tokio::test]
async fn test_fanout_cache() {
let data_store_cache_num = 6;
let mut data_store_caches = Vec::new();
for _ in 0..data_store_cache_num {
let disk_cache = LruReplacer::new(1000000);
let mem_cache = LruReplacer::new(120000);

let tmp = tempfile::tempdir().unwrap();
let disk_cache_base_path = tmp.path().to_owned();

let data_store_cache = MemDiskStoreCache::new(
disk_cache,
disk_cache_base_path.display().to_string(),
Some(mem_cache),
Some(120000),
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManager::new(data_store_caches));

let request_path_bucket1 = "tests-parquet".to_string();
let request_path_keys1 = vec!["userdata1.parquet".to_string()];
let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1));

let result = storage_manager.get_data(request_data1.clone(), true).await;
assert!(result.is_ok());
assert_eq!(consume_receiver(result.unwrap()).await, 113629);
let request_path_bucket2 = "tests-parquet".to_string();
let request_path_keys2 = vec!["userdata2.parquet".to_string()];
let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2));
let result = storage_manager.get_data(request_data2.clone(), true).await;
assert!(result.is_ok());
assert_eq!(consume_receiver(result.unwrap()).await, 112193);

let request_path_bucket3 = "tests-text".to_string();
let request_path_keys3: Vec<String> = vec!["what-can-i-hold-you-with".to_string()];
let request_data3 = RequestParams::MockS3((request_path_bucket3, request_path_keys3));
let result = storage_manager.get_data(request_data3.clone(), true).await;
assert!(result.is_ok());
assert_eq!(consume_receiver(result.unwrap()).await, 930);

let request_path_bucket4 = "tests-parquet".to_string();
let request_path_keys4: Vec<String> = vec!["small_random_data.parquet".to_string()];
let request_data4 = RequestParams::MockS3((request_path_bucket4, request_path_keys4));
let result = storage_manager.get_data(request_data4.clone(), true).await;
assert!(result.is_ok());
assert_eq!(consume_receiver(result.unwrap()).await, 2013);
}

#[tokio::test]
async fn test_fanout_cache_parallel() {
let data_store_cache_num = 6;
let mut data_store_caches = Vec::new();
for _ in 0..data_store_cache_num {
let disk_cache = LruReplacer::new(1000000);
let mem_cache = LruReplacer::new(120000);

let tmp = tempfile::tempdir().unwrap();
let disk_cache_base_path = tmp.path().to_owned();

let data_store_cache = MemDiskStoreCache::new(
disk_cache,
disk_cache_base_path.display().to_string(),
Some(mem_cache),
Some(120000),
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManager::new(data_store_caches));

let request_path_bucket1 = "tests-parquet".to_string();
let request_path_keys1 = vec!["userdata2.parquet".to_string()];
let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1));

let request_path_bucket2 = "tests-parquet".to_string();
let request_path_keys2 = vec!["userdata1.parquet".to_string()];
let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2));

let request_path_bucket3 = "tests-text".to_string();
let request_path_keys3 = vec!["what-can-i-hold-you-with".to_string()];
let request_data3 = RequestParams::MockS3((request_path_bucket3, request_path_keys3));

let storage_manager_1 = storage_manager.clone();
let request_data1_1 = request_data1.clone();
let get_data_fut_1 =
tokio::spawn(async move { storage_manager_1.get_data(request_data1_1, true).await });

let storage_manager_2 = storage_manager.clone();
let request_data1_2 = request_data1.clone();
let get_data_fut_2 =
tokio::spawn(async move { storage_manager_2.get_data(request_data1_2, true).await });

let storage_manager_3 = storage_manager.clone();
let request_data3_3 = request_data3.clone();
let get_data_fut_3 =
tokio::spawn(async move { storage_manager_3.get_data(request_data3_3, true).await });

let storage_manager_4 = storage_manager.clone();
let request_data2_4 = request_data2.clone();
let get_data_fut_4 =
tokio::spawn(async move { storage_manager_4.get_data(request_data2_4, true).await });

let storage_manager_5 = storage_manager.clone();
let request_data2_5 = request_data2.clone();
let get_data_fut_5 =
tokio::spawn(async move { storage_manager_5.get_data(request_data2_5, true).await });

let storage_manager_6 = storage_manager.clone();
let request_data1_6 = request_data1.clone();
let get_data_fut_6 =
tokio::spawn(async move { storage_manager_6.get_data(request_data1_6, true).await });

let storage_manager_7 = storage_manager.clone();
let request_data3_7 = request_data3.clone();
let get_data_fut_7 =
tokio::spawn(async move { storage_manager_7.get_data(request_data3_7, true).await });

let result = join!(
get_data_fut_1,
get_data_fut_2,
get_data_fut_3,
get_data_fut_4,
get_data_fut_5,
get_data_fut_6,
get_data_fut_7
);
assert!(result.0.is_ok());
assert_eq!(consume_receiver(result.0.unwrap().unwrap()).await, 112193);
assert!(result.1.is_ok());
assert_eq!(consume_receiver(result.1.unwrap().unwrap()).await, 112193);
assert!(result.2.is_ok());
assert_eq!(consume_receiver(result.2.unwrap().unwrap()).await, 930);
assert!(result.3.is_ok());
assert_eq!(consume_receiver(result.3.unwrap().unwrap()).await, 113629);
assert!(result.4.is_ok());
assert_eq!(consume_receiver(result.4.unwrap().unwrap()).await, 113629);
assert!(result.5.is_ok());
assert_eq!(consume_receiver(result.5.unwrap().unwrap()).await, 112193);
assert!(result.6.is_ok());
assert_eq!(consume_receiver(result.6.unwrap().unwrap()).await, 930);
}
}
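
For completeness, a minimal sketch (not part of the diff) of the shard-selection step these tests exercise; it assumes the crc32fast dependency added in Cargo.toml above. The point is determinism: the same cache key always hashes to the same index, so concurrent requests for one key land on the same data store cache.

// Minimal sketch; assumes crc32fast (added in Cargo.toml above) is available.
fn calculate_hash_crc32fast(data: &[u8]) -> usize {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(data);
    hasher.finalize() as usize
}

fn shard_index(cache_key: &str, shard_count: usize) -> usize {
    calculate_hash_crc32fast(cache_key.as_bytes()) % shard_count
}

fn main() {
    let key = "tests-parquet-userdata2.parquet";
    // Deterministic: repeated requests for the same key always pick the same cache.
    assert_eq!(shard_index(key, 6), shard_index(key, 6));
    println!("{} -> data_store_cache index {}", key, shard_index(key, 6));
}
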