This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

refactor: put request param to data store cache #68

Merged · 3 commits · Apr 30, 2024
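In short: `put_data_to_cache` previously took a pre-computed location string, an optional size hint, and a `StorageReaderStream`; after this PR both cache methods take the original `&RequestParams` and derive the cache key and storage reader internally. Below is a minimal sketch of the new call pattern, assuming the trait and types shown in the diff; the helper function itself is hypothetical.

```rust
use parpulse_client::RequestParams;

// Hypothetical caller, only to illustrate the reshaped API surface.
async fn warm_and_read<C: DataStoreCache>(cache: &C) -> ParpulseResult<usize> {
    let request = RequestParams::MockS3((
        "tests-parquet".to_string(),
        vec!["userdata1.parquet".to_string()],
    ));
    // The cache now derives the cache key and builds the storage
    // reader from the request itself; no stream is passed in.
    let written = cache.put_data_to_cache(&request).await?;
    // Reads take the same request; key derivation stays internal.
    if let Some(mut rx) = cache.get_data_from_cache(&request).await? {
        while let Some(chunk) = rx.recv().await {
            let _bytes = chunk?;
        }
    }
    Ok(written)
}
```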
210 changes: 65 additions & 145 deletions storage-node/src/cache/data_store_cache/memdisk/mod.rs
@@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc};

use futures::stream::StreamExt;
use log::{debug, warn};
use parpulse_client::RequestParams;
use tokio::sync::{Mutex, Notify, RwLock};

use crate::{
@@ -13,7 +14,7 @@ use crate::{
},
disk::disk_manager::DiskManager,
error::{ParpulseError, ParpulseResult},
storage_reader::StorageReaderStream,
storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader},
};

use async_trait::async_trait;
@@ -22,6 +23,8 @@ use tokio::sync::mpsc::Receiver;

use self::data_store::{disk::DiskStore, memory::MemStore};

use super::cache_key_from_request;

/// The default maximum single file size for the memory cache.
/// If the file size exceeds this value, the file will be stored in the disk cache.
/// TODO(lanlou): make this value configurable.
@@ -216,8 +219,9 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D
{
async fn get_data_from_cache(
&self,
remote_location: String,
request: &RequestParams,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
let remote_location = cache_key_from_request(request);
// TODO: Refine the lock.
if let Some(mem_store) = &self.mem_store {
let mut data_store_key = None;
Expand Down Expand Up @@ -281,12 +285,8 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D
}
}

async fn put_data_to_cache(
&self,
remote_location: String,
_data_size: Option<usize>,
mut data_stream: StorageReaderStream,
) -> ParpulseResult<usize> {
async fn put_data_to_cache(&self, request: &RequestParams) -> ParpulseResult<usize> {
let remote_location = cache_key_from_request(request);
// If the in-progress task for this remote_location fails, it will remove its entry from this hash map.
{
let status_of_keys = self.status_of_keys.read().await;
@@ -361,6 +361,19 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D
remote_location
);

let (mut data_stream, _) = {
match request {
RequestParams::S3((bucket, keys)) => {
let reader = S3Reader::new(bucket.clone(), keys.clone().to_vec()).await;
(reader.into_stream().await?, 0)
}
RequestParams::MockS3((bucket, keys)) => {
let reader = MockS3Reader::new(bucket.clone(), keys.clone().to_vec()).await;
(reader.into_stream().await?, 0)
}
}
};

// TODO: Refine the lock.
// TODO(lanlou): Also write the data to network.
let mut bytes_to_disk = None;
@@ -585,11 +598,9 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D

#[cfg(test)]
mod tests {
use crate::{
cache::replacer::lru::LruReplacer,
storage_reader::{s3_diskmock::MockS3Reader, AsyncStorageReader},
};
use crate::cache::replacer::lru::LruReplacer;
use futures::join;
use parpulse_client::RequestParams;

use super::*;
#[tokio::test]
@@ -606,43 +617,35 @@ mod tests {
let cache = MemDiskStoreCache::new(
LruReplacer::new(1024 * 512),
disk_base_path.to_str().unwrap().to_string(),
Some(LruReplacer::new(950)),
Some(LruReplacer::new(120000)),
None,
);
let bucket = "tests-text".to_string();
let keys = vec!["what-can-i-hold-you-with".to_string()];
let data1_key = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let data_stream = reader.into_stream().await.unwrap();
let written_data_size = cache
.put_data_to_cache(data1_key.clone(), None, data_stream)
.await
.unwrap();
assert_eq!(written_data_size, 930);
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
let request = RequestParams::MockS3((bucket.clone(), keys));
let written_data_size = cache.put_data_to_cache(&request).await.unwrap();
assert_eq!(written_data_size, 113629);

let data2_key = bucket.clone() + &keys[0] + "1";
let reader = MockS3Reader::new(bucket.clone(), keys).await;
let mut written_data_size = cache
.put_data_to_cache(data2_key.clone(), None, reader.into_stream().await.unwrap())
.await
.unwrap();
assert_eq!(written_data_size, 930);
let keys = vec!["userdata2.parquet".to_string()];
let request2 = RequestParams::MockS3((bucket.clone(), keys));
let mut written_data_size = cache.put_data_to_cache(&request2).await.unwrap();
assert_eq!(written_data_size, 112193);

written_data_size = 0;
let mut rx = cache.get_data_from_cache(data1_key).await.unwrap().unwrap();
let mut rx = cache.get_data_from_cache(&request).await.unwrap().unwrap();
while let Some(data) = rx.recv().await {
let data = data.unwrap();
written_data_size += data.len();
}
assert_eq!(written_data_size, 930);
assert_eq!(written_data_size, 113629);

written_data_size = 0;
let mut rx = cache.get_data_from_cache(data2_key).await.unwrap().unwrap();
let mut rx = cache.get_data_from_cache(&request2).await.unwrap().unwrap();
while let Some(data) = rx.recv().await {
let data = data.unwrap();
written_data_size += data.len();
}
assert_eq!(written_data_size, 930);
assert_eq!(written_data_size, 112193);
}

#[tokio::test]
@@ -657,36 +660,16 @@ mod tests {
);
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
let remote_location = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket.clone(), keys).await;
let written_data_size = cache
.put_data_to_cache(
remote_location.clone(),
None,
reader.into_stream().await.unwrap(),
)
.await
.unwrap();
let request = RequestParams::MockS3((bucket.clone(), keys));
let written_data_size = cache.put_data_to_cache(&request).await.unwrap();
assert_eq!(written_data_size, 113629);

let keys = vec!["userdata2.parquet".to_string()];
let remote_location2 = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket, keys).await;
let mut written_data_size = cache
.put_data_to_cache(
remote_location2.clone(),
None,
reader.into_stream().await.unwrap(),
)
.await
.unwrap();
let request2 = RequestParams::MockS3((bucket, keys));
let mut written_data_size = cache.put_data_to_cache(&request2).await.unwrap();
assert_eq!(written_data_size, 112193);

let mut rx = cache
.get_data_from_cache(remote_location)
.await
.unwrap()
.unwrap();
let mut rx = cache.get_data_from_cache(&request).await.unwrap().unwrap();
written_data_size = 0;
while let Some(data) = rx.recv().await {
let data = data.unwrap();
@@ -711,34 +694,26 @@ mod tests {
);
let bucket = "tests-text".to_string();
let keys = vec!["what-can-i-hold-you-with".to_string()];
let data1_key = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let written_data_size = cache
.put_data_to_cache(data1_key.clone(), None, reader.into_stream().await.unwrap())
.await
.unwrap();
let request = RequestParams::MockS3((bucket, keys));
let written_data_size = cache.put_data_to_cache(&request).await.unwrap();
assert_eq!(written_data_size, 930);

let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
let data2_key = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket.clone(), keys).await;
let mut written_data_size = cache
.put_data_to_cache(data2_key.clone(), None, reader.into_stream().await.unwrap())
.await
.unwrap();
let request2 = RequestParams::MockS3((bucket, keys));
let mut written_data_size = cache.put_data_to_cache(&request2).await.unwrap();
assert_eq!(written_data_size, 113629);

written_data_size = 0;
let mut rx = cache.get_data_from_cache(data1_key).await.unwrap().unwrap();
let mut rx = cache.get_data_from_cache(&request).await.unwrap().unwrap();
while let Some(data) = rx.recv().await {
let data = data.unwrap();
written_data_size += data.len();
}
assert_eq!(written_data_size, 930);

written_data_size = 0;
let mut rx = cache.get_data_from_cache(data2_key).await.unwrap().unwrap();
let mut rx = cache.get_data_from_cache(&request2).await.unwrap().unwrap();
while let Some(data) = rx.recv().await {
let data = data.unwrap();
written_data_size += data.len();
@@ -758,42 +733,19 @@ mod tests {
));
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata2.parquet".to_string()];
let remote_location = bucket.clone() + &keys[0];
let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let reader3 = MockS3Reader::new(bucket.clone(), keys).await;
let request1 = RequestParams::MockS3((bucket, keys));

let cache_1 = cache.clone();
let remote_location_1 = remote_location.clone();
let put_data_fut_1 = tokio::spawn(async move {
cache_1
.put_data_to_cache(remote_location_1, None, reader.into_stream().await.unwrap())
.await
});
let request = request1.clone();
let put_data_fut_1 = tokio::spawn(async move { cache_1.put_data_to_cache(&request).await });

let cache_2 = cache.clone();
let remote_location_2 = remote_location.clone();
let put_data_fut_2 = tokio::spawn(async move {
cache_2
.put_data_to_cache(
remote_location_2,
None,
reader2.into_stream().await.unwrap(),
)
.await
});
let request = request1.clone();
let put_data_fut_2 = tokio::spawn(async move { cache_2.put_data_to_cache(&request).await });

let cache_3 = cache.clone();
let remote_location_3 = remote_location.clone();
let put_data_fut_3 = tokio::spawn(async move {
cache_3
.put_data_to_cache(
remote_location_3,
None,
reader3.into_stream().await.unwrap(),
)
.await
});
let request = request1.clone();
let put_data_fut_3 = tokio::spawn(async move { cache_3.put_data_to_cache(&request).await });

let res = join!(put_data_fut_1, put_data_fut_2, put_data_fut_3);
assert!(res.0.is_ok());
@@ -817,56 +769,24 @@ mod tests {
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
let keys2 = vec!["userdata2.parquet".to_string()];
let remote_location = bucket.clone() + &keys[0];
let remote_location2 = bucket.clone() + &keys2[0];
let reader = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let reader2 = MockS3Reader::new(bucket.clone(), keys.clone()).await;
let reader3 = MockS3Reader::new(bucket.clone(), keys2).await;
let reader4 = MockS3Reader::new(bucket.clone(), keys).await;
let request1 = RequestParams::MockS3((bucket.clone(), keys));
let request2 = RequestParams::MockS3((bucket, keys2));

let cache_1 = cache.clone();
let remote_location_1 = remote_location.clone();
let put_data_fut_1 = tokio::spawn(async move {
cache_1
.put_data_to_cache(remote_location_1, None, reader.into_stream().await.unwrap())
.await
});
let request = request1.clone();
let put_data_fut_1 = tokio::spawn(async move { cache_1.put_data_to_cache(&request).await });

let cache_2 = cache.clone();
let remote_location_2 = remote_location.clone();
let put_data_fut_2 = tokio::spawn(async move {
cache_2
.put_data_to_cache(
remote_location_2,
None,
reader2.into_stream().await.unwrap(),
)
.await
});
let request = request1.clone();
let put_data_fut_2 = tokio::spawn(async move { cache_2.put_data_to_cache(&request).await });

let cache_3 = cache.clone();
let remote_location_3 = remote_location2.clone();
let put_data_fut_3 = tokio::spawn(async move {
cache_3
.put_data_to_cache(
remote_location_3,
None,
reader3.into_stream().await.unwrap(),
)
.await
});
let request = request2.clone();
let put_data_fut_3 = tokio::spawn(async move { cache_3.put_data_to_cache(&request).await });

let cache_4 = cache.clone();
let remote_location_4 = remote_location.clone();
let put_data_fut_4 = tokio::spawn(async move {
cache_4
.put_data_to_cache(
remote_location_4,
None,
reader4.into_stream().await.unwrap(),
)
.await
});
let request = request1.clone();
let put_data_fut_4 = tokio::spawn(async move { cache_4.put_data_to_cache(&request).await });

let res = join!(
put_data_fut_1,
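Design note on this file: in the old signature the size hint was already ignored (it was bound as `_data_size`), and every call site had to build a `MockS3Reader` or `S3Reader` and convert it into a stream before calling the cache. Moving reader construction behind the trait removes that boilerplate from callers and keeps the in-progress bookkeeping (`status_of_keys`) and the actual download on the same side of the API, as the rewritten tests above show.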
23 changes: 15 additions & 8 deletions storage-node/src/cache/data_store_cache/mod.rs
@@ -1,8 +1,9 @@
use async_trait::async_trait;
use bytes::Bytes;
use parpulse_client::RequestParams;
use tokio::sync::mpsc::Receiver;

use crate::{error::ParpulseResult, storage_reader::StorageReaderStream};
use crate::error::ParpulseResult;

pub mod memdisk;
pub mod sqlite;
@@ -11,17 +12,23 @@ pub mod sqlite;
pub trait DataStoreCache {
async fn get_data_from_cache(
&self,
remote_location: String,
request_param: &RequestParams,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>>;

/// Put data into the cache. The cache derives the cache key and the storage reader
/// from the request parameters and returns the number of bytes written.
async fn put_data_to_cache(
&self,
remote_location: String,
data_size: Option<usize>,
data_stream: StorageReaderStream,
) -> ParpulseResult<usize>;
async fn put_data_to_cache(&self, request_param: &RequestParams) -> ParpulseResult<usize>;
}

pub fn cache_key_from_request(request_param: &RequestParams) -> String {
match request_param {
RequestParams::S3((bucket, keys)) => {
format!("{}-{}", bucket, keys.join(","))
}
RequestParams::MockS3((bucket, keys)) => {
format!("{}-{}", bucket, keys.join(","))
}
}
}
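As a usage illustration, here is a hedged sketch of the key derivation performed by `cache_key_from_request` (the bucket and key values are made up for the example):

```rust
use parpulse_client::RequestParams;

let request = RequestParams::MockS3((
    "tests-parquet".to_string(),
    vec!["userdata1.parquet".to_string(), "userdata2.parquet".to_string()],
));
// The bucket and keys collapse into "<bucket>-<key1>,<key2>,...".
assert_eq!(
    cache_key_from_request(&request),
    "tests-parquet-userdata1.parquet,userdata2.parquet"
);
```

Note that both the `S3` and `MockS3` arms produce the same format, so a mock request and a real request for the same bucket and keys map to the same cache key.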