diff --git a/.cargo/config.toml b/.cargo/config.toml index 040e71a..62a0d96 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,7 @@ [build] # Share one `target` directory at the project root for all Cargo projects and workspaces in aws-s3-transfer-manager-rs target-dir = "target" + +[profile.profiling] +inherits = "release" +debug = true diff --git a/README.md b/README.md index 56405a8..d6db43f 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,19 @@ cargo run --example cp -- -h ```sh AWS_PROFILE= RUST_LOG=trace cargo run --example cp s3:/// /local/path/ ``` +NOTE: To run in release mode add `--release/-r` to the command, see `cargo run -h`. +NOTE: `trace` may be too verbose, you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace` + +**Upload a file to S3** + +```sh +AWS_PROFILE= RUST_LOG=trace cargo run --example cp /local/path/ s3:/// +``` NOTE: To run in release mode add `--release/-r` to the command, see `cargo run -h`. NOTE: `trace` may be too verbose, you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace` + #### Flamegraphs See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information. @@ -49,9 +58,18 @@ sudo AWS_PROFILE= RUST_LOG=aws_s3_transfer_manager=info cargo flam #### Using tokio-console -Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber) which allows you to run them with +By default examples use `tracing` crate for logs. You can pass the `--tokio-console` flag to examples to +use [`console-subscriber`](https://crates.io/crates/console-subscriber) instead. This allows you to run them with [tokio-console](https://github.com/tokio-rs/console) to help debug task execution. +NOTE: This requires you build the examples with `RUSTFLAGS="--cfg tokio_unstable"` or setting the equivalent in +your cargo config. + + +```sh +RUSTFLAGS="--cfg tokio_unstable" AWS_PROFILE= RUST_LOG=debug cargo run --example cp --tokio-console ... +``` + Follow installation instructions for [tokio-console](https://github.com/tokio-rs/console) and then run the example with `tokio-console` running. 
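For reference, the `cp` example changed later in this diff is what implements the `--tokio-console` flag described above: it picks a subscriber at startup, defaulting to the `tracing` fmt subscriber and switching to `console-subscriber` when the flag is set. The sketch below mirrors that logic (simplified, assuming the `console-subscriber` and `tracing-subscriber` dev-dependencies declared in `Cargo.toml`, and remembering that `console-subscriber` only works when built with `--cfg tokio_unstable`):

```rust
use tracing_subscriber::EnvFilter;

/// Simplified sketch of the subscriber selection in examples/cp.rs:
/// `--tokio-console` swaps the default `tracing` fmt subscriber for
/// `console-subscriber` so the example can be inspected with tokio-console.
fn init_telemetry(tokio_console: bool) {
    if tokio_console {
        // Requires RUSTFLAGS="--cfg tokio_unstable" (or the cargo config equivalent).
        console_subscriber::init();
    } else {
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .with_thread_ids(true)
            .init();
    }
}
```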
diff --git a/aws-s3-transfer-manager/.cargo/config.toml b/aws-s3-transfer-manager/.cargo/config.toml deleted file mode 100644 index de03f0d..0000000 --- a/aws-s3-transfer-manager/.cargo/config.toml +++ /dev/null @@ -1,6 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] - -[profile.profiling] -inherits = "release" -debug = true diff --git a/aws-s3-transfer-manager/Cargo.toml b/aws-s3-transfer-manager/Cargo.toml index d6570a1..7bdcf51 100644 --- a/aws-s3-transfer-manager/Cargo.toml +++ b/aws-s3-transfer-manager/Cargo.toml @@ -30,3 +30,6 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std console-subscriber = "0.3.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tempfile = "3.10.1" + +[target.'cfg(not(target_env = "msvc"))'.dev-dependencies] +jemallocator = "0.5.4" diff --git a/aws-s3-transfer-manager/examples/cp.rs b/aws-s3-transfer-manager/examples/cp.rs index a83cd16..e0861c5 100644 --- a/aws-s3-transfer-manager/examples/cp.rs +++ b/aws-s3-transfer-manager/examples/cp.rs @@ -10,6 +10,9 @@ use std::{mem, time}; use aws_s3_transfer_manager::download::Downloader; use aws_s3_transfer_manager::download::body::Body; +use aws_s3_transfer_manager::io::InputStream; +use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize}; +use aws_s3_transfer_manager::upload::{UploadRequest, Uploader}; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; use aws_types::SdkConfig; use bytes::Buf; @@ -20,7 +23,14 @@ use tracing::{debug_span, Instrument}; type BoxError = Box; -const ONE_MEBIBYTE: u64 = 1024 * 1024; +const ONE_MEGABYTE: u64 = 1000 * 1000; + +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; #[derive(Debug, Clone, clap::Parser)] #[command(name = "cp")] @@ -41,6 +51,10 @@ pub struct Args { /// Part size to use #[arg(long, default_value_t = 8388608)] part_size: u64, + + /// Enable tokio console (requires RUSTFLAGS="--cfg tokio_unstable") + #[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)] + tokio_console: bool, } #[derive(Clone, Debug)] @@ -102,29 +116,14 @@ fn invalid_arg(message: &str) -> ! 
{ .exit() } -#[tokio::main] -async fn main() -> Result<(), Box> { - console_subscriber::init(); - let args = dbg!(Args::parse()); - - use TransferUri::*; - match (&args.source, &args.dest) { - (Local(_), S3(_)) => todo!("upload not implemented yet"), - (Local(_), Local(_)) => invalid_arg("local to local transfer not supported"), - (S3(_), Local(_)) => (), - (S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"), - } - +async fn do_download(args: Args) -> Result<(), BoxError> { let config = aws_config::from_env().load().await; - - println!("warming up client..."); warmup(&config).await?; - println!("warming up complete"); let tm = Downloader::builder() .sdk_config(config) - .concurrency(args.concurrency) - .target_part_size(args.part_size) + .concurrency(ConcurrencySetting::Explicit(args.concurrency)) + .part_size(PartSize::Target(args.part_size)) .build(); let (bucket, key) = args.source.expect_s3().parts(); @@ -146,51 +145,83 @@ async fn main() -> Result<(), Box> { .await?; let elapsed = start.elapsed(); - let obj_size = handle.object_meta.total_size(); - let obj_size_mebibytes = obj_size as f64 / ONE_MEBIBYTE as f64; + let obj_size_bytes = handle.object_meta.total_size(); + let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64; + let obj_size_megabits = obj_size_megabytes * 8f64; + + println!( + "downloaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}", + obj_size_megabits / elapsed.as_secs_f64(), + ); + + Ok(()) +} + +async fn do_upload(args: Args) -> Result<(), BoxError> { + let config = aws_config::from_env().load().await; + warmup(&config).await?; + + let tm = Uploader::builder() + .sdk_config(config) + .concurrency(ConcurrencySetting::Explicit(args.concurrency)) + .part_size(PartSize::Target(args.part_size)) + .build(); + + let path = args.source.expect_local(); + let file_meta = fs::metadata(path).await.expect("file metadata"); + + let stream = InputStream::from_path(path)?; + let (bucket, key) = args.dest.expect_s3().parts(); + + let request = UploadRequest::builder() + .bucket(bucket) + .key(key) + .body(stream) + .build()?; + + println!("starting upload"); + let start = time::Instant::now(); + + let handle = tm.upload(request).await?; + let _resp = handle.join().await?; + let elapsed = start.elapsed(); + + let obj_size_bytes = file_meta.len(); + let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64; + let obj_size_megabits = obj_size_megabytes * 8f64; println!( - "downloaded {obj_size} bytes ({obj_size_mebibytes} MiB) in {elapsed:?}; MiB/s: {}", - obj_size_mebibytes / elapsed.as_secs_f64(), + "uploaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}", + obj_size_megabits / elapsed.as_secs_f64() ); Ok(()) } -// async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box> { -// let b1: &[u8] = &mut []; -// let b2: &[u8] = &mut []; -// let b3: &[u8] = &mut []; -// let b4: &[u8] = &mut []; -// let b5: &[u8] = &mut []; -// let b6: &[u8] = &mut []; -// let b7: &[u8] = &mut []; -// let b8: &[u8] = &mut []; -// while let Some(chunk) = body.next().await { -// let mut chunk = chunk.unwrap(); -// while chunk.has_remaining() { -// let mut dst = [ -// IoSlice::new(b1), -// IoSlice::new(b2), -// IoSlice::new(b3), -// IoSlice::new(b4), -// IoSlice::new(b5), -// IoSlice::new(b6), -// IoSlice::new(b7), -// IoSlice::new(b8), -// ]; -// let filled = chunk.chunks_vectored(&mut dst[..]); -// tracing::trace!("filled: {filled} io slices"); -// -// let wc = 
dest.write_vectored(&dst[0..filled]).await?; -// tracing::trace!("wrote: {wc} bytes"); -// chunk.advance(wc); -// } -// } -// Ok(()) -// } - -async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), BoxError> { + let args = dbg!(Args::parse()); + if args.tokio_console { + console_subscriber::init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_thread_ids(true) + .init(); + } + + use TransferUri::*; + match (&args.source, &args.dest) { + (Local(_), S3(_)) => do_upload(args).await?, + (Local(_), Local(_)) => invalid_arg("local to local transfer not supported"), + (S3(_), Local(_)) => do_download(args).await?, + (S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"), + } + + Ok(()) +} + +async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError> { while let Some(chunk) = body.next().await { let chunk = chunk.unwrap(); tracing::trace!("recv'd chunk remaining={}", chunk.remaining()); @@ -205,8 +236,10 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box Result<(), Box> { - let s3 = aws_sdk_s3::Client::new(&config); +async fn warmup(config: &SdkConfig) -> Result<(), BoxError> { + println!("warming up client..."); + let s3 = aws_sdk_s3::Client::new(config); s3.list_buckets().send().await?; + println!("warming up complete"); Ok(()) } diff --git a/aws-s3-transfer-manager/src/download.rs b/aws-s3-transfer-manager/src/download.rs index 85faed5..feef502 100644 --- a/aws-s3-transfer-manager/src/download.rs +++ b/aws-s3-transfer-manager/src/download.rs @@ -17,7 +17,8 @@ use crate::download::discovery::{discover_obj, ObjectDiscovery}; use crate::download::handle::DownloadHandle; use crate::download::worker::{distribute_work, download_chunks, ChunkResponse}; use crate::error::TransferError; -use crate::MEBIBYTE; +use crate::types::{ConcurrencySetting, PartSize}; +use crate::{DEFAULT_CONCURRENCY, MEBIBYTE}; use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder}; use aws_types::SdkConfig; use context::DownloadContext; @@ -50,29 +51,23 @@ impl From for DownloadRequest { } /// Fluent style builder for [Downloader] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Builder { - target_part_size_bytes: u64, - // TODO(design): should we instead consider an enum here allows for not only explicit but also - // an "Auto" mode that allows us to control the concurrency actually used based on overall transfer and part size? - concurrency: usize, + part_size: PartSize, + concurrency: ConcurrencySetting, sdk_config: Option, } impl Builder { - fn new() -> Self { - Self { - target_part_size_bytes: 8 * MEBIBYTE, - concurrency: 8, - sdk_config: None, - } + fn new() -> Builder { + Builder::default() } /// Size of parts the object will be downloaded in, in bytes. /// - /// Defaults is 8 MiB. - pub fn target_part_size(mut self, size_bytes: u64) -> Self { - self.target_part_size_bytes = size_bytes; + /// Defaults is [PartSize::Auto]. + pub fn part_size(mut self, target_size: PartSize) -> Self { + self.part_size = target_size; self } @@ -85,8 +80,8 @@ impl Builder { /// Set the concurrency level this component is allowed to use. /// /// This sets the maximum number of concurrent in-flight requests. - /// Default is 8. - pub fn concurrency(mut self, concurrency: usize) -> Self { + /// Default is [ConcurrencySetting::Auto]. 
+ pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self { self.concurrency = concurrency; self } @@ -104,7 +99,7 @@ impl From for Downloader { .unwrap_or_else(|| SdkConfig::builder().build()); let client = aws_sdk_s3::Client::new(&sdk_config); Self { - target_part_size_bytes: value.target_part_size_bytes, + part_size: value.part_size, concurrency: value.concurrency, client, } @@ -115,8 +110,8 @@ impl From for Downloader { /// concurrent requests (e.g. using ranged GET or part number). #[derive(Debug, Clone)] pub struct Downloader { - target_part_size_bytes: u64, - concurrency: usize, + part_size: PartSize, + concurrency: ConcurrencySetting, client: aws_sdk_s3::client::Client, } @@ -155,14 +150,17 @@ impl Downloader { todo!("single part download not implemented") } + let target_part_size_bytes = self.part_size(); let ctx = DownloadContext { client: self.client.clone(), - target_part_size: self.target_part_size_bytes, + target_part_size_bytes, }; + let concurrency = self.concurrency(); + // make initial discovery about the object size, metadata, possibly first chunk let mut discovery = discover_obj(&ctx, &req).await?; - let (comp_tx, comp_rx) = mpsc::channel(self.concurrency); + let (comp_tx, comp_rx) = mpsc::channel(concurrency); let start_seq = handle_discovery_chunk(&mut discovery, &comp_tx).await; // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled. @@ -170,9 +168,8 @@ impl Downloader { if !discovery.remaining.is_empty() { // start assigning work - let (work_tx, work_rx) = async_channel::bounded(self.concurrency); + let (work_tx, work_rx) = async_channel::bounded(concurrency); let input = req.input.clone(); - let part_size = self.target_part_size_bytes; let rem = discovery.remaining.clone(); // TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once, @@ -180,9 +177,15 @@ impl Downloader { // quite a few futures created. If more performant could be enabled for // objects less than some size. - tasks.spawn(distribute_work(rem, input, part_size, start_seq, work_tx)); + tasks.spawn(distribute_work( + rem, + input, + target_part_size_bytes, + start_seq, + work_tx, + )); - for i in 0..self.concurrency { + for i in 0..concurrency { let worker = download_chunks(ctx.clone(), work_rx.clone(), comp_tx.clone()) .instrument(tracing::debug_span!("chunk-downloader", worker = i)); tasks.spawn(worker); @@ -202,6 +205,22 @@ impl Downloader { Ok(handle) } + + /// Get the concrete concurrency setting + fn concurrency(&self) -> usize { + match self.concurrency { + ConcurrencySetting::Auto => DEFAULT_CONCURRENCY, + ConcurrencySetting::Explicit(explicit) => explicit, + } + } + + // Get the concrete part size to use in bytes + fn part_size(&self) -> u64 { + match self.part_size { + PartSize::Auto => 8 * MEBIBYTE, + PartSize::Target(explicit) => explicit, + } + } } /// Handle possibly sending the first chunk of data received through discovery. 
Returns diff --git a/aws-s3-transfer-manager/src/download/context.rs b/aws-s3-transfer-manager/src/download/context.rs index d8f3167..15a6ce8 100644 --- a/aws-s3-transfer-manager/src/download/context.rs +++ b/aws-s3-transfer-manager/src/download/context.rs @@ -7,5 +7,5 @@ #[derive(Debug, Clone)] pub(crate) struct DownloadContext { pub(crate) client: aws_sdk_s3::Client, - pub(crate) target_part_size: u64, + pub(crate) target_part_size_bytes: u64, } diff --git a/aws-s3-transfer-manager/src/download/discovery.rs b/aws-s3-transfer-manager/src/download/discovery.rs index 1f54a7a..8b0ac83 100644 --- a/aws-s3-transfer-manager/src/download/discovery.rs +++ b/aws-s3-transfer-manager/src/download/discovery.rs @@ -82,9 +82,9 @@ pub(super) async fn discover_obj( let byte_range = match range.as_ref() { Some(r) => ByteRange::Inclusive( *r.start(), - cmp::min(*r.start() + ctx.target_part_size - 1, *r.end()), + cmp::min(*r.start() + ctx.target_part_size_bytes - 1, *r.end()), ), - None => ByteRange::Inclusive(0, ctx.target_part_size - 1), + None => ByteRange::Inclusive(0, ctx.target_part_size_bytes - 1), }; let r = request .input @@ -216,7 +216,7 @@ mod tests { let ctx = DownloadContext { client, - target_part_size: 5 * MEBIBYTE, + target_part_size_bytes: 5 * MEBIBYTE, }; let request = GetObjectInput::builder() .bucket("test-bucket") @@ -266,7 +266,7 @@ mod tests { let ctx = DownloadContext { client, - target_part_size, + target_part_size_bytes: target_part_size, }; let request = GetObjectInput::builder() @@ -300,7 +300,7 @@ mod tests { let ctx = DownloadContext { client, - target_part_size, + target_part_size_bytes: target_part_size, }; let request = GetObjectInput::builder() diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index 0b3b757..a824728 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -18,6 +18,10 @@ pub enum TransferError { /// A download failed #[error("download failed")] DownloadFailed(#[from] DownloadError), + + /// A upload failed + #[error("upload failed")] + UploadFailed(#[from] UploadError), } pub(crate) type GetObjectSdkError = ::aws_smithy_runtime_api::client::result::SdkError< @@ -44,6 +48,51 @@ pub enum DownloadError { }, } +pub(crate) type CreateMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError< + aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError, + ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, +>; + +pub(crate) type UploadPartSdkError = ::aws_smithy_runtime_api::client::result::SdkError< + aws_sdk_s3::operation::upload_part::UploadPartError, + ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, +>; + +pub(crate) type CompleteMultipartUploadSdkError = + ::aws_smithy_runtime_api::client::result::SdkError< + aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError, + ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, + >; + +pub(crate) type AbortMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError< + aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError, + ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, +>; + +/// An error related to upload an object +#[derive(thiserror::Error, Debug)] +pub enum UploadError { + /// An error occurred invoking [aws_sdk_s3::Client::CreateMultipartUpload] + #[error(transparent)] + CreateMultipartUpload(#[from] CreateMultipartUploadSdkError), + + /// An error occurred invoking 
[aws_sdk_s3::Client::CreateMultipartUpload] + #[error(transparent)] + CompleteMultipartUpload(#[from] CompleteMultipartUploadSdkError), + + /// An error occurred invoking [aws_sdk_s3::Client::UploadPart] + #[error(transparent)] + UploadPart(#[from] UploadPartSdkError), + + /// An error occurred invoking [aws_sdk_s3::Client::AbortMultipartUpload] + #[error(transparent)] + AbortMultipartUpload(#[from] AbortMultipartUploadSdkError), + + /// An I/O error occurred + #[error(transparent)] + IOError(#[from] crate::io::error::Error), +} + /// An underlying S3 SDK error #[derive(thiserror::Error, Debug)] pub enum SdkOperationError { @@ -72,3 +121,9 @@ pub(crate) fn chunk_failed>(e: E) -> TransferError { pub(crate) fn invalid_meta_request(message: String) -> TransferError { TransferError::InvalidMetaRequest(message) } + +impl From for TransferError { + fn from(value: CreateMultipartUploadSdkError) -> Self { + TransferError::UploadFailed(value.into()) + } +} diff --git a/aws-s3-transfer-manager/src/io/path_body.rs b/aws-s3-transfer-manager/src/io/path_body.rs index d17ee98..f641216 100644 --- a/aws-s3-transfer-manager/src/io/path_body.rs +++ b/aws-s3-transfer-manager/src/io/path_body.rs @@ -179,7 +179,7 @@ mod test { #[test] fn test_explicit_content_length_and_offset() { - let mut tmp = NamedTempFile::new().unwrap(); + let tmp = NamedTempFile::new().unwrap(); let stream = PathBodyBuilder::new() .path(tmp.path()) @@ -200,7 +200,7 @@ mod test { let content = "hello path body"; tmp.write_all(content.as_bytes()).unwrap(); - let stream = PathBodyBuilder::new() + let _stream = PathBodyBuilder::new() .path(tmp.path()) .offset(22) .build() diff --git a/aws-s3-transfer-manager/src/lib.rs b/aws-s3-transfer-manager/src/lib.rs index 195db77..e507bbd 100644 --- a/aws-s3-transfer-manager/src/lib.rs +++ b/aws-s3-transfer-manager/src/lib.rs @@ -22,14 +22,19 @@ )] pub(crate) const MEBIBYTE: u64 = 1024 * 1024; + +pub(crate) const DEFAULT_CONCURRENCY: usize = 8; + /// Abstractions for downloading objects from S3 pub mod download; /// Error types emitted by `aws-s3-transfer-manager` pub mod error; +/// Common types used by `aws-s3-transfer-manager` +pub mod types; + /// Types and helpers for I/O -#[allow(unused)] // FIXME(aws-sdk-rust#1159) - remove when consumed internally by other modules pub mod io; /// Abstractions for downloading objects from Amazon S3 diff --git a/aws-s3-transfer-manager/src/types.rs b/aws-s3-transfer-manager/src/types.rs new file mode 100644 index 0000000..266d8d8 --- /dev/null +++ b/aws-s3-transfer-manager/src/types.rs @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/// The target part size for an upload or download request. +#[derive(Debug, Clone, Default)] +pub enum PartSize { + /// Automatically configure an optimal target part size based on the execution environment. + #[default] + Auto, + + /// Target part size explicitly given. + /// + /// NOTE: This is a suggestion and will be used if possible but may be adjusted for an individual request + /// as required by the underlying API. + Target(u64), +} + +/// The concurrency settings to use for a single upload or download request. +#[derive(Debug, Clone, Default)] +pub enum ConcurrencySetting { + /// Automatically configure an optimal concurrency setting based on the execution environment. + #[default] + Auto, + + /// Explicitly configured concurrency setting. 
+ Explicit(usize), +} diff --git a/aws-s3-transfer-manager/src/upload.rs b/aws-s3-transfer-manager/src/upload.rs index aae46a5..e22fee1 100644 --- a/aws-s3-transfer-manager/src/upload.rs +++ b/aws-s3-transfer-manager/src/upload.rs @@ -6,8 +6,480 @@ pub use crate::upload::request::UploadRequest; pub use crate::upload::response::UploadResponse; +use crate::error::UploadError; +use crate::io::part_reader::{Builder as PartReaderBuilder, ReadPart}; +use crate::io::InputStream; +use crate::types::{ConcurrencySetting, PartSize}; +use crate::upload::context::UploadContext; +use crate::upload::handle::UploadHandle; +use crate::upload::response::UploadResponseBuilder; +use crate::{DEFAULT_CONCURRENCY, MEBIBYTE}; +use aws_sdk_s3::types::CompletedPart; +use aws_smithy_types::byte_stream::ByteStream; +use aws_types::SdkConfig; +use bytes::Buf; +use std::cmp; +use std::sync::Arc; +use tracing::Instrument; + +mod handle; + /// Request types for uploads to Amazon S3 pub mod request; +mod context; /// Response types for uploads to Amazon S3 pub mod response; + +/// Minimum upload part size in bytes +const MIN_PART_SIZE_BYTES: u64 = 5 * MEBIBYTE; + +/// Maximum number of parts that a single S3 multipart upload supports +const MAX_PARTS: u64 = 10_000; + +/// Fluent style builder for [Uploader] +#[derive(Debug, Clone, Default)] +pub struct Builder { + multipart_threshold_part_size: PartSize, + target_part_size: PartSize, + concurrency: ConcurrencySetting, + sdk_config: Option, + client: Option, +} + +impl Builder { + fn new() -> Builder { + Builder::default() + } + + /// Minimum object size that should trigger a multipart upload. + /// + /// The minimum part size is 5 MiB, any part size less than that will be rounded up. + /// Default is [PartSize::Auto] + pub fn multipart_threshold(self, threshold: PartSize) -> Self { + let threshold = match threshold { + PartSize::Target(part_size) => { + PartSize::Target(cmp::max(part_size, MIN_PART_SIZE_BYTES)) + } + tps => tps, + }; + + self.set_multipart_threshold(threshold) + } + + /// The target size of each part when using a multipart upload to complete the request. + /// + /// When a request's content length is les than [`multipart_threshold_part_size`], + /// this setting is ignored and a single [`PutObject`] request will be made instead. + /// + /// NOTE: The actual part size used may be larger than the configured part size if + /// the current value would result in more than 10,000 parts for an upload request. + /// + /// Default is [PartSize::Auto] + /// + /// [`multipart_threshold_part_size`]: method@Self::multipart_threshold_part_size + /// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + pub fn part_size(self, part_size: PartSize) -> Self { + let threshold = match part_size { + PartSize::Target(part_size) => { + PartSize::Target(cmp::max(part_size, MIN_PART_SIZE_BYTES)) + } + tps => tps, + }; + + self.set_target_part_size(threshold) + } + + /// Minimum object size that should trigger a multipart upload. + /// + /// NOTE: This does not validate the setting and is meant for internal use only. + pub(crate) fn set_multipart_threshold(mut self, threshold: PartSize) -> Self { + self.multipart_threshold_part_size = threshold; + self + } + + /// Target part size for a multipart upload. + /// + /// NOTE: This does not validate the setting and is meant for internal use only. 
+ pub(crate) fn set_target_part_size(mut self, threshold: PartSize) -> Self { + self.target_part_size = threshold; + self + } + + /// Set the configuration used by the S3 client + pub fn sdk_config(mut self, config: SdkConfig) -> Self { + self.sdk_config = Some(config); + self + } + + /// Set the concurrency level this component is allowed to use. + /// + /// This sets the maximum number of concurrent in-flight requests. + /// Default is [ConcurrencySetting::Auto]. + pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self { + self.concurrency = concurrency; + self + } + + /// Consumes the builder and constructs a [Uploader] + pub fn build(self) -> Uploader { + self.into() + } + + /// Set an explicit client to use. This takes precedence over setting `sdk_config`. + #[cfg(test)] + pub(crate) fn client(mut self, client: aws_sdk_s3::Client) -> Self { + self.client = Some(client); + self + } +} + +impl From for Uploader { + fn from(value: Builder) -> Self { + let client = value.client.unwrap_or_else(|| { + let sdk_config = value + .sdk_config + .unwrap_or_else(|| SdkConfig::builder().build()); + aws_sdk_s3::Client::new(&sdk_config) + }); + + Self { + multipart_threshold_part_size: value.multipart_threshold_part_size, + target_part_size: value.target_part_size, + concurrency: value.concurrency, + client, + } + } +} + +/// Upload an object in the most efficient way possible by splitting the request into +/// concurrent requests (e.g. using multi-part uploads). +#[derive(Debug, Clone)] +pub struct Uploader { + multipart_threshold_part_size: PartSize, + target_part_size: PartSize, + concurrency: ConcurrencySetting, + client: aws_sdk_s3::client::Client, +} + +impl Uploader { + /// Create a new [Builder] + pub fn builder() -> Builder { + Builder::new() + } + + /// Upload a single object to Amazon S3. + /// + /// A single logical request may be split into many concurrent `UploadPart` requests + /// to improve throughput. + /// + /// # Examples + /// + /// ```no_run + /// use std::error::Error; + /// use std::path::Path; + /// use aws_s3_transfer_manager::io::InputStream; + /// use aws_s3_transfer_manager::upload::{Uploader, UploadRequest}; + /// + /// async fn upload_file(client: Uploader, path: impl AsRef) -> Result<(), Box> { + /// let stream = InputStream::from_path(path)?; + /// let request = UploadRequest::builder() + /// .bucket("my-bucket") + /// .key("my-key") + /// .body(stream) + /// .build()?; + /// + /// let handle = client.upload(request).await?; + /// + /// // upload() will return potentially before the request is complete. + /// // The handle given must be joined to drive the request to completion. + /// // It can also be used to get progress, pause or cancel the request, etc. + /// let response = handle.join().await?; + /// // ... 
do something with response + /// Ok(()) + /// } + /// ``` + pub async fn upload(&self, mut req: UploadRequest) -> Result { + let min_mpu_threshold = self.mpu_threshold_bytes(); + + let stream = req.take_body(); + let ctx = self.new_context(req); + let mut handle = UploadHandle::new(ctx); + + // MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway) + let content_length = stream + .size_hint() + .upper() + .ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?; + + if content_length < min_mpu_threshold { + // TODO - adapt body to ByteStream and send request using `PutObject` for non mpu upload + // tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request"); + self.try_start_mpu_upload(&mut handle, stream, content_length) + .await? + } else { + self.try_start_mpu_upload(&mut handle, stream, content_length) + .await? + } + + Ok(handle) + } + + fn new_context(&self, req: UploadRequest) -> UploadContext { + UploadContext { + client: self.client.clone(), + request: Arc::new(req), + upload_id: None, + } + } + + /// Upload the object via multipart upload + /// + /// # Arguments + /// + /// * `handle` - The upload handle + /// * `stream` - The content to upload + /// * `content_length` - The upper bound on the content length + async fn try_start_mpu_upload( + &self, + handle: &mut UploadHandle, + stream: InputStream, + content_length: u64, + ) -> Result<(), UploadError> { + let part_size = cmp::max(self.part_size_bytes(), content_length / MAX_PARTS); + tracing::trace!("upload request using multipart upload with part size: {part_size} bytes"); + + let mpu = start_mpu(handle).await?; + tracing::trace!( + "multipart upload started with upload id: {:?}", + mpu.upload_id + ); + + handle.set_response(mpu); + + let part_reader = Arc::new( + PartReaderBuilder::new() + .stream(stream) + .part_size(part_size.try_into().expect("valid part size")) + .build(), + ); + + let n_workers = self.num_workers(); + for i in 0..n_workers { + let worker = upload_parts(handle.ctx.clone(), part_reader.clone()) + .instrument(tracing::debug_span!("upload-part", worker = i)); + handle.tasks.spawn(worker); + } + + Ok(()) + } + + /// Get the concrete number of workers to use based on the concurrency setting. + fn num_workers(&self) -> usize { + match self.concurrency { + // TODO(aws-sdk-rust#1159): add logic for determining this + ConcurrencySetting::Auto => DEFAULT_CONCURRENCY, + ConcurrencySetting::Explicit(explicit) => explicit, + } + } + + /// Get the concrete minimum upload size in bytes to use to determine whether multipart uploads + /// are enabled for a given request. 
+ fn mpu_threshold_bytes(&self) -> u64 { + match self.multipart_threshold_part_size { + // TODO(aws-sdk-rust#1159): add logic for determining this + PartSize::Auto => 16 * MEBIBYTE, + PartSize::Target(explicit) => explicit, + } + } + + fn part_size_bytes(&self) -> u64 { + match self.target_part_size { + PartSize::Auto => 8 * MEBIBYTE, + PartSize::Target(explicit) => explicit, + } + } +} + +/// start a new multipart upload by invoking `CreateMultipartUpload` +async fn start_mpu(handle: &UploadHandle) -> Result { + let req = handle.ctx.request(); + let client = handle.ctx.client(); + + let resp = client + .create_multipart_upload() + .set_acl(req.acl.clone()) + .set_bucket(req.bucket.clone()) + .set_cache_control(req.cache_control.clone()) + .set_content_disposition(req.content_disposition.clone()) + .set_content_encoding(req.content_encoding.clone()) + .set_content_language(req.content_language.clone()) + .set_content_type(req.content_type.clone()) + .set_expires(req.expires) + .set_grant_full_control(req.grant_full_control.clone()) + .set_grant_read(req.grant_read.clone()) + .set_grant_read_acp(req.grant_read_acp.clone()) + .set_grant_write_acp(req.grant_write_acp.clone()) + .set_key(req.key.clone()) + .set_metadata(req.metadata.clone()) + .set_server_side_encryption(req.server_side_encryption.clone()) + .set_storage_class(req.storage_class.clone()) + .set_website_redirect_location(req.website_redirect_location.clone()) + .set_sse_customer_algorithm(req.sse_customer_algorithm.clone()) + .set_sse_customer_key(req.sse_customer_key.clone()) + .set_sse_customer_key_md5(req.sse_customer_key_md5.clone()) + .set_ssekms_key_id(req.sse_kms_key_id.clone()) + .set_ssekms_encryption_context(req.sse_kms_encryption_context.clone()) + .set_bucket_key_enabled(req.bucket_key_enabled) + .set_request_payer(req.request_payer.clone()) + .set_tagging(req.tagging.clone()) + .set_object_lock_mode(req.object_lock_mode.clone()) + .set_object_lock_retain_until_date(req.object_lock_retain_until_date) + .set_object_lock_legal_hold_status(req.object_lock_legal_hold_status.clone()) + .set_expected_bucket_owner(req.expected_bucket_owner.clone()) + .set_checksum_algorithm(req.checksum_algorithm.clone()) + .send() + .await?; + + Ok(resp.into()) +} + +/// Worker function that pulls part data off the `reader` and uploads each part until the reader +/// is exhausted. If any part fails the worker will return the error and stop processing. If +/// the worker finishes successfully the completed parts uploaded by this worker are returned. 
+async fn upload_parts( + ctx: UploadContext, + reader: Arc, +) -> Result, UploadError> { + let mut completed_parts = Vec::new(); + loop { + let part_result = reader.next_part().await?; + let part_data = match part_result { + Some(part_data) => part_data, + None => break, + }; + + let part_number = part_data.part_number as i32; + tracing::trace!("recv'd part number {}", part_number); + + let content_length = part_data.data.remaining(); + let body = ByteStream::from(part_data.data); + + // TODO(aws-sdk-rust#1159): disable payload signing + // TODO(aws-sdk-rust#1159): set checksum fields if applicable + let resp = ctx + .client + .upload_part() + .set_bucket(ctx.request.bucket.clone()) + .set_key(ctx.request.key.clone()) + .set_upload_id(ctx.upload_id.clone()) + .part_number(part_number) + .content_length(content_length as i64) + .body(body) + .set_sse_customer_algorithm(ctx.request.sse_customer_algorithm.clone()) + .set_sse_customer_key(ctx.request.sse_customer_key.clone()) + .set_sse_customer_key_md5(ctx.request.sse_customer_key_md5.clone()) + .set_request_payer(ctx.request.request_payer.clone()) + .set_expected_bucket_owner(ctx.request.expected_bucket_owner.clone()) + .send() + .await?; + + tracing::trace!("completed upload of part number {}", part_number); + let completed = CompletedPart::builder() + .part_number(part_number) + .set_e_tag(resp.e_tag.clone()) + .set_checksum_crc32(resp.checksum_crc32.clone()) + .set_checksum_crc32_c(resp.checksum_crc32_c.clone()) + .set_checksum_sha1(resp.checksum_sha1.clone()) + .set_checksum_sha256(resp.checksum_sha256.clone()) + .build(); + + completed_parts.push(completed); + } + + tracing::trace!("no more parts, worker finished"); + Ok(completed_parts) +} + +#[cfg(test)] +mod test { + use crate::io::InputStream; + use crate::types::{ConcurrencySetting, PartSize}; + use crate::upload::{UploadRequest, Uploader}; + use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput; + use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput; + use aws_sdk_s3::operation::upload_part::UploadPartOutput; + use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; + use bytes::Bytes; + use std::ops::Deref; + use std::sync::Arc; + + #[tokio::test] + async fn test_basic_mpu() { + let expected_upload_id = Arc::new("test-upload".to_owned()); + let body = Bytes::from_static(b"every adolescent dog goes bonkers early"); + let stream = InputStream::from(body); + + let upload_id = expected_upload_id.clone(); + let create_mpu = + mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || { + CreateMultipartUploadOutput::builder() + .upload_id(upload_id.as_ref().to_owned()) + .build() + }); + + let upload_id = expected_upload_id.clone(); + let upload_1 = mock!(aws_sdk_s3::Client::upload_part) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(30) + }) + .then_output(|| UploadPartOutput::builder().build()); + + let upload_id = expected_upload_id.clone(); + let upload_2 = mock!(aws_sdk_s3::Client::upload_part) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(9) + }) + .then_output(|| UploadPartOutput::builder().build()); + + let expected_e_tag = Arc::new("test-e-tag".to_owned()); + let upload_id = expected_upload_id.clone(); + let e_tag = expected_e_tag.clone(); + let complete_mpu = mock!(aws_sdk_s3::Client::complete_multipart_upload) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) + && 
r.multipart_upload.clone().unwrap().parts.unwrap().len() == 2 + }) + .then_output(move || { + CompleteMultipartUploadOutput::builder() + .e_tag(e_tag.as_ref().to_owned()) + .build() + }); + + let client = mock_client!( + aws_sdk_s3, + RuleMode::Sequential, + &[&create_mpu, &upload_1, &upload_2, &complete_mpu] + ); + + let uploader = Uploader::builder() + .concurrency(ConcurrencySetting::Explicit(1)) + .set_multipart_threshold(PartSize::Target(10)) + .set_target_part_size(PartSize::Target(30)) + .client(client) + .build(); + + let request = UploadRequest::builder() + .bucket("test-bucket") + .key("test-key") + .body(stream) + .build() + .unwrap(); + + let handle = uploader.upload(request).await.unwrap(); + + let resp = handle.join().await.unwrap(); + assert_eq!(expected_upload_id.deref(), resp.upload_id.unwrap().deref()); + assert_eq!(expected_e_tag.deref(), resp.e_tag.unwrap().deref()); + } +} diff --git a/aws-s3-transfer-manager/src/upload/context.rs b/aws-s3-transfer-manager/src/upload/context.rs new file mode 100644 index 0000000..6db8880 --- /dev/null +++ b/aws-s3-transfer-manager/src/upload/context.rs @@ -0,0 +1,41 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::upload::UploadRequest; +use std::ops::Deref; +use std::sync::Arc; + +/// Internal context used to drive a single Upload request +#[derive(Debug, Clone)] +pub(crate) struct UploadContext { + /// client used for SDK operations + pub(crate) client: aws_sdk_s3::Client, + /// the multipart upload ID + pub(crate) upload_id: Option, + /// the original request (NOTE: the body will have been taken for processing, only the other fields remain) + pub(crate) request: Arc, +} + +impl UploadContext { + /// The S3 client to use for SDK operations + pub(crate) fn client(&self) -> &aws_sdk_s3::Client { + &self.client + } + + /// The original request (sans the body as it will have been taken for processing) + pub(crate) fn request(&self) -> &UploadRequest { + self.request.deref() + } + + /// Set the upload ID if the transfer will be done using a multipart upload + pub(crate) fn set_upload_id(&mut self, upload_id: String) { + self.upload_id = Some(upload_id) + } + + /// Check if this transfer is using multipart upload + pub(crate) fn is_multipart_upload(&self) -> bool { + self.upload_id.is_some() + } +} diff --git a/aws-s3-transfer-manager/src/upload/handle.rs b/aws-s3-transfer-manager/src/upload/handle.rs new file mode 100644 index 0000000..3c3b50f --- /dev/null +++ b/aws-s3-transfer-manager/src/upload/handle.rs @@ -0,0 +1,202 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::error::UploadError; +use crate::upload::context::UploadContext; +use crate::upload::request::FailedMultipartUploadPolicy; +use crate::upload::response::UploadResponseBuilder; +use crate::upload::UploadResponse; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; +use tokio::task; + +/// Response type for a single upload object request. +#[derive(Debug)] +#[non_exhaustive] +pub struct UploadHandle { + /// All child multipart upload tasks spawned for this upload + pub(crate) tasks: task::JoinSet, UploadError>>, + /// The context used to drive an upload to completion + pub(crate) ctx: UploadContext, + /// The response that will eventually be yielded to the caller. 
+ response: Option, +} + +impl UploadHandle { + /// Create a new upload handle with the given request context + pub(crate) fn new(ctx: UploadContext) -> Self { + Self { + tasks: task::JoinSet::new(), + ctx, + response: None, + } + } + + /// Set the initial response builder once available + /// + /// This is usually after `CreateMultipartUpload` is initiated (or + /// `PutObject` is invoked for uploads less than the required MPU threshold). + pub(crate) fn set_response(&mut self, builder: UploadResponseBuilder) { + if builder.upload_id.is_some() { + let upload_id = builder.upload_id.clone().expect("upload ID present"); + self.ctx.set_upload_id(upload_id); + } + + self.response = Some(builder); + } + + /// Consume the handle and wait for upload to complete + pub async fn join(self) -> Result { + complete_upload(self).await + } + + /// Abort the upload and cancel any in-progress part uploads. + pub async fn abort(&mut self) -> Result { + // TODO(aws-sdk-rust#1159) - handle already completed upload + + // cancel in-progress uploads + self.tasks.abort_all(); + + // join all tasks + while (self.tasks.join_next().await).is_some() {} + + if !self.ctx.is_multipart_upload() { + return Ok(AbortedUpload::default()); + } + + let abort_policy = self + .ctx + .request + .failed_multipart_upload_policy + .clone() + .unwrap_or_default(); + + match abort_policy { + FailedMultipartUploadPolicy::AbortUpload => abort_upload(self).await, + FailedMultipartUploadPolicy::Retain => Ok(AbortedUpload::default()), + } + } +} + +/// Describes the result of aborting an in-progress upload. +#[derive(Debug, Default)] +pub struct AbortedUpload { + upload_id: Option, + request_charged: Option, +} + +impl AbortedUpload { + /// Get the multipart upload ID that was cancelled + /// + /// Not present for uploads that did not utilize a multipart upload + pub fn upload_id(&self) -> &Option { + &self.upload_id + } + + /// If present, indicates that the requester was successfully charged for the request. + /// + /// This functionality is not supported for directory buckets and is + /// not present for uploads that did not utilize a multipart upload + pub fn request_charged(&self) -> &Option { + &self.request_charged + } +} + +async fn abort_upload(handle: &UploadHandle) -> Result { + let abort_mpu_resp = handle + .ctx + .client + .abort_multipart_upload() + .set_bucket(handle.ctx.request.bucket.clone()) + .set_key(handle.ctx.request.key.clone()) + .set_upload_id(handle.ctx.upload_id.clone()) + .set_request_payer(handle.ctx.request.request_payer.clone()) + .set_expected_bucket_owner(handle.ctx.request.expected_bucket_owner.clone()) + .send() + .await?; + + let aborted_upload = AbortedUpload { + upload_id: handle.ctx.upload_id.clone(), + request_charged: abort_mpu_resp.request_charged, + }; + + Ok(aborted_upload) +} + +async fn complete_upload(mut handle: UploadHandle) -> Result { + if !handle.ctx.is_multipart_upload() { + todo!("non mpu upload not implemented yet") + } + + tracing::trace!("joining upload_id={:?}", handle.ctx.upload_id); + + let mut all_parts = Vec::new(); + + // join all the upload tasks + while let Some(join_result) = handle.tasks.join_next().await { + let result = join_result.expect("task completed"); + match result { + Ok(mut completed_parts) => { + all_parts.append(&mut completed_parts); + } + // TODO(aws-sdk-rust#1159, design) - do we want to return first error or collect all errors? 
+ Err(err) => { + tracing::error!("multipart upload failed, aborting"); + // TODO(aws-sdk-rust#1159) - if cancelling causes an error we want to propagate that in the returned error somehow? + handle.abort().await?; + return Err(err); + } + } + } + + tracing::trace!( + "completing multipart upload: upload_id={:?}", + handle.ctx.upload_id + ); + + // parts must be sorted + all_parts.sort_by_key(|p| p.part_number.expect("part number set")); + + // complete the multipart upload + let complete_mpu_resp = handle + .ctx + .client + .complete_multipart_upload() + .set_bucket(handle.ctx.request.bucket.clone()) + .set_key(handle.ctx.request.key.clone()) + .set_upload_id(handle.ctx.upload_id.clone()) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(all_parts)) + .build(), + ) + // TODO(aws-sdk-rust#1159) - implement checksums + // .set_checksum_crc32() + // .set_checksum_crc32_c() + // .set_checksum_sha1() + // .set_checksum_sha256() + .set_request_payer(handle.ctx.request.request_payer.clone()) + .set_expected_bucket_owner(handle.ctx.request.expected_bucket_owner.clone()) + .set_sse_customer_algorithm(handle.ctx.request.sse_customer_algorithm.clone()) + .set_sse_customer_key(handle.ctx.request.sse_customer_key.clone()) + .set_sse_customer_key_md5(handle.ctx.request.sse_customer_key_md5.clone()) + .send() + .await?; + + // set remaining fields from completing the multipart upload + let resp = handle + .response + .take() + .expect("response set") + .set_e_tag(complete_mpu_resp.e_tag.clone()) + .set_expiration(complete_mpu_resp.expiration.clone()) + .set_version_id(complete_mpu_resp.version_id.clone()); + + tracing::trace!( + "upload completed successfully: upload_id={:?}", + handle.ctx.upload_id + ); + + Ok(resp.build().expect("valid response")) +} diff --git a/aws-s3-transfer-manager/src/upload/request.rs b/aws-s3-transfer-manager/src/upload/request.rs index 261fcc1..ddb89b2 100644 --- a/aws-s3-transfer-manager/src/upload/request.rs +++ b/aws-s3-transfer-manager/src/upload/request.rs @@ -184,6 +184,9 @@ pub struct UploadRequest { pub object_lock_legal_hold_status: Option, ///
The account ID of the expected bucket owner. If the account ID that you provide does not match the actual owner of the bucket, the request fails with the HTTP status code 403 Forbidden (access denied).
pub expected_bucket_owner: Option, + + /// The policy that describes how to handle a failed multipart upload. + pub failed_multipart_upload_policy: Option, } impl UploadRequest { @@ -193,7 +196,6 @@ impl UploadRequest { } /// Split the body from the request by taking it and replacing it with the default. - #[allow(dead_code)] // FIXME(aws-sdk-rust#1159) - remove when consumed internally by other modules pub(crate) fn take_body(&mut self) -> InputStream { mem::take(&mut self.body) } @@ -448,6 +450,11 @@ impl UploadRequest { pub fn expected_bucket_owner(&self) -> Option<&str> { self.expected_bucket_owner.as_deref() } + + /// The policy that describes how to handle a failed multipart upload. + pub fn failed_multipart_upload_policy(&self) -> Option<&FailedMultipartUploadPolicy> { + self.failed_multipart_upload_policy.as_ref() + } } impl Debug for UploadRequest { @@ -502,6 +509,10 @@ impl Debug for UploadRequest { &self.object_lock_legal_hold_status(), ); formatter.field("expected_bucket_owner", &self.expected_bucket_owner); + formatter.field( + "failed_multipart_upload_policy", + &self.failed_multipart_upload_policy, + ); formatter.finish() } } @@ -547,6 +558,7 @@ pub struct UploadRequestBuilder { pub(crate) object_lock_retain_until_date: Option<::aws_smithy_types::DateTime>, pub(crate) object_lock_legal_hold_status: Option, pub(crate) expected_bucket_owner: Option, + pub(crate) failed_multipart_upload_policy: Option, } impl UploadRequestBuilder { @@ -1405,6 +1417,26 @@ impl UploadRequestBuilder { &self.expected_bucket_owner } + /// The policy that describes how to handle a failed multipart upload. + pub fn failed_multipart_upload_policy(mut self, input: FailedMultipartUploadPolicy) -> Self { + self.failed_multipart_upload_policy = Some(input); + self + } + + /// The policy that describes how to handle a failed multipart upload. + pub fn set_failed_multipart_upload_policy( + mut self, + input: Option, + ) -> Self { + self.failed_multipart_upload_policy = input; + self + } + + /// The policy that describes how to handle a failed multipart upload. + pub fn get_failed_multipart_upload_policy(&self) -> &Option { + &self.failed_multipart_upload_policy + } + /// Consumes the builder and constructs a [`UploadRequest`] // FIXME(aws-sdk-rust#1159): replace BuildError with our own type? pub fn build(self) -> Result { @@ -1446,6 +1478,7 @@ impl UploadRequestBuilder { object_lock_retain_until_date: self.object_lock_retain_until_date, object_lock_legal_hold_status: self.object_lock_legal_hold_status, expected_bucket_owner: self.expected_bucket_owner, + failed_multipart_upload_policy: self.failed_multipart_upload_policy, }) } } @@ -1499,6 +1532,22 @@ impl Debug for UploadRequestBuilder { &self.object_lock_legal_hold_status, ); formatter.field("expected_bucket_owner", &self.expected_bucket_owner); + formatter.field( + "failed_multipart_upload_policy", + &self.failed_multipart_upload_policy, + ); formatter.finish() } } + +/// Policy for how to handle a failed multipart upload +/// +/// Default is to abort the upload. +#[derive(Debug, Clone, Default)] +pub enum FailedMultipartUploadPolicy { + /// Abort the upload on any individual part failure + #[default] + AbortUpload, + /// Retain any uploaded parts. The upload ID will be available in the response. + Retain, +}
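The `part_size` documentation above notes that the configured target may be raised so that an upload never needs more than the 10,000 parts S3 allows; `Uploader::try_start_mpu_upload` does this with `cmp::max(self.part_size_bytes(), content_length / MAX_PARTS)`. A small worked sketch of that clamping (numbers are illustrative only, not from the PR):

```rust
use std::cmp;

/// Illustrative only: mirrors the clamping expression in `try_start_mpu_upload`.
fn effective_part_size(configured_bytes: u64, content_length: u64) -> u64 {
    const MAX_PARTS: u64 = 10_000;
    cmp::max(configured_bytes, content_length / MAX_PARTS)
}

fn main() {
    // A 100 GiB object with an 8 MiB target would need ~12,800 parts,
    // so the effective part size is raised to content_length / 10_000.
    let content_length: u64 = 100 * 1024 * 1024 * 1024;
    let configured: u64 = 8 * 1024 * 1024;
    assert_eq!(
        effective_part_size(configured, content_length),
        content_length / 10_000
    );
}
```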
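Putting the new pieces together, the sketch below shows how a caller might combine the `ConcurrencySetting` and `PartSize` types with the `FailedMultipartUploadPolicy` added to `UploadRequest`, and where `UploadHandle::abort` fits in. This is a usage sketch against the API introduced in this diff, not code from the PR; the bucket, key, file path, and cancellation flag are hypothetical, and the response fields are left untouched since their accessors are not shown here.

```rust
use std::error::Error;

use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_s3_transfer_manager::upload::request::FailedMultipartUploadPolicy;
use aws_s3_transfer_manager::upload::{UploadRequest, Uploader};

async fn upload_with_abort_policy() -> Result<(), Box<dyn Error>> {
    let config = aws_config::from_env().load().await;

    let uploader = Uploader::builder()
        .sdk_config(config)
        .concurrency(ConcurrencySetting::Explicit(8))
        .part_size(PartSize::Target(16 * 1024 * 1024))
        .build();

    let request = UploadRequest::builder()
        .bucket("my-bucket") // hypothetical
        .key("my-key") // hypothetical
        .body(InputStream::from_path("/tmp/data.bin")?) // hypothetical path
        // keep already-uploaded parts instead of calling AbortMultipartUpload on failure
        .failed_multipart_upload_policy(FailedMultipartUploadPolicy::Retain)
        .build()?;

    let mut handle = uploader.upload(request).await?;

    // hypothetical signal; in a real application this might come from a shutdown hook
    let cancel_requested = false;

    if cancel_requested {
        // abort() cancels in-flight part uploads and, depending on the policy above,
        // may abort the multipart upload on the service side.
        let aborted = handle.abort().await?;
        println!("aborted upload_id={:?}", aborted.upload_id());
    } else {
        // join() drives the transfer to completion and yields the response.
        let _response = handle.join().await?;
    }

    Ok(())
}
```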