Skip to content

Commit

Permalink
Merge AppendUploader into Uploader (#1172)
Browse files Browse the repository at this point in the history
Internal refactor to merge the `AppendUploader` for incremental uploads
into the existing `Uploader`.

### Does this change impact existing behavior?

No.

### Does this change need a changelog entry?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Nov 29, 2024
1 parent b041775 commit 54b57c4
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 263 deletions.
93 changes: 42 additions & 51 deletions mountpoint-s3/examples/upload_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::sync::Arc;
use std::time::Instant;

use clap::Parser;
use futures::task::Spawn;
use mountpoint_s3::mem_limiter::MemoryLimiter;
use mountpoint_s3::upload::{AppendUploader, Uploader};
use mountpoint_s3::upload::Uploader;
use mountpoint_s3::ServerSideEncryption;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ChecksumAlgorithm;
Expand Down Expand Up @@ -107,11 +106,41 @@ fn main() {
let runtime = client.event_loop_group();

for i in 0..args.iterations {
let max_memory_target = if let Some(target) = args.max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory
let sys = System::new_with_specifics(RefreshKind::everything());
(sys.total_memory() as f64 * 0.95) as u64
};
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));

let buffer_size = args.write_part_size;
let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

let checksum_algorithm = match args.checksum_algorithm.as_str() {
"off" => None,
"crc32c" => Some(ChecksumAlgorithm::Crc32c),
"crc32" => Some(ChecksumAlgorithm::Crc32),
"sha1" => Some(ChecksumAlgorithm::Sha1),
"sha256" => Some(ChecksumAlgorithm::Sha256),
other => Some(ChecksumAlgorithm::Unknown(other.to_string())),
};
let uploader = Uploader::new(
client.clone(),
runtime.clone(),
mem_limiter,
None,
server_side_encryption,
buffer_size,
checksum_algorithm,
);

let start = Instant::now();
if args.incremental_upload {
futures::executor::block_on(run_append_uploader(client.clone(), runtime.clone(), &args, i));
futures::executor::block_on(run_append_uploader(&uploader, &args, i));
} else {
futures::executor::block_on(run_mpu_uploader(client.clone(), &args, i));
futures::executor::block_on(run_mpu_uploader(&uploader, &args, i));
}
let elapsed = start.elapsed();
let uploaded_size_mib = (args.object_size as f64) / (1024 * 1024) as f64;
Expand All @@ -128,20 +157,15 @@ fn main() {
}
}

async fn run_mpu_uploader<Client: ObjectClient>(client: Arc<Client>, args: &UploadBenchmarkArgs, iteration: usize) {
async fn run_mpu_uploader<Client>(uploader: &Uploader<Client>, args: &UploadBenchmarkArgs, iteration: usize)
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
let start = Instant::now();
let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

let use_additional_checksum = match args.checksum_algorithm.as_str() {
"off" => false,
"crc32c" => true,
other => todo!("MPU uploader does not support {other} checksum algorithm"),
};
let uploader = Uploader::new(client.clone(), None, server_side_encryption, use_additional_checksum);

let bucket = args.bucket.clone();
let key = args.key.clone();
let mut upload_request = uploader.put(&bucket, &key).await.unwrap();
let mut upload_request = uploader.start_atomic_upload(&bucket, &key).await.unwrap();

let mut total_bytes_written = 0;
let target_size = args.object_size;
Expand All @@ -166,48 +190,15 @@ async fn run_mpu_uploader<Client: ObjectClient>(client: Arc<Client>, args: &Uplo
upload_request.complete().await.unwrap();
}

async fn run_append_uploader<Client, Runtime>(
client: Arc<Client>,
runtime: Runtime,
args: &UploadBenchmarkArgs,
iteration: usize,
) where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
async fn run_append_uploader<Client>(uploader: &Uploader<Client>, args: &UploadBenchmarkArgs, iteration: usize)
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
let start = Instant::now();
let max_memory_target = if let Some(target) = args.max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory
let sys = System::new_with_specifics(RefreshKind::everything());
(sys.total_memory() as f64 * 0.95) as u64
};
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));

let buffer_size = args.write_part_size;
let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

let checksum_algorithm = match args.checksum_algorithm.as_str() {
"off" => None,
"crc32c" => Some(ChecksumAlgorithm::Crc32c),
"crc32" => Some(ChecksumAlgorithm::Crc32),
"sha1" => Some(ChecksumAlgorithm::Sha1),
"sha256" => Some(ChecksumAlgorithm::Sha256),
other => Some(ChecksumAlgorithm::Unknown(other.to_string())),
};
let uploader = AppendUploader::new(
client.clone(),
runtime,
mem_limiter,
buffer_size,
server_side_encryption,
checksum_algorithm,
);

let bucket = args.bucket.clone();
let key = args.key.clone();
let mut upload_request = uploader.start_upload(bucket.clone(), key.clone(), 0, None);
let mut upload_request = uploader.start_incremental_upload(bucket.clone(), key.clone(), 0, None);

let mut total_bytes_written = 0;
let target_size = args.object_size;
Expand Down
14 changes: 4 additions & 10 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use bytes::Bytes;
use futures::task::Spawn;

use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::time::{Duration, UNIX_EPOCH};
Expand All @@ -21,7 +22,7 @@ use crate::prefix::Prefix;
use crate::superblock::{InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, SuperblockConfig};
use crate::sync::atomic::{AtomicU64, Ordering};
use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
use crate::upload::{AppendUploader, Uploader};
use crate::upload::Uploader;

pub use crate::superblock::InodeNo;

Expand Down Expand Up @@ -61,7 +62,6 @@ where
superblock: Superblock,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
append_uploader: AppendUploader<Client>,
bucket: String,
#[allow(unused)]
prefix: Prefix,
Expand Down Expand Up @@ -169,17 +169,12 @@ where
let superblock = Superblock::new(bucket, prefix, superblock_config);
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
let uploader = Uploader::new(
client.clone(),
config.storage_class.to_owned(),
config.server_side_encryption.clone(),
config.use_upload_checksums,
);
let append_uploader = AppendUploader::new(
client.clone(),
runtime,
mem_limiter.clone(),
client.write_part_size().unwrap(),
config.storage_class.to_owned(),
config.server_side_encryption.clone(),
client.write_part_size().unwrap(),
config.use_upload_checksums.then_some(ChecksumAlgorithm::Crc32c),
);

Expand All @@ -190,7 +185,6 @@ where
superblock,
prefetcher,
uploader,
append_uploader,
bucket: bucket.to_string(),
prefix: prefix.clone(),
next_handle: AtomicU64::new(1),
Expand Down
47 changes: 12 additions & 35 deletions mountpoint-s3/src/fs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::Level;
use crate::fs::error_metadata::ErrorMetadata;
use crate::prefetch::PrefetchReadError;
use crate::superblock::InodeError;
use crate::upload::{AppendUploadError, UploadWriteError};
use crate::upload::UploadError;

/// Generate an error that includes a conversion to a libc errno for use in replies to FUSE.
///
Expand Down Expand Up @@ -108,30 +108,16 @@ impl From<InodeError> for Error {
}
}

impl<E: std::error::Error + Send + Sync + 'static> From<UploadWriteError<E>> for Error {
fn from(err: UploadWriteError<E>) -> Self {
impl<E: std::error::Error + Send + Sync + 'static> From<UploadError<E>> for Error {
fn from(err: UploadError<E>) -> Self {
let errno = err.to_errno();
Error {
errno,
message: String::from("upload error"),
source: Some(anyhow::anyhow!(err)),
// We are having WARN as the default level of logging for fuse errors
level: Level::WARN,
metadata: Default::default(), // TODO (vlaad): must be cloned from UploadWriteError
}
}
}

impl<E: std::error::Error + Send + Sync + 'static> From<AppendUploadError<E>> for Error {
fn from(err: AppendUploadError<E>) -> Self {
let errno = err.to_errno();
Error {
errno,
message: String::from("upload error"),
source: Some(anyhow::anyhow!(err)),
// We are having WARN as the default level of logging for fuse errors
level: Level::WARN,
metadata: Default::default(), // TODO (vlaad): must be cloned from AppendUploadError
metadata: Default::default(), // TODO (vlaad): must be cloned from UploadError
}
}
}
Expand Down Expand Up @@ -193,25 +179,16 @@ impl ToErrno for InodeError {
}
}

impl<E: std::error::Error> ToErrno for UploadWriteError<E> {
fn to_errno(&self) -> libc::c_int {
match self {
UploadWriteError::PutRequestFailed(_) => libc::EIO,
UploadWriteError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadWriteError::ObjectTooBig { .. } => libc::EFBIG,
}
}
}

impl<E: std::error::Error> ToErrno for AppendUploadError<E> {
impl<E: std::error::Error> ToErrno for UploadError<E> {
fn to_errno(&self) -> libc::c_int {
match self {
AppendUploadError::PutRequestFailed(_) => libc::EIO,
AppendUploadError::UploadAlreadyTerminated => libc::EIO,
AppendUploadError::SseCorruptedError(_) => libc::EIO,
AppendUploadError::ChecksumComputationFailed(_) => libc::EIO,
AppendUploadError::HeadObjectFailed(_) => libc::EIO,
AppendUploadError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadError::PutRequestFailed(_) => libc::EIO,
UploadError::UploadAlreadyTerminated => libc::EIO,
UploadError::SseCorruptedError(_) => libc::EIO,
UploadError::ChecksumComputationFailed(_) => libc::EIO,
UploadError::HeadObjectFailed(_) => libc::EIO,
UploadError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadError::ObjectTooBig { .. } => libc::EFBIG,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions mountpoint-s3/src/fs/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ where
lookup.stat.etag.as_ref().map(|e| e.into())
};
let current_offset = if is_truncate { 0 } else { lookup.stat.size as u64 };
let request = fs.append_uploader.start_upload(
let request = fs.uploader.start_incremental_upload(
bucket.to_owned(),
key.to_owned(),
current_offset,
Expand All @@ -120,7 +120,7 @@ where
written_bytes: 0,
})
} else {
match fs.uploader.put(bucket, key).await {
match fs.uploader.start_atomic_upload(bucket, key).await {
Err(e) => return Err(err!(libc::EIO, source:e, "put failed to start")),
Ok(request) => FileHandleState::Write(UploadState::MPUInProgress { request, handle }),
}
Expand Down Expand Up @@ -247,7 +247,7 @@ where
Ok(etag) => {
// Restart append request.
let initial_etag = etag.or(initial_etag);
let request = fs.append_uploader.start_upload(
let request = fs.uploader.start_incremental_upload(
fs.bucket.clone(),
key.to_owned(),
current_offset,
Expand Down
Loading

0 comments on commit 54b57c4

Please sign in to comment.