This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

fix: modify s3 read_all #64

Merged · 5 commits · Apr 30, 2024
Changes from 3 commits
@@ -17,7 +17,7 @@ use crate::{

/// TODO(lanlou): make them configurable.
/// MAX_DISK_READER_BUFFER_SIZE is 10MB.
-const MAX_DISK_READER_BUFFER_SIZE: usize = 10 * 1024 * 1024;
+const MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024;
const DEFAULT_DISK_CHANNEL_BUFFER_SIZE: usize = 512;

/// [`DiskStore`] stores the contents of remote objects on the local disk.
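The cap on the disk reader's in-memory buffer is raised from 10MB to 100MB, so a whole 100MB object can now be buffered in a single read (note the doc comment above still says "10MB" and is now stale). A minimal sketch of how such a cap bounds per-read allocation — `read_capped_chunk` and its use of std::fs are illustrative, not the crate's actual reader:

use std::fs::File;
use std::io::{self, Read};

const MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024;

// Hypothetical helper: never allocate more than the configured cap for one read,
// even when the caller asks for all remaining bytes of a very large file.
fn read_capped_chunk(file: &mut File, remaining: usize) -> io::Result<Vec<u8>> {
    let chunk_size = remaining.min(MAX_DISK_READER_BUFFER_SIZE);
    let mut buf = vec![0u8; chunk_size];
    file.read_exact(&mut buf)?;
    Ok(buf)
}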
4 changes: 2 additions & 2 deletions storage-node/src/server.rs
@@ -18,8 +18,8 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()>
// Should at least be able to store one 100MB file in the cache.
// TODO: Read the type of the cache from config.
let dummy_size_per_disk_cache = 100 * 1024 * 1024;
-let dummy_size_per_mem_cache = 100 * 1024;
-let dummy_mem_max_file_cache = 10 * 1024;
+let dummy_size_per_mem_cache = 110 * 1024 * 1024;
+let dummy_mem_max_file_cache = 11 * 1024 * 1024;

let mut data_store_caches = Vec::new();

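If, as the names suggest, `dummy_size_per_mem_cache` is the memory cache's total budget and `dummy_mem_max_file_cache` is its per-file cap, the new numbers line up as follows (a worked check, not code from the repo):

fn main() {
    let dummy_size_per_disk_cache = 100 * 1024 * 1024; // 100MB: fits one whole 100MB file on disk.
    let dummy_size_per_mem_cache = 110 * 1024 * 1024; // 110MB total in-memory budget.
    let dummy_mem_max_file_cache = 11 * 1024 * 1024; // 11MB per-file cap in memory.

    // A 100MB object exceeds the 11MB per-file cap, so only the disk cache can hold it,
    // matching the comment that the cache should store at least one 100MB file.
    assert!(dummy_size_per_disk_cache >= 100 * 1024 * 1024);

    // At the cap, the memory cache fits exactly ten files: 10 * 11MB = 110MB.
    assert_eq!(dummy_size_per_mem_cache / dummy_mem_max_file_cache, 10);
}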
2 changes: 1 addition & 1 deletion storage-node/src/storage_reader/mod.rs
@@ -27,7 +27,7 @@ pub trait AsyncStorageReader {
///
/// NEVER call this method if you do not know the size of the data -- collecting
/// all data into one buffer might lead to OOM.
-async fn read_all(&self) -> ParpulseResult<Bytes>;
+async fn read_all(&self) -> ParpulseResult<Vec<Bytes>>;

/// Read data from the underlying storage as a stream.
async fn into_stream(self) -> ParpulseResult<StorageReaderStream>;
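Returning `Vec<Bytes>` gives the caller one `Bytes` handle per S3 key instead of forcing all objects into a single contiguous buffer. The OOM warning still applies — every chunk is resident at once — but no extra copy or oversized allocation is needed. A minimal usage sketch, where `chunks` stands in for a `read_all` result:

use bytes::Bytes;

fn main() {
    // Stand-in for what `read_all` now returns: one `Bytes` per S3 key.
    let chunks: Vec<Bytes> = vec![
        Bytes::from_static(b"first object"),
        Bytes::from_static(b"second object"),
    ];

    // Total size, computed without copying anything.
    let total: usize = chunks.iter().map(|b| b.len()).sum();

    // A caller that genuinely needs one contiguous buffer can still build it,
    // paying for the single large allocation explicitly.
    let mut joined = Vec::with_capacity(total);
    for chunk in &chunks {
        joined.extend_from_slice(chunk);
    }
    assert_eq!(joined.len(), total);
}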
12 changes: 6 additions & 6 deletions storage-node/src/storage_reader/s3.rs
@@ -11,7 +11,7 @@ use aws_sdk_s3::{
Client,
};
use aws_smithy_runtime_api::{client::result::SdkError, http::Response};
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
use futures::{future::BoxFuture, ready, FutureExt, Stream};

use crate::error::{ParpulseError, ParpulseResult};
@@ -140,8 +140,8 @@ impl Stream for S3ReaderStream
impl AsyncStorageReader for S3Reader {
/// NEVER call this method if you do not know the size of the data -- collecting
/// all data into one buffer might lead to OOM.
-async fn read_all(&self) -> ParpulseResult<Bytes> {
-let mut bytes = BytesMut::new();
+async fn read_all(&self) -> ParpulseResult<Vec<Bytes>> {
+let mut bytes_vec = Vec::new();
A Member left a review comment with a suggested change:

-let mut bytes_vec = Vec::new();
+let mut bytes_vec = Vec::with_capacity(self.keys.len());
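Pre-sizing with `self.keys.len()` is a cheap win here: the number of objects is known before the loop, so the vector allocates once instead of growing by doubling as results are pushed. Either way the object bodies themselves are not copied — `Bytes` is a reference-counted handle, so pushing one only moves the handle.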

for key in &self.keys {
let object = self
.client
@@ -151,7 +151,7 @@ impl AsyncStorageReader for S3Reader {
.send()
.await
.map_err(ParpulseError::from)?;
-bytes.extend(
+bytes_vec.push(
object
.body
.collect()
@@ -160,7 +160,7 @@
.into_bytes(),
);
}
-Ok(bytes.freeze())
+Ok(bytes_vec)
}

async fn into_stream(self) -> ParpulseResult<StorageReaderStream> {
@@ -181,7 +181,7 @@ mod tests {
let keys = vec!["userdata/userdata1.parquet".to_string()];
let reader = S3Reader::new(bucket, keys).await;
let bytes = reader.read_all().await.unwrap();
-assert_eq!(bytes.len(), 113629);
+assert_eq!(bytes[0].len(), 113629);
}

#[tokio::test]
12 changes: 6 additions & 6 deletions storage-node/src/storage_reader/s3_diskmock.rs
@@ -7,7 +7,7 @@ use std::{
};

use async_trait::async_trait;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
use futures::{ready, Stream, StreamExt};

use crate::{
@@ -119,13 +119,13 @@ impl Stream for MockS3ReaderStream {

#[async_trait]
impl AsyncStorageReader for MockS3Reader {
-async fn read_all(&self) -> ParpulseResult<Bytes> {
-let mut bytes = BytesMut::new();
+async fn read_all(&self) -> ParpulseResult<Vec<Bytes>> {
+let mut bytes_vec = Vec::new();
for file_path in &self.file_paths {
let (_, data) = self.disk_manager.read_disk_all(file_path).await?;
-bytes.extend(data);
+bytes_vec.push(data);
}
-Ok(bytes.freeze())
+Ok(bytes_vec)
}

async fn into_stream(self) -> ParpulseResult<StorageReaderStream> {
Expand All @@ -145,7 +145,7 @@ mod tests {
];
let reader = MockS3Reader::new(bucket, keys).await;
let bytes = reader.read_all().await.unwrap();
-assert_eq!(bytes.len(), 113629 + 112193);
+assert_eq!(bytes[0].len() + bytes[1].len(), 113629 + 112193);
}

#[tokio::test]
Expand Down
Loading