diff --git a/mountpoint-s3/examples/upload_benchmark.rs b/mountpoint-s3/examples/upload_benchmark.rs
index bbd04864c..507b85afd 100644
--- a/mountpoint-s3/examples/upload_benchmark.rs
+++ b/mountpoint-s3/examples/upload_benchmark.rs
@@ -2,9 +2,8 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use clap::Parser;
-use futures::task::Spawn;
 use mountpoint_s3::mem_limiter::MemoryLimiter;
-use mountpoint_s3::upload::{AppendUploader, Uploader};
+use mountpoint_s3::upload::Uploader;
 use mountpoint_s3::ServerSideEncryption;
 use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
 use mountpoint_s3_client::types::ChecksumAlgorithm;
@@ -107,11 +106,41 @@ fn main() {
     let runtime = client.event_loop_group();
 
     for i in 0..args.iterations {
+        let max_memory_target = if let Some(target) = args.max_memory_target {
+            target * 1024 * 1024
+        } else {
+            // Default to 95% of total system memory
+            let sys = System::new_with_specifics(RefreshKind::everything());
+            (sys.total_memory() as f64 * 0.95) as u64
+        };
+        let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));
+
+        let buffer_size = args.write_part_size;
+        let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());
+
+        let checksum_algorithm = match args.checksum_algorithm.as_str() {
+            "off" => None,
+            "crc32c" => Some(ChecksumAlgorithm::Crc32c),
+            "crc32" => Some(ChecksumAlgorithm::Crc32),
+            "sha1" => Some(ChecksumAlgorithm::Sha1),
+            "sha256" => Some(ChecksumAlgorithm::Sha256),
+            other => Some(ChecksumAlgorithm::Unknown(other.to_string())),
+        };
+        let uploader = Uploader::new(
+            client.clone(),
+            runtime.clone(),
+            mem_limiter,
+            None,
+            server_side_encryption,
+            buffer_size,
+            checksum_algorithm,
+        );
+
         let start = Instant::now();
         if args.incremental_upload {
-            futures::executor::block_on(run_append_uploader(client.clone(), runtime.clone(), &args, i));
+            futures::executor::block_on(run_append_uploader(&uploader, &args, i));
         } else {
-            futures::executor::block_on(run_mpu_uploader(client.clone(), &args, i));
+            futures::executor::block_on(run_mpu_uploader(&uploader, &args, i));
         }
         let elapsed = start.elapsed();
         let uploaded_size_mib = (args.object_size as f64) / (1024 * 1024) as f64;
@@ -128,20 +157,15 @@ fn main() {
     }
 }
 
-async fn run_mpu_uploader<Client: ObjectClient>(client: Arc<Client>, args: &UploadBenchmarkArgs, iteration: usize) {
+async fn run_mpu_uploader<Client>(uploader: &Uploader<Client>, args: &UploadBenchmarkArgs, iteration: usize)
+where
+    Client: ObjectClient + Clone + Send + Sync + 'static,
+{
     let start = Instant::now();
-    let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());
-
-    let use_additional_checksum = match args.checksum_algorithm.as_str() {
-        "off" => false,
-        "crc32c" => true,
-        other => todo!("MPU uploader does not support {other} checksum algorithm"),
-    };
-    let uploader = Uploader::new(client.clone(), None, server_side_encryption, use_additional_checksum);
     let bucket = args.bucket.clone();
     let key = args.key.clone();
 
-    let mut upload_request = uploader.put(&bucket, &key).await.unwrap();
+    let mut upload_request = uploader.start_atomic_upload(&bucket, &key).await.unwrap();
 
     let mut total_bytes_written = 0;
    let target_size = args.object_size;
@@ -166,48 +190,15 @@ async fn run_mpu_uploader(client: Arc, args: &Uplo
     upload_request.complete().await.unwrap();
 }
 
-async fn run_append_uploader<Client, Runtime>(
-    client: Arc<Client>,
-    runtime: Runtime,
-    args: &UploadBenchmarkArgs,
-    iteration: usize,
-) where
-    Client: ObjectClient + Send + Sync + 'static,
-    Runtime: Spawn + Send + Sync + 'static,
+async fn run_append_uploader<Client>(uploader: &Uploader<Client>, args: &UploadBenchmarkArgs, iteration: usize)
+where
+    Client: ObjectClient + Clone + Send + Sync + 'static,
 {
     let start = Instant::now();
-    let max_memory_target = if let Some(target) = args.max_memory_target {
-        target * 1024 * 1024
-    } else {
-        // Default to 95% of total system memory
-        let sys = System::new_with_specifics(RefreshKind::everything());
-        (sys.total_memory() as f64 * 0.95) as u64
-    };
-    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));
-
-    let buffer_size = args.write_part_size;
-    let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());
-
-    let checksum_algorithm = match args.checksum_algorithm.as_str() {
-        "off" => None,
-        "crc32c" => Some(ChecksumAlgorithm::Crc32c),
-        "crc32" => Some(ChecksumAlgorithm::Crc32),
-        "sha1" => Some(ChecksumAlgorithm::Sha1),
-        "sha256" => Some(ChecksumAlgorithm::Sha256),
-        other => Some(ChecksumAlgorithm::Unknown(other.to_string())),
-    };
-    let uploader = AppendUploader::new(
-        client.clone(),
-        runtime,
-        mem_limiter,
-        buffer_size,
-        server_side_encryption,
-        checksum_algorithm,
-    );
     let bucket = args.bucket.clone();
     let key = args.key.clone();
 
-    let mut upload_request = uploader.start_upload(bucket.clone(), key.clone(), 0, None);
+    let mut upload_request = uploader.start_incremental_upload(bucket.clone(), key.clone(), 0, None);
 
     let mut total_bytes_written = 0;
     let target_size = args.object_size;
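Reviewer note: the benchmark above now drives both upload paths through one `Uploader`. The sketch below is not part of the change; it is a minimal, self-contained driver that assumes the `MockClient` from the client crate's mock/test support and the same `futures` `ThreadPool` runtime the unit tests use, purely to illustrate the unified construction and the two entry points.

```rust
use std::sync::Arc;

use futures::executor::{block_on, ThreadPool};
use mountpoint_s3::mem_limiter::MemoryLimiter;
use mountpoint_s3::upload::Uploader;
use mountpoint_s3::ServerSideEncryption;
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::types::ChecksumAlgorithm;
use mountpoint_s3_client::ObjectClient;

fn main() {
    // Hypothetical mock setup; a real caller would pass an S3CrtClient instead.
    let client = Arc::new(MockClient::new(MockClientConfig {
        bucket: "bucket".to_string(),
        part_size: 32,
        ..Default::default()
    }));
    let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));

    // One Uploader now serves both the atomic and the incremental path.
    let uploader = Uploader::new(
        client.clone(),
        runtime,
        mem_limiter,
        None,                              // storage class
        ServerSideEncryption::default(),   // SSE settings
        client.write_part_size().unwrap(), // buffer size for incremental uploads
        Some(ChecksumAlgorithm::Crc32c),   // default checksum algorithm
    );

    // Atomic (MPU-backed) upload: fallible up front, then sequential writes.
    block_on(async {
        let mut req = uploader.start_atomic_upload("bucket", "atomic-key").await.unwrap();
        req.write(0, b"hello").await.unwrap();
        req.complete().await.unwrap();
    });

    // Incremental (append) upload of a new object: handle returned immediately,
    // failures surface on write/complete.
    block_on(async {
        let mut req = uploader.start_incremental_upload("bucket".to_owned(), "append-key".to_owned(), 0, None);
        req.write(0, b"hello").await.unwrap();
        req.complete().await.unwrap();
    });
}
```

The only per-path difference left at the call site is the entry point: `start_atomic_upload` is async and validates SSE settings before returning, while `start_incremental_upload` hands back a pipeline handle and reports errors on `write`/`complete`.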
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index d96e2110d..d489fcfa9 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -2,6 +2,7 @@
 
 use bytes::Bytes;
 use futures::task::Spawn;
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::time::{Duration, UNIX_EPOCH};
@@ -21,7 +22,7 @@ use crate::prefix::Prefix;
 use crate::superblock::{InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, SuperblockConfig};
 use crate::sync::atomic::{AtomicU64, Ordering};
 use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
-use crate::upload::{AppendUploader, Uploader};
+use crate::upload::Uploader;
 
 pub use crate::superblock::InodeNo;
 
@@ -61,7 +62,6 @@ where
     superblock: Superblock,
     prefetcher: Prefetcher,
     uploader: Uploader<Client>,
-    append_uploader: AppendUploader<Client>,
     bucket: String,
     #[allow(unused)]
     prefix: Prefix,
@@ -169,17 +169,12 @@ where
         let superblock = Superblock::new(bucket, prefix, superblock_config);
         let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
         let uploader = Uploader::new(
-            client.clone(),
-            config.storage_class.to_owned(),
-            config.server_side_encryption.clone(),
-            config.use_upload_checksums,
-        );
-        let append_uploader = AppendUploader::new(
             client.clone(),
             runtime,
             mem_limiter.clone(),
-            client.write_part_size().unwrap(),
+            config.storage_class.to_owned(),
             config.server_side_encryption.clone(),
+            client.write_part_size().unwrap(),
             config.use_upload_checksums.then_some(ChecksumAlgorithm::Crc32c),
         );
 
@@ -190,7 +185,6 @@ where
             superblock,
             prefetcher,
             uploader,
-            append_uploader,
             bucket: bucket.to_string(),
             prefix: prefix.clone(),
             next_handle: AtomicU64::new(1),
diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs
index b3b0fdd01..cdbb703a6 100644
--- a/mountpoint-s3/src/fs/error.rs
+++ b/mountpoint-s3/src/fs/error.rs
@@ -6,7 +6,7 @@ use tracing::Level;
 use crate::fs::error_metadata::ErrorMetadata;
 use crate::prefetch::PrefetchReadError;
 use crate::superblock::InodeError;
-use crate::upload::{AppendUploadError, UploadWriteError};
+use crate::upload::UploadError;
 
 /// Generate an error that includes a conversion to a libc errno for use in replies to FUSE.
 ///
@@ -108,8 +108,8 @@ impl From for Error {
     }
 }
 
-impl<E: std::error::Error + Send + Sync + 'static> From<UploadWriteError<E>> for Error {
-    fn from(err: UploadWriteError<E>) -> Self {
+impl<E: std::error::Error + Send + Sync + 'static> From<UploadError<E>> for Error {
+    fn from(err: UploadError<E>) -> Self {
         let errno = err.to_errno();
         Error {
             errno,
@@ -117,21 +117,7 @@ impl From> for
             source: Some(anyhow::anyhow!(err)),
             // We are having WARN as the default level of logging for fuse errors
             level: Level::WARN,
-            metadata: Default::default(), // TODO (vlaad): must be cloned from UploadWriteError
-        }
-    }
-}
-
-impl<E: std::error::Error + Send + Sync + 'static> From<AppendUploadError<E>> for Error {
-    fn from(err: AppendUploadError<E>) -> Self {
-        let errno = err.to_errno();
-        Error {
-            errno,
-            message: String::from("upload error"),
-            source: Some(anyhow::anyhow!(err)),
-            // We are having WARN as the default level of logging for fuse errors
-            level: Level::WARN,
-            metadata: Default::default(), // TODO (vlaad): must be cloned from AppendUploadError
+            metadata: Default::default(), // TODO (vlaad): must be cloned from UploadError
         }
     }
 }
@@ -193,25 +179,16 @@ impl ToErrno for InodeError {
     }
 }
 
-impl<E: std::error::Error> ToErrno for UploadWriteError<E> {
-    fn to_errno(&self) -> libc::c_int {
-        match self {
-            UploadWriteError::PutRequestFailed(_) => libc::EIO,
-            UploadWriteError::OutOfOrderWrite { .. } => libc::EINVAL,
-            UploadWriteError::ObjectTooBig { .. } => libc::EFBIG,
-        }
-    }
-}
-
-impl<E: std::error::Error> ToErrno for AppendUploadError<E> {
+impl<E: std::error::Error> ToErrno for UploadError<E> {
     fn to_errno(&self) -> libc::c_int {
         match self {
-            AppendUploadError::PutRequestFailed(_) => libc::EIO,
-            AppendUploadError::UploadAlreadyTerminated => libc::EIO,
-            AppendUploadError::SseCorruptedError(_) => libc::EIO,
-            AppendUploadError::ChecksumComputationFailed(_) => libc::EIO,
-            AppendUploadError::HeadObjectFailed(_) => libc::EIO,
-            AppendUploadError::OutOfOrderWrite { .. } => libc::EINVAL,
+            UploadError::PutRequestFailed(_) => libc::EIO,
+            UploadError::UploadAlreadyTerminated => libc::EIO,
+            UploadError::SseCorruptedError(_) => libc::EIO,
+            UploadError::ChecksumComputationFailed(_) => libc::EIO,
+            UploadError::HeadObjectFailed(_) => libc::EIO,
+            UploadError::OutOfOrderWrite { .. } => libc::EINVAL,
+            UploadError::ObjectTooBig { .. } => libc::EFBIG,
         }
     }
 }
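Reviewer note: with `UploadWriteError` and `AppendUploadError` folded into a single `UploadError`, all errno mapping now lives in one `ToErrno` impl. A test along the following lines could sit next to that impl in `fs/error.rs`; it is not part of this diff, and the module placement plus the `std::io::Error` stand-in for the client error type are assumptions made for illustration.

```rust
#[cfg(test)]
mod errno_mapping_tests {
    use super::*;

    // Use a std error type so the test does not depend on the mock client feature.
    type TestError = crate::upload::UploadError<std::io::Error>;

    #[test]
    fn merged_upload_error_maps_to_expected_errnos() {
        // Out-of-order writes should still surface as EINVAL after the merge.
        let out_of_order = TestError::OutOfOrderWrite {
            write_offset: 0,
            expected_offset: 8,
        };
        assert_eq!(out_of_order.to_errno(), libc::EINVAL);

        // ObjectTooBig moved over from the old UploadWriteError and keeps EFBIG.
        let too_big = TestError::ObjectTooBig { maximum_size: 1024 };
        assert_eq!(too_big.to_errno(), libc::EFBIG);
    }
}
```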
diff --git a/mountpoint-s3/src/fs/handles.rs b/mountpoint-s3/src/fs/handles.rs
index aefa7a1eb..41ad90c76 100644
--- a/mountpoint-s3/src/fs/handles.rs
+++ b/mountpoint-s3/src/fs/handles.rs
@@ -107,7 +107,7 @@ where
                 lookup.stat.etag.as_ref().map(|e| e.into())
             };
             let current_offset = if is_truncate { 0 } else { lookup.stat.size as u64 };
-            let request = fs.append_uploader.start_upload(
+            let request = fs.uploader.start_incremental_upload(
                 bucket.to_owned(),
                 key.to_owned(),
                 current_offset,
@@ -120,7 +120,7 @@ where
                 written_bytes: 0,
             })
         } else {
-            match fs.uploader.put(bucket, key).await {
+            match fs.uploader.start_atomic_upload(bucket, key).await {
                Err(e) => return Err(err!(libc::EIO, source:e, "put failed to start")),
                Ok(request) => FileHandleState::Write(UploadState::MPUInProgress { request, handle }),
            }
        }
@@ -247,7 +247,7 @@ where
             Ok(etag) => {
                 // Restart append request.
                 let initial_etag = etag.or(initial_etag);
-                let request = fs.append_uploader.start_upload(
+                let request = fs.uploader.start_incremental_upload(
                     fs.bucket.clone(),
                     key.to_owned(),
                     current_offset,
diff --git a/mountpoint-s3/src/upload.rs b/mountpoint-s3/src/upload.rs
index 213d1f677..3bc92de26 100644
--- a/mountpoint-s3/src/upload.rs
+++ b/mountpoint-s3/src/upload.rs
@@ -27,92 +27,27 @@ pub use incremental::AppendUploadRequest;
 
 /// An [Uploader] creates and manages streaming PutObject requests.
 #[derive(Debug)]
-pub struct Uploader<Client> {
-    client: Client,
-    storage_class: Option<String>,
-    server_side_encryption: ServerSideEncryption,
-    use_additional_checksums: bool,
-}
-
-#[derive(Debug, Error)]
-pub enum UploadPutError<S, C> {
-    #[error("put request creation failed")]
-    ClientError(#[from] ObjectClientError<S, C>),
-    #[error("SSE settings corrupted")]
-    SseCorruptedError(#[from] SseCorruptedError),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum UploadWriteError<E: std::error::Error> {
-    #[error("put request failed")]
-    PutRequestFailed(#[from] E),
-
-    #[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
-    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
-
-    #[error("object exceeded maximum upload size of {maximum_size} bytes")]
-    ObjectTooBig { maximum_size: usize },
-}
-
-impl<Client: ObjectClient> Uploader<Client> {
-    /// Create a new [Uploader] that will make requests to the given client.
-    pub fn new(
-        client: Client,
-        storage_class: Option<String>,
-        server_side_encryption: ServerSideEncryption,
-        use_additional_checksums: bool,
-    ) -> Self {
-        Self {
-            client,
-            storage_class,
-            server_side_encryption,
-            use_additional_checksums,
-        }
-    }
-
-    /// Start a new put request to the specified object.
-    pub async fn put(
-        &self,
-        bucket: &str,
-        key: &str,
-    ) -> Result<UploadRequest<Client>, UploadPutError<PutObjectError, Client::ClientError>> {
-        UploadRequest::new(self, bucket, key).await
-    }
-
-    #[cfg(test)]
-    pub fn corrupt_sse(&mut self, sse_type: Option<String>, sse_kms_key_id: Option<String>) {
-        self.server_side_encryption.corrupt_data(sse_type, sse_kms_key_id)
-    }
-}
-
-/// Maximum number of bytes an `AppendUploadQueue` can take.
-///
-/// We use this limit to prevent a single pipeline from consuming all memory.
-/// The limit may slow down writes eventually, but the overall upload throughput
-/// is already capped by a single PutObject request.
-const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;
-
-/// An [AppendUploader] creates and manages streaming PutObject requests using the Append API.
-#[derive(Debug)]
-pub struct AppendUploader<Client: ObjectClient> {
+pub struct Uploader<Client: ObjectClient> {
     client: Client,
     runtime: BoxRuntime,
     mem_limiter: Arc<MemoryLimiter<Client>>,
-    buffer_size: usize,
+    storage_class: Option<String>,
     server_side_encryption: ServerSideEncryption,
-    /// Default checksum algorithm, if any, to be used for new S3 objects created by an [AppendUploader].
+    buffer_size: usize,
+    /// Default checksum algorithm, if any, to be used for new S3 objects.
     ///
+    /// Only [ChecksumAlgorithm::Crc32c] is supported for multi-part uploads.
     /// For existing objects, Mountpoint will instead append using the existing checksum algorithm on the object.
     default_checksum_algorithm: Option<ChecksumAlgorithm>,
 }
 
 #[derive(Debug, Error)]
-pub enum AppendUploadError<E: std::error::Error> {
+pub enum UploadError<E: std::error::Error> {
     #[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
     OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
 
     #[error("put request failed")]
-    PutRequestFailed(#[source] ObjectClientError<PutObjectError, E>),
+    PutRequestFailed(#[from] ObjectClientError<PutObjectError, E>),
 
     #[error("upload was already terminated because of previous failures")]
     UploadAlreadyTerminated,
@@ -125,32 +60,47 @@ pub enum AppendUploadError {
     #[error("head object request failed")]
     HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),
+
+    #[error("object exceeded maximum upload size of {maximum_size} bytes")]
+    ObjectTooBig { maximum_size: usize },
 }
 
-impl<Client> AppendUploader<Client>
+impl<Client> Uploader<Client>
 where
     Client: ObjectClient + Clone + Send + Sync + 'static,
 {
+    /// Create a new [Uploader] that will make requests to the given client.
     pub fn new(
         client: Client,
         runtime: impl Spawn + Sync + Send + 'static,
         mem_limiter: Arc<MemoryLimiter<Client>>,
-        buffer_size: usize,
+        storage_class: Option<String>,
         server_side_encryption: ServerSideEncryption,
+        buffer_size: usize,
         default_checksum_algorithm: Option<ChecksumAlgorithm>,
     ) -> Self {
         Self {
             client,
             runtime: runtime.into(),
             mem_limiter,
-            buffer_size,
+            storage_class,
             server_side_encryption,
+            buffer_size,
             default_checksum_algorithm,
         }
     }
 
-    /// Start a new appendable upload to the specified object.
-    pub fn start_upload(
+    /// Start a new atomic upload.
+    pub async fn start_atomic_upload(
+        &self,
+        bucket: &str,
+        key: &str,
+    ) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
+        UploadRequest::new(self, bucket, key).await
+    }
+
+    /// Start a new incremental upload.
+    pub fn start_incremental_upload(
         &self,
         bucket: String,
         key: String,
@@ -174,8 +124,20 @@ where
             params,
         )
     }
+
+    #[cfg(test)]
+    pub fn corrupt_sse(&mut self, sse_type: Option<String>, sse_kms_key_id: Option<String>) {
+        self.server_side_encryption.corrupt_data(sse_type, sse_kms_key_id)
+    }
 }
 
+/// Maximum number of bytes an `AppendUploadQueue` can take.
+///
+/// We use this limit to prevent a single pipeline from consuming all memory.
+/// The limit may slow down writes eventually, but the overall upload throughput
+/// is already capped by a single PutObject request.
+const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;
+
 struct BoxRuntime(Box<dyn Spawn + Send + Sync>);
 impl BoxRuntime {
     fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
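Reviewer note: the new struct-level doc states that only CRC32C is accepted as a default checksum algorithm on the atomic (multi-part) path. The helper below is only a sketch restating, in one place, the policy the atomic path applies when it builds `PutObjectParams`; it is not part of the change, and the function name is made up for illustration.

```rust
use mountpoint_s3_client::types::{ChecksumAlgorithm, PutObjectTrailingChecksums};

// CRC32C enables full trailing checksums, no default falls back to ReviewOnly,
// and any other algorithm is currently rejected for multi-part uploads.
fn trailing_checksums_for(default_checksum_algorithm: &Option<ChecksumAlgorithm>) -> PutObjectTrailingChecksums {
    match default_checksum_algorithm {
        Some(ChecksumAlgorithm::Crc32c) => PutObjectTrailingChecksums::Enabled,
        Some(unsupported) => unimplemented!("checksum algorithm not supported: {:?}", unsupported),
        None => PutObjectTrailingChecksums::ReviewOnly,
    }
}
```

Incremental uploads are not constrained this way: appends to an existing object follow the object's existing checksum algorithm, so the default only applies to new objects.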
diff --git a/mountpoint-s3/src/upload/atomic.rs b/mountpoint-s3/src/upload/atomic.rs
index 83f22ca32..cbb7cce68 100644
--- a/mountpoint-s3/src/upload/atomic.rs
+++ b/mountpoint-s3/src/upload/atomic.rs
@@ -2,8 +2,7 @@ use std::fmt::Debug;
 
 use mountpoint_s3_client::{
     checksums::{crc32c_from_base64, Crc32c},
-    error::{ObjectClientError, PutObjectError},
-    types::{PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview},
+    types::{ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview},
     ObjectClient, PutObjectRequest,
 };
 use mountpoint_s3_crt::checksums::crc32c;
@@ -11,9 +10,7 @@ use tracing::error;
 
 use crate::{checksums::combine_checksums, ServerSideEncryption};
 
-use super::{UploadPutError, UploadWriteError, Uploader};
-
-type PutRequestError<Client> = ObjectClientError<PutObjectError, <Client as ObjectClient>::ClientError>;
+use super::{UploadError, Uploader};
 
 const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000;
 
@@ -35,13 +32,19 @@ impl UploadRequest {
         uploader: &Uploader<Client>,
         bucket: &str,
         key: &str,
-    ) -> Result<Self, UploadPutError<PutObjectError, Client::ClientError>> {
+    ) -> Result<Self, UploadError<Client::ClientError>> {
         let mut params = PutObjectParams::new();
-        if uploader.use_additional_checksums {
-            params = params.trailing_checksums(PutObjectTrailingChecksums::Enabled);
-        } else {
-            params = params.trailing_checksums(PutObjectTrailingChecksums::ReviewOnly);
+        match &uploader.default_checksum_algorithm {
+            Some(ChecksumAlgorithm::Crc32c) => {
+                params = params.trailing_checksums(PutObjectTrailingChecksums::Enabled);
+            }
+            Some(unsupported) => {
+                unimplemented!("checksum algorithm not supported: {:?}", unsupported);
+            }
+            None => {
+                params = params.trailing_checksums(PutObjectTrailingChecksums::ReviewOnly);
+            }
         }
 
         if let Some(storage_class) = &uploader.storage_class {
@@ -76,21 +79,17 @@ impl UploadRequest {
         self.next_request_offset
     }
 
-    pub async fn write(
-        &mut self,
-        offset: i64,
-        data: &[u8],
-    ) -> Result<usize, UploadWriteError<PutRequestError<Client>>> {
+    pub async fn write(&mut self, offset: i64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
         let next_offset = self.next_request_offset;
         if offset != next_offset as i64 {
-            return Err(UploadWriteError::OutOfOrderWrite {
+            return Err(UploadError::OutOfOrderWrite {
                 write_offset: offset as u64,
                 expected_offset: next_offset,
             });
         }
         if let Some(maximum_size) = self.maximum_upload_size {
             if next_offset + data.len() as u64 > maximum_size as u64 {
-                return Err(UploadWriteError::ObjectTooBig { maximum_size });
+                return Err(UploadError::ObjectTooBig { maximum_size });
             }
         }
@@ -100,7 +99,7 @@ impl UploadRequest {
         Ok(data.len())
     }
 
-    pub async fn complete(self) -> Result<PutObjectResult, UploadWriteError<PutRequestError<Client>>> {
+    pub async fn complete(self) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
         let size = self.size();
         let checksum = self.hasher.finalize();
         let result = self
@@ -180,14 +179,40 @@ mod tests {
     use std::collections::HashMap;
 
     use crate::fs::SseCorruptedError;
+    use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT};
     use crate::sync::Arc;
+    use futures::executor::ThreadPool;
     use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig};
     use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError};
+    use mountpoint_s3_client::types::ChecksumAlgorithm;
     use test_case::test_case;
 
     use super::*;
 
+    fn new_uploader_for_test<Client>(
+        client: Client,
+        storage_class: Option<String>,
+        server_side_encryption: ServerSideEncryption,
+        use_additional_checksums: bool,
+    ) -> Uploader<Client>
+    where
+        Client: ObjectClient + Clone + Send + Sync + 'static,
+    {
+        let buffer_size = client.write_part_size().unwrap();
+        let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+        let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
+        Uploader::new(
+            client,
+            runtime,
+            mem_limiter.into(),
+            storage_class,
+            server_side_encryption,
+            buffer_size,
+            use_additional_checksums.then_some(ChecksumAlgorithm::Crc32c),
+        )
+    }
+
     #[tokio::test]
     async fn complete_test() {
         let bucket = "bucket";
@@ -199,8 +224,8 @@ mod tests {
             part_size: 32,
             ..Default::default()
         }));
-        let uploader = Uploader::new(client.clone(), None, ServerSideEncryption::default(), true);
-        let request = uploader.put(bucket, key).await.unwrap();
+        let uploader = new_uploader_for_test(client.clone(), None, ServerSideEncryption::default(), true);
+        let request = uploader.start_atomic_upload(bucket, key).await.unwrap();
 
         assert!(!client.contains_key(key));
         assert!(client.is_upload_in_progress(key));
@@ -223,14 +248,14 @@ mod tests {
             part_size: 32,
             ..Default::default()
         }));
-        let uploader = Uploader::new(
+        let uploader = new_uploader_for_test(
            client.clone(),
            Some(storage_class.to_owned()),
            ServerSideEncryption::default(),
            true,
        );
 
-        let mut request = uploader.put(bucket, key).await.unwrap();
+        let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();
 
         let data = b"foo";
         let mut offset = 0;
@@ -277,11 +302,11 @@ mod tests {
            },
        ));
 
-        let uploader = Uploader::new(failure_client.clone(), None, ServerSideEncryption::default(), true);
+        let uploader = new_uploader_for_test(failure_client.clone(), None, ServerSideEncryption::default(), true);
 
        // First request fails on first write.
        {
-            let mut request = uploader.put(bucket, key).await.unwrap();
+            let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();
 
            let data = b"foo";
            request.write(0, data).await.expect_err("first write should fail");
@@ -291,7 +316,7 @@ mod tests {
 
        // Second request fails on complete (after one write).
        {
-            let mut request = uploader.put(bucket, key).await.unwrap();
+            let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();
 
            let data = b"foo";
            _ = request.write(0, data).await.unwrap();
@@ -318,8 +343,8 @@ mod tests {
             part_size: PART_SIZE,
             ..Default::default()
         }));
-        let uploader = Uploader::new(client.clone(), None, ServerSideEncryption::default(), true);
-        let mut request = uploader.put(bucket, key).await.unwrap();
+        let uploader = new_uploader_for_test(client.clone(), None, ServerSideEncryption::default(), true);
+        let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();
 
        let successful_writes = PART_SIZE * MAX_S3_MULTIPART_UPLOAD_PARTS / write_size;
        let data = vec![0xaa; write_size];
@@ -347,7 +372,7 @@ mod tests {
     #[tokio::test]
     async fn put_with_corrupted_sse_test(sse_type_corrupted: Option<&str>, key_id_corrupted: Option<&str>) {
         let client = Arc::new(MockClient::new(Default::default()));
-        let mut uploader = Uploader::new(
+        let mut uploader = new_uploader_for_test(
             client,
             None,
             ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key_alias".to_string())),
@@ -357,12 +382,12 @@ mod tests {
             .server_side_encryption
             .corrupt_data(sse_type_corrupted.map(String::from), key_id_corrupted.map(String::from));
         let err = uploader
-            .put("bucket", "hello")
+            .start_atomic_upload("bucket", "hello")
             .await
             .expect_err("sse checksum must be checked");
         assert!(matches!(
             err,
-            UploadPutError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
+            UploadError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
         ));
     }
 
@@ -377,12 +402,15 @@ mod tests {
             part_size: 32,
             ..Default::default()
         }));
-        let uploader = Uploader::new(
+        let uploader = new_uploader_for_test(
             client,
             None,
             ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key".to_string())),
             true,
         );
-        uploader.put(bucket, key).await.expect("put with sse should succeed");
+        uploader
+            .start_atomic_upload(bucket, key)
+            .await
+            .expect("put with sse should succeed");
     }
 }
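Reviewer note: the atomic tests above now go through `new_uploader_for_test` and `start_atomic_upload`, but none of the visible hunks exercise an out-of-order write on the atomic path, which after this change surfaces as the merged `UploadError::OutOfOrderWrite`. A possible additional test, sketched here on the assumption that it lives in the same `tests` module as the helper above, could look like this:

```rust
    #[tokio::test]
    async fn atomic_out_of_order_write_test() {
        let client = Arc::new(MockClient::new(MockClientConfig {
            bucket: "bucket".to_string(),
            part_size: 32,
            ..Default::default()
        }));
        let uploader = new_uploader_for_test(client, None, ServerSideEncryption::default(), true);
        let mut request = uploader.start_atomic_upload("bucket", "hello").await.unwrap();

        // Sequential write at the expected offset succeeds.
        request.write(0, b"foo").await.unwrap();

        // Skipping ahead violates the sequential-write contract and should map to EINVAL at the FUSE layer.
        let err = request
            .write(42, b"bar")
            .await
            .expect_err("out-of-order write should fail");
        assert!(matches!(err, UploadError::OutOfOrderWrite { .. }));
    }
```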
diff --git a/mountpoint-s3/src/upload/incremental.rs b/mountpoint-s3/src/upload/incremental.rs
index a6f1f2faa..f94f76724 100644
--- a/mountpoint-s3/src/upload/incremental.rs
+++ b/mountpoint-s3/src/upload/incremental.rs
@@ -18,7 +18,7 @@ use crate::sync::Arc;
 use crate::ServerSideEncryption;
 
 use super::hasher::ChecksumHasher;
-use super::{AppendUploadError, BoxRuntime, ChecksumHasherError};
+use super::{BoxRuntime, ChecksumHasherError, UploadError};
 
 /// Handle for appending data to an S3 object.
 ///
@@ -61,12 +61,12 @@ where
     /// but will be queued to upload until the buffer is full and all previous buffers have
     /// been uploaded.
     /// On success, returns the number of bytes written.
-    pub async fn write(&mut self, offset: u64, data: &[u8]) -> Result<usize, AppendUploadError<Client::ClientError>> {
+    pub async fn write(&mut self, offset: u64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
         // Bail out if a previous request failed
         self.upload_queue.verify().await?;
 
         if offset != self.offset {
-            return Err(AppendUploadError::OutOfOrderWrite {
+            return Err(UploadError::OutOfOrderWrite {
                 write_offset: offset,
                 expected_offset: self.offset,
             });
@@ -97,7 +97,7 @@ where
 
     /// Complete the upload and return the last `PutObjectResult` if any PUT requests are submitted.
     /// The pipeline cannot be used after this.
-    pub async fn complete(mut self) -> Result<Option<PutObjectResult>, AppendUploadError<Client::ClientError>> {
+    pub async fn complete(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
         if let Some(buffer) = self.buffer.take() {
             trace!("push remaining buffer to append queue");
             self.upload_queue.push(buffer).await?;
@@ -138,7 +138,7 @@ struct AppendUploadQueue {
     /// Channel handle for sending buffers to be appended to the object.
     request_sender: Sender<UploadBuffer<Client::ClientError>>,
     /// Channel handle for receiving the result of S3 requests via [Output] messages.
-    output_receiver: Receiver<Result<Output, AppendUploadError<Client::ClientError>>>,
+    output_receiver: Receiver<Result<Output, UploadError<Client::ClientError>>>,
     mem_limiter: Arc<MemoryLimiter<Client>>,
     _task_handle: RemoteHandle<()>,
     /// Algorithm used to compute checksums. Initialized asynchronously in [get_buffer].
@@ -190,9 +190,9 @@ where
     /// Returns `true` if output was sent successfully.
     /// When the output cannot be sent, buffer receiver will be shut down.
     async fn send_output(
-        sender: &Sender<Result<Output, AppendUploadError<Client::ClientError>>>,
+        sender: &Sender<Result<Output, UploadError<Client::ClientError>>>,
         receiver: &Receiver<UploadBuffer<Client::ClientError>>,
-        output: Result<Output, AppendUploadError<Client::ClientError>>,
+        output: Result<Output, UploadError<Client::ClientError>>,
     ) -> bool {
         let error = output.is_err();
         if error {
@@ -233,7 +233,7 @@ where
                     trace!(?head_object, "received head_object response");
                     if Some(head_object.etag) != etag {
                         // Fail early if the etag has changed.
-                        Err(AppendUploadError::PutRequestFailed(ObjectClientError::ServiceError(
+                        Err(UploadError::PutRequestFailed(ObjectClientError::ServiceError(
                             PutObjectError::PreconditionFailed,
                         )))
                     } else {
@@ -281,33 +281,33 @@ where
     }
 
     // Push given bytes with its checksum to the upload queue
-    pub async fn push(&mut self, buffer: UploadBuffer<Client::ClientError>) -> Result<(), AppendUploadError<Client::ClientError>> {
+    pub async fn push(&mut self, buffer: UploadBuffer<Client::ClientError>) -> Result<(), UploadError<Client::ClientError>> {
         if let Err(_send_error) = self.request_sender.send(buffer).await {
             // The upload queue could be closed if there was a client error from previous requests
             trace!("upload queue is already closed");
             while self.consume_next_output().await? {}
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         self.requests_in_queue += 1;
         Ok(())
     }
 
-    pub async fn verify(&mut self) -> Result<(), AppendUploadError<Client::ClientError>> {
+    pub async fn verify(&mut self) -> Result<(), UploadError<Client::ClientError>> {
         if self.request_sender.is_closed() {
             // The upload queue could be closed if there was a client error from previous requests
             trace!("upload queue is already closed");
             while self.consume_next_output().await? {}
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         Ok(())
     }
 
     // Close the upload queue, wait for all uploads in the queue to complete, and get the last `PutObjectResult`
-    pub async fn join(mut self) -> Result<Option<PutObjectResult>, AppendUploadError<Client::ClientError>> {
+    pub async fn join(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
         let terminated = !self.request_sender.close();
         while self.consume_next_output().await? {}
         if terminated {
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         Ok(self.last_known_result.take())
     }
@@ -315,14 +315,14 @@ where
     pub async fn get_buffer(
         &mut self,
         capacity: usize,
-    ) -> Result<UploadBuffer<Client::ClientError>, AppendUploadError<Client::ClientError>> {
+    ) -> Result<UploadBuffer<Client::ClientError>, UploadError<Client::ClientError>> {
         let Some(checksum_algorithm) = self.checksum_algorithm.clone() else {
             trace!("wait for initial output");
             match self
                 .output_receiver
                 .recv()
                 .await
-                .unwrap_or(Err(AppendUploadError::UploadAlreadyTerminated))?
+                .unwrap_or(Err(UploadError::UploadAlreadyTerminated))?
             {
                 Output::ChecksumAlgorithm(algorithm) => {
                     trace!(?algorithm, "selected checksum algorithm");
@@ -340,7 +340,7 @@ where
                 // wait for requests in the queue to complete before trying to reserve memory again
                 trace!("wait for the next request to be processed");
                 if !self.consume_next_output().await? {
-                    return Err(AppendUploadError::UploadAlreadyTerminated);
+                    return Err(UploadError::UploadAlreadyTerminated);
                 }
             }
         }
@@ -356,7 +356,7 @@ where
     /// Wait on output, updating the state of the [AppendUploadQueue] when next output arrives.
     ///
     /// Returns `true` when next output is successfully consumed, or `false` when no more output is available.
-    async fn consume_next_output(&mut self) -> Result<bool, AppendUploadError<Client::ClientError>> {
+    async fn consume_next_output(&mut self) -> Result<bool, UploadError<Client::ClientError>> {
         let Ok(output) = self.output_receiver.recv().await else {
             return Ok(false);
         };
@@ -453,7 +453,7 @@ async fn append(
     offset: u64,
     etag: Option<ETag>,
     server_side_encryption: ServerSideEncryption,
-) -> Result<PutObjectResult, AppendUploadError<Client::ClientError>> {
+) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
     trace!(key, offset, len = buffer.len(), "preparing PutObject request");
     let (data, checksum) = buffer.freeze()?;
     let mut request_params = if offset == 0 {
@@ -463,14 +463,14 @@ async fn append(
     };
     let (sse_type, key_id) = server_side_encryption
         .into_inner()
-        .map_err(AppendUploadError::SseCorruptedError)?;
+        .map_err(UploadError::SseCorruptedError)?;
     request_params.checksum = checksum;
     request_params.server_side_encryption = sse_type;
     request_params.ssekms_key_id = key_id;
     client
         .put_object_single(bucket, key, &request_params, data)
         .await
-        .map_err(AppendUploadError::PutRequestFailed)
+        .map_err(UploadError::PutRequestFailed)
 }
 
@@ -479,7 +479,7 @@ mod tests {
 
     use crate::mem_limiter::MINIMUM_MEM_LIMIT;
 
-    use super::super::AppendUploader;
+    use super::super::Uploader;
     use super::*;
 
     use futures::executor::ThreadPool;
@@ -493,20 +493,21 @@ mod tests {
         client: Client,
         buffer_size: usize,
         server_side_encryption: Option<ServerSideEncryption>,
-        checksum_algorithm: Option<ChecksumAlgorithm>,
-    ) -> AppendUploader<Client>
+        default_checksum_algorithm: Option<ChecksumAlgorithm>,
+    ) -> Uploader<Client>
     where
         Client: ObjectClient + Clone + Send + Sync + 'static,
     {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
-        AppendUploader::new(
+        Uploader::new(
             client,
             runtime,
             mem_limiter.into(),
-            buffer_size,
+            None,
             server_side_encryption.unwrap_or_default(),
-            checksum_algorithm,
+            buffer_size,
+            default_checksum_algorithm,
         )
     }
 
@@ -542,7 +543,8 @@ mod tests {
         let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
         let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
         let initial_etag = existing_object.map(|object| object.etag());
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
 
         // Write some data
         let append_data = [0xaa; 128];
@@ -614,7 +616,8 @@ mod tests {
         let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
         let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
         let initial_etag = existing_object.map(|object| object.etag());
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
 
         // Write some data and verify that buffer length should not grow larger than configured capacity
         let append_data = [0xaa; 384];
@@ -693,7 +696,8 @@ mod tests {
 
         let buffer_size = 256;
         let uploader = new_uploader_for_test(client.clone(), buffer_size, None, default_checksum_algorithm.clone());
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
 
         // Write some data
         let append_data = [0xaa; 384];
@@ -753,7 +757,8 @@ mod tests {
         let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
         let initial_offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
         let initial_etag = existing_object.map(|object| object.etag());
-        let upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, initial_etag);
+        let upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, initial_etag);
         // Wait for the upload to complete
         upload_request
             .complete()
@@ -790,7 +795,7 @@ mod tests {
         let initial_offset = (existing_object.len() - 1) as u64;
         let initial_etag = existing_object.etag();
         let mut upload_request =
-            uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
 
         let append_data = [0xaa; 128];
         upload_request
@@ -800,7 +805,7 @@ mod tests {
         // Verify that the request fails at completion
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::PutRequestFailed(_))
+            Err(UploadError::PutRequestFailed(_))
         ));
     }
 
@@ -836,7 +841,7 @@ mod tests {
         let initial_offset = existing_object.len() as u64;
         let initial_etag = existing_object.etag();
         let mut upload_request =
-            uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
 
         // Write data more than the buffer capacity as the first append should succeed
         let append_data = [0xab; 384];
@@ -848,7 +853,7 @@ mod tests {
 
         // Verify that the request fails and the error is surfaced
         let result = upload_request.complete().await;
-        assert!(matches!(result, Err(AppendUploadError::PutRequestFailed(_))));
+        assert!(matches!(result, Err(UploadError::PutRequestFailed(_))));
 
         // Verify that object is partially appended from the first request
         let get_request = client
@@ -879,7 +884,8 @@ mod tests {
         // Test append with a wrong offset
         let mut offset = (existing_object.len() - 1) as u64;
         let initial_etag = existing_object.etag();
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
 
         // Keep writing and it should fail eventually
         let mut write_success_count = 0;
@@ -892,7 +898,7 @@ mod tests {
                     write_success_count += 1;
                 }
                 Err(e) => {
-                    assert!(matches!(e, AppendUploadError::PutRequestFailed(_)));
+                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                     break;
                 }
             }
@@ -905,11 +911,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
     }
 
@@ -944,7 +950,8 @@ mod tests {
         let uploader = new_uploader_for_test(failure_client, buffer_size, None, None);
         let mut offset = existing_object.len() as u64;
         let initial_etag = existing_object.etag();
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
 
         // Keep writing and it should fail eventually
         let mut write_success_count = 0;
@@ -960,7 +967,7 @@ mod tests {
                     write_success_count += 1;
                 }
                 Err(e) => {
-                    assert!(matches!(e, AppendUploadError::PutRequestFailed(_)));
+                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                     break;
                 }
             }
@@ -973,11 +980,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
 
         // Verify that object is partially appended from the first request
@@ -1009,7 +1016,8 @@ mod tests {
         // Start appending
         let mut offset = existing_object.len() as u64;
         let initial_etag = existing_object.etag();
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+        let mut upload_request =
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
 
         // Replace the existing object
         let replacing_object = MockObject::from(vec![0xcc; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
@@ -1028,7 +1036,7 @@ mod tests {
                 Err(e) => {
                     assert!(matches!(
                         e,
-                        AppendUploadError::PutRequestFailed(ObjectClientError::ServiceError(
+                        UploadError::PutRequestFailed(ObjectClientError::ServiceError(
                             PutObjectError::PreconditionFailed
                         ))
                     ));
@@ -1044,11 +1052,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
        ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
     }
 
@@ -1065,7 +1073,7 @@ mod tests {
 
         let buffer_size = 256;
         let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), 0, None);
+        let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), 0, None);
 
         // Write some data
         let append_data = [0xaa; 128];
@@ -1081,7 +1089,7 @@ mod tests {
             .await
             .expect_err("out-of-order write should fail");
 
-        assert!(matches!(error, AppendUploadError::OutOfOrderWrite { .. }));
+        assert!(matches!(error, UploadError::OutOfOrderWrite { .. }));
     }
 
     #[test_case(Some("aws:kmr"), Some("some_key_alias"))]
@@ -1115,7 +1123,7 @@ mod tests {
         let initial_offset = existing_object.len() as u64;
         let initial_etag = existing_object.etag();
         let mut upload_request =
-            uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
 
         let append_data = [0xaa; 128];
         upload_request
@@ -1126,7 +1134,7 @@ mod tests {
         // Verify that the request fails at completion
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::SseCorruptedError(_))
+            Err(UploadError::SseCorruptedError(_))
         ));
     }
 
@@ -1154,7 +1162,7 @@ mod tests {
         let initial_offset = existing_object.len() as u64;
         let initial_etag = existing_object.etag();
         let mut upload_request =
-            uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
 
         let append_data = [0xaa; 128];
         expected_content.extend_from_slice(&append_data);
@@ -1191,17 +1199,18 @@ mod tests {
         // Use a memory limiter with 0 limit
         let mem_limiter = MemoryLimiter::new(client.clone(), 0);
-        let uploader = AppendUploader::new(
+        let uploader = Uploader::new(
             client.clone(),
             ThreadPool::builder().pool_size(1).create().unwrap(),
             mem_limiter.into(),
-            part_size,
+            None,
             Default::default(),
+            part_size,
             None,
         );
 
         let mut offset = 0;
-        let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, None);
+        let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, None);
         let mut expected_content = Vec::new();
 
         // Write enough data to fill multiple parts