This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit a914ac8

fix: modify s3 read_all (#64)

* fix: speed up s3 read all
* configuration: modify memory size
* fix: s3 readall tests
* apply comment suggestions

lanlou1554 authored Apr 30, 2024
1 parent c9cc2c1 commit a914ac8
Showing 5 changed files with 15 additions and 15 deletions.
@@ -17,7 +17,7 @@ use crate::{

 /// TODO(lanlou): make them configurable.
 /// MAX_DISK_READER_BUFFER_SIZE is 10MB.
-const MAX_DISK_READER_BUFFER_SIZE: usize = 10 * 1024 * 1024;
+const MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024;
 const DEFAULT_DISK_CHANNEL_BUFFER_SIZE: usize = 512;

 /// [`DiskStore`] stores the contents of remote objects on the local disk.
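
The TODO above says these constants should become configurable; note that the doc comment still reads "10MB" while the new value is 100 MiB. A minimal sketch of one way to make the size configurable, using an environment-variable override that falls back to this commit's default (the variable name and helper are hypothetical, not part of the codebase):

    use std::env;

    /// Hypothetical helper: parse a byte-size override from the environment,
    /// falling back to the compile-time default set by this commit.
    fn size_from_env(var: &str, default: usize) -> usize {
        env::var(var)
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(default)
    }

    // At startup, e.g.:
    // let max_disk_reader_buffer_size =
    //     size_from_env("PARPULSE_MAX_DISK_READER_BUFFER_SIZE", 100 * 1024 * 1024);
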
2 changes: 1 addition & 1 deletion storage-node/src/server.rs
@@ -17,7 +17,7 @@ const DATA_STORE_CACHE_NUMBER: usize = 3;
 pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> {
     // Should at least be able to store one 100MB file in the cache.
     // TODO: Read the type of the cache from config.
-    let dummy_size_per_disk_cache = 1000 * 1024 * 1024;
+    let dummy_size_per_disk_cache = 1024 * 1024 * 1024;
     let dummy_size_per_mem_cache = 200 * 1024 * 1024;
     let dummy_mem_max_file_cache = 10 * 1024 * 1024 + 1;

2 changes: 1 addition & 1 deletion storage-node/src/storage_reader/mod.rs
@@ -27,7 +27,7 @@ pub trait AsyncStorageReader {
     ///
     /// NEVER call this method if you do not know the size of the data -- collecting
     /// all data into one buffer might lead to OOM.
-    async fn read_all(&self) -> ParpulseResult<Bytes>;
+    async fn read_all(&self) -> ParpulseResult<Vec<Bytes>>;

     /// Read data from the underlying storage as a stream.
     async fn into_stream(self) -> ParpulseResult<StorageReaderStream>;
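
With this change, `read_all` yields one `Bytes` chunk per key instead of a single concatenated buffer, so callers index or iterate the result rather than reading one contiguous length. A minimal caller sketch under that assumption (the loop and `total` are illustrative, not code from this commit):

    let chunks = reader.read_all().await?;

    // One entry per requested key; the total size is now a sum over chunks.
    let total: usize = chunks.iter().map(|b| b.len()).sum();

    for (i, chunk) in chunks.iter().enumerate() {
        // Each `Bytes` is an independent handle over one object's body.
        println!("object {i}: {} bytes of {total} total", chunk.len());
    }
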
12 changes: 6 additions & 6 deletions storage-node/src/storage_reader/s3.rs
@@ -11,7 +11,7 @@ use aws_sdk_s3::{
     Client,
 };
 use aws_smithy_runtime_api::{client::result::SdkError, http::Response};
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use futures::{future::BoxFuture, ready, FutureExt, Stream};

 use crate::error::{ParpulseError, ParpulseResult};
@@ -140,8 +140,8 @@ impl Stream for S3ReaderStream {
 impl AsyncStorageReader for S3Reader {
     /// NEVER call this method if you do not know the size of the data -- collecting
     /// all data into one buffer might lead to OOM.
-    async fn read_all(&self) -> ParpulseResult<Bytes> {
-        let mut bytes = BytesMut::new();
+    async fn read_all(&self) -> ParpulseResult<Vec<Bytes>> {
+        let mut bytes_vec = Vec::with_capacity(self.keys.len());
         for key in &self.keys {
             let object = self
                 .client
@@ -151,7 +151,7 @@ impl AsyncStorageReader for S3Reader {
                 .send()
                 .await
                 .map_err(ParpulseError::from)?;
-            bytes.extend(
+            bytes_vec.push(
                 object
                     .body
                     .collect()
@@ -160,7 +160,7 @@ impl AsyncStorageReader for S3Reader {
                     .into_bytes(),
             );
         }
-        Ok(bytes.freeze())
+        Ok(bytes_vec)
     }

     async fn into_stream(self) -> ParpulseResult<StorageReaderStream> {
@@ -181,7 +181,7 @@ mod tests {
         let keys = vec!["userdata/userdata1.parquet".to_string()];
         let reader = S3Reader::new(bucket, keys).await;
         let bytes = reader.read_all().await.unwrap();
-        assert_eq!(bytes.len(), 113629);
+        assert_eq!(bytes[0].len(), 113629);
     }

     #[tokio::test]
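
The speedup in this change comes from dropping a copy per object: the old code appended every downloaded body into one growing `BytesMut`, copying each payload and reallocating as the buffer grew, while the new code just moves each reference-counted `Bytes` handle into a `Vec`. A standalone sketch of the two shapes (illustrative, not code from this repository):

    use bytes::{Bytes, BytesMut};

    // Old shape: every byte is copied into `buf`, which may also
    // reallocate several times on its way to the total size.
    fn concat_copying(bodies: &[Bytes]) -> Bytes {
        let mut buf = BytesMut::new();
        for body in bodies {
            buf.extend_from_slice(body);
        }
        buf.freeze()
    }

    // New shape: moving a `Bytes` transfers a cheap, reference-counted
    // slice handle; the payload itself is never copied.
    fn collect_handles(bodies: Vec<Bytes>) -> Vec<Bytes> {
        bodies
    }
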
12 changes: 6 additions & 6 deletions storage-node/src/storage_reader/s3_diskmock.rs
@@ -7,7 +7,7 @@ use std::{
 };

 use async_trait::async_trait;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use futures::{ready, Stream, StreamExt};

 use crate::{
@@ -119,13 +119,13 @@ impl Stream for MockS3ReaderStream {

 #[async_trait]
 impl AsyncStorageReader for MockS3Reader {
-    async fn read_all(&self) -> ParpulseResult<Bytes> {
-        let mut bytes = BytesMut::new();
+    async fn read_all(&self) -> ParpulseResult<Vec<Bytes>> {
+        let mut bytes_vec = Vec::with_capacity(self.file_paths.len());
         for file_path in &self.file_paths {
             let (_, data) = self.disk_manager.read_disk_all(file_path).await?;
-            bytes.extend(data);
+            bytes_vec.push(data);
         }
-        Ok(bytes.freeze())
+        Ok(bytes_vec)
     }

     async fn into_stream(self) -> ParpulseResult<StorageReaderStream> {
@@ -145,7 +145,7 @@ mod tests {
         ];
         let reader = MockS3Reader::new(bucket, keys).await;
         let bytes = reader.read_all().await.unwrap();
-        assert_eq!(bytes.len(), 113629 + 112193);
+        assert_eq!(bytes[0].len() + bytes[1].len(), 113629 + 112193);
     }

     #[tokio::test]
