Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: connect storage manager with the server #19

Merged
merged 7 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion storage-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ use enum_as_inner::EnumAsInner;
#[derive(Clone, EnumAsInner, Debug)]
pub enum RequestParams {
File(String),
S3(String),
/// S3 bucket and keys.
S3((String, Vec<String>)),
}
1 change: 1 addition & 0 deletions storage-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ tokio-util = "0.7"
reqwest = "0.12"
tempfile = "3.10.1"
rand = "0.8"
tokio-stream = "0.1"
4 changes: 1 addition & 3 deletions storage-node/src/bin/storage_node.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use storage_common::RequestParams;
use storage_node::server::storage_node_serve;

// TODO: Add config here.

#[tokio::main]
async fn main() {
println!("starting storage node server...");
let file_dir = RequestParams::File("storage-node/tests/parquet".to_string());
storage_node_serve(file_dir).await.unwrap();
storage_node_serve().await.unwrap();
}
8 changes: 6 additions & 2 deletions storage-node/src/cache/data/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ impl DataStore for DiskStore {
Ok(Some(rx))
}

async fn write_data(&self, key: String, data: StorageReaderStream) -> ParpulseResult<usize> {
async fn write_data(
&self,
key: String,
data_stream: StorageReaderStream,
) -> ParpulseResult<usize> {
// NOTE(Yuanxin): Shall we spawn a task to write the data to disk?
let bytes_written = self
.disk_manager
.write_stream_reader_to_disk(data, &key)
.write_stream_reader_to_disk(data_stream, &key)
.await?;
Ok(bytes_written)
}
Expand Down
64 changes: 23 additions & 41 deletions storage-node/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::path::Path;
use storage_common::RequestParams;
use tokio::fs::File as AsyncFile;
use tokio_util::io::ReaderStream;
use tokio_stream::wrappers::ReceiverStream;
use warp::{Filter, Rejection};

use crate::{
Expand All @@ -11,51 +9,34 @@ use crate::{
storage_manager::StorageManager,
};

pub async fn storage_node_serve(file_dir: RequestParams) -> ParpulseResult<()> {
pub async fn storage_node_serve() -> ParpulseResult<()> {
// TODO: Read the type of the cache from config.
let dummy_size = 10;
let cache = LruCache::new(dummy_size);
let disk_manager = DiskManager::default();
// TODO: cache_base_path should be from config
let data_store = DiskStore::new(disk_manager, "cache/".to_string());
let _storage_manager = StorageManager::new(cache, data_store);
let dummy_size = 1000000;

// FIXME (kunle): We need to get the file from storage manager. For now we directly read
// the file from disk. I will update it after the storage manager provides the relevant API.
let route = warp::path!("file" / String)
.and(warp::path::end())
.and_then(move |file_name: String| {
// Have to clone file_dir to move it into the closure
let file_dir = file_dir.clone();
async move {
let file_path = match file_dir {
RequestParams::File(dir) => format!("{}/{}", dir, file_name),
_ => {
return Err(warp::reject::not_found());
}
};

if !Path::new(&file_path).exists() {
return Err(warp::reject::not_found());
}

println!("File Path in the server: {}", file_path);
let file = match AsyncFile::open(&file_path).await {
Ok(file) => file,
Err(e) => {
eprintln!("Error opening file: {}", e);
return Err(warp::reject::not_found());
}
};

let stream = ReaderStream::new(file);
let cache = LruCache::new(dummy_size);
let disk_manager = DiskManager::default();
// TODO: cache_base_path should be from config
let data_store = DiskStore::new(disk_manager, "cache/".to_string());
let storage_manager = StorageManager::new(cache, data_store);
println!("File Name: {}", file_name);
let bucket = "tests-parquet".to_string();
let keys = vec![file_name];
let request = RequestParams::S3((bucket, keys));
let result = storage_manager.get_data(request).await;
let data_rx = result.unwrap();

let stream = ReceiverStream::new(data_rx);
let body = warp::hyper::Body::wrap_stream(stream);
let response = warp::http::Response::builder()
.header("Content-Type", "text/plain")
.body(body)
.unwrap();

// Return the file content as response
Ok::<_, Rejection>(warp::reply::with_status(
response,
warp::http::StatusCode::OK,
Expand All @@ -79,11 +60,11 @@ mod tests {
/// WARNING: Put userdata1.parquet in the storage-node/tests/parquet directory before running this test.
#[tokio::test]
async fn test_download_file() {
let file_dir = RequestParams::File("tests/parquet".to_string());
let original_file_path = "tests/parquet/userdata1.parquet";

// Start the server
let server_handle = tokio::spawn(async move {
storage_node_serve(file_dir).await.unwrap();
storage_node_serve().await.unwrap();
});

// Give the server some time to start
Expand Down Expand Up @@ -112,10 +93,11 @@ mod tests {
}
assert!(file_path.exists(), "File not found after download");

let original_file_path = "tests/parquet/userdata1.parquet";
let original_file = fs::read(original_file_path).unwrap();
let downloaded_file = fs::read(file_path).unwrap();
assert_eq!(original_file, downloaded_file);
// Check if file sizes are equal
assert_eq!(
fs::metadata(original_file_path).unwrap().len(),
fs::metadata(file_path).unwrap().len()
);

server_handle.abort();
}
Expand Down
76 changes: 43 additions & 33 deletions storage-node/src/storage_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,32 @@ impl<C: DataStoreCache, DS: DataStore> StorageManager<C, DS> {
}
}

fn dummy_s3_request(&self) -> (String, Vec<String>) {
let bucket = "tests-parquet".to_string();
let keys = vec![
"userdata1.parquet".to_string(),
"userdata2.parquet".to_string(),
];
(bucket, keys)
}

pub async fn get_data(&self, _request: RequestParams) -> ParpulseResult<usize> {
pub async fn get_data(
&self,
_request: RequestParams,
) -> ParpulseResult<Receiver<ParpulseResult<Bytes>>> {
// 1. Try to get data from the cache first.
// 2. If cache miss, then go to storage reader to fetch the data from
// the underlying storage.
// 3. If needed, update the cache with the data fetched from the storage reader.
// TODO: Support more request types.
let (bucket, keys) = self.dummy_s3_request();
let (bucket, keys) = match _request {
RequestParams::S3((bucket, keys)) => (bucket, keys),
_ => unreachable!(),
};

// FIXME: Cache key should be <bucket + key>. Might refactor the underlying S3
// reader as one S3 key for one reader.
let data_rx = self.get_data_from_cache(bucket.clone()).await?;
if let Some(mut data_rx) = data_rx {
let mut data_size: usize = 0;
while let Some(data) = data_rx.recv().await {
match data {
Ok(bytes) => {
// TODO: Put the data into network. May need to push down the response
// stream to data store.
data_size += bytes.len();
}
Err(e) => return Err(e),
}
}
Ok(data_size)
if let Some(data_rx) = data_rx {
Ok(data_rx)
} else {
// Get data from the underlying storage and put the data into the cache.
let reader = MockS3Reader::new(bucket.clone(), keys).await;
let data_size = self
.put_data_to_cache(bucket, reader.into_stream().await?)
.await?;
Ok(data_size)
let stream = reader.into_stream().await?;
self.put_data_to_cache(bucket.clone(), stream).await?;
// TODO (kunle): Push down the response writer rather than calling get_data_from_cache again.
let data_rx = self.get_data_from_cache(bucket.clone()).await?.unwrap();
Ok(data_rx)
}
}

Expand Down Expand Up @@ -144,19 +130,43 @@ mod tests {
);
let storage_manager = StorageManager::new(cache, data_store);

let request_path = "dummy_s3_request";
let request = RequestParams::S3(request_path.to_string());
let bucket = "tests-parquet".to_string();
let keys = vec![
"userdata1.parquet".to_string(),
"userdata2.parquet".to_string(),
];
let request = RequestParams::S3((bucket, keys));

let mut start_time = Instant::now();
let result = storage_manager.get_data(request.clone()).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 113629 + 112193);
let mut data_rx = result.unwrap();
let mut total_bytes = 0;
while let Some(data) = data_rx.recv().await {
match data {
Ok(bytes) => {
total_bytes += bytes.len();
}
Err(e) => panic!("Error receiving data: {:?}", e),
}
}
assert_eq!(total_bytes, 113629 + 112193);
let delta_time_miss = Instant::now() - start_time;

start_time = Instant::now();
let result = storage_manager.get_data(request).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 113629 + 112193);
let mut data_rx = result.unwrap();
total_bytes = 0;
while let Some(data) = data_rx.recv().await {
match data {
Ok(bytes) => {
total_bytes += bytes.len();
}
Err(e) => panic!("Error receiving data: {:?}", e),
}
}
assert_eq!(total_bytes, 113629 + 112193);
let delta_time_hit = Instant::now() - start_time;

println!(
Expand Down
Binary file not shown.
6 changes: 3 additions & 3 deletions tests/src/client_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ mod tests {
use arrow::array::StringArray;
use storage_client::client::StorageClientImpl;
use storage_client::{StorageClient, StorageRequest};
use storage_common::RequestParams;
use storage_node::server::storage_node_serve;

#[tokio::test]
#[ignore = "Need to discuss how to set S3 params"]
async fn test_client_server() {
let file_dir = RequestParams::File("../storage-node/tests/parquet".to_string());
// The file dir should start from storage-node.

// Start the server
let server_handle = tokio::spawn(async move {
storage_node_serve(file_dir).await.unwrap();
storage_node_serve().await.unwrap();
});

// Give the server some time to start
Expand Down
Loading