diff --git a/aws-s3-transfer-manager/examples/cp.rs b/aws-s3-transfer-manager/examples/cp.rs index e0861c5..fc4b96b 100644 --- a/aws-s3-transfer-manager/examples/cp.rs +++ b/aws-s3-transfer-manager/examples/cp.rs @@ -7,12 +7,10 @@ use std::path::PathBuf; use std::str::FromStr; use std::{mem, time}; -use aws_s3_transfer_manager::download::Downloader; - -use aws_s3_transfer_manager::download::body::Body; +use aws_s3_transfer_manager::client::downloader::body::Body; +use aws_s3_transfer_manager::client::Downloader; use aws_s3_transfer_manager::io::InputStream; use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize}; -use aws_s3_transfer_manager::upload::{UploadRequest, Uploader}; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; use aws_types::SdkConfig; use bytes::Buf; @@ -161,28 +159,33 @@ async fn do_upload(args: Args) -> Result<(), BoxError> { let config = aws_config::from_env().load().await; warmup(&config).await?; - let tm = Uploader::builder() - .sdk_config(config) + let s3_client = aws_sdk_s3::Client::new(&config); + + let tm_config = aws_s3_transfer_manager::Config::builder() .concurrency(ConcurrencySetting::Explicit(args.concurrency)) .part_size(PartSize::Target(args.part_size)) + .client(s3_client) .build(); + let tm = aws_s3_transfer_manager::Client::new(tm_config); + let path = args.source.expect_local(); let file_meta = fs::metadata(path).await.expect("file metadata"); let stream = InputStream::from_path(path)?; let (bucket, key) = args.dest.expect_s3().parts(); - let request = UploadRequest::builder() + println!("starting upload"); + let start = time::Instant::now(); + + let handle = tm + .upload() .bucket(bucket) .key(key) .body(stream) - .build()?; - - println!("starting upload"); - let start = time::Instant::now(); + .send() + .await?; - let handle = tm.upload(request).await?; let _resp = handle.join().await?; let elapsed = start.elapsed(); diff --git a/aws-s3-transfer-manager/src/client.rs b/aws-s3-transfer-manager/src/client.rs new file mode 100644 index 0000000..312d7c7 --- /dev/null +++ b/aws-s3-transfer-manager/src/client.rs @@ -0,0 +1,104 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/// Abstractions for downloading objects from S3 +pub mod downloader; +pub use downloader::Downloader; + +use crate::Config; +use crate::{ + types::{ConcurrencySetting, PartSize}, + DEFAULT_CONCURRENCY, MEBIBYTE, +}; +use std::sync::Arc; + +/// Transfer manager client for Amazon Simple Storage Service. +#[derive(Debug, Clone)] +pub struct Client { + handle: Arc, +} + +/// Whatever is needed to carry out operations, e.g. scheduler, budgets, config, env details, etc +#[derive(Debug)] +pub(crate) struct Handle { + pub(crate) config: crate::Config, +} + +impl Handle { + /// Get the concrete number of workers to use based on the concurrency setting. + pub(crate) fn num_workers(&self) -> usize { + match self.config.concurrency() { + // TODO(aws-sdk-rust#1159): add logic for determining this + ConcurrencySetting::Auto => DEFAULT_CONCURRENCY, + ConcurrencySetting::Explicit(explicit) => *explicit, + } + } + + /// Get the concrete minimum upload size in bytes to use to determine whether multipart uploads + /// are enabled for a given request. + pub(crate) fn mpu_threshold_bytes(&self) -> u64 { + match self.config.multipart_threshold() { + PartSize::Auto => 16 * MEBIBYTE, + PartSize::Target(explicit) => *explicit, + } + } + + /// Get the concrete target part size to use for uploads + pub(crate) fn upload_part_size_bytes(&self) -> u64 { + match self.config.part_size() { + PartSize::Auto => 8 * MEBIBYTE, + PartSize::Target(explicit) => *explicit, + } + } +} + +impl Client { + /// Creates a new client from a transfer manager config. + pub fn new(config: Config) -> Client { + let handle = Arc::new(Handle { config }); + + Client { handle } + } + + /// Returns the client's configuration + pub fn config(&self) -> &Config { + &self.handle.config + } + + /// Constructs a fluent builder for the + /// [`Upload`](crate::operation::upload::builders::UploadFluentBuilder) operation. + /// + /// # Examples + /// + /// ```no_run + /// use std::error::Error; + /// use std::path::Path; + /// use aws_s3_transfer_manager::io::InputStream; + /// + /// async fn upload_file( + /// client: &aws_s3_transfer_manager::Client, + /// path: impl AsRef + /// ) -> Result<(), Box> { + /// let stream = InputStream::from_path(path)?; + /// let handle = client.upload() + /// .bucket("my-bucket") + /// .key("my_key") + /// .body(stream) + /// .send() + /// .await?; + /// + /// // send() may return before the transfer is complete. + /// // Call the `join()` method on the returned handle to drive the transfer to completion. + /// // The handle can also be used to get progress, pause, or cancel the transfer, etc. + /// let response = handle.join().await?; + /// // ... do something with response + /// Ok(()) + /// } + /// + /// ``` + pub fn upload(&self) -> crate::operation::upload::builders::UploadFluentBuilder { + crate::operation::upload::builders::UploadFluentBuilder::new(self.handle.clone()) + } +} diff --git a/aws-s3-transfer-manager/src/download.rs b/aws-s3-transfer-manager/src/client/downloader.rs similarity index 86% rename from aws-s3-transfer-manager/src/download.rs rename to aws-s3-transfer-manager/src/client/downloader.rs index feef502..3cb08e0 100644 --- a/aws-s3-transfer-manager/src/download.rs +++ b/aws-s3-transfer-manager/src/client/downloader.rs @@ -12,14 +12,14 @@ mod header; mod object_meta; mod worker; -use crate::download::body::Body; -use crate::download::discovery::{discover_obj, ObjectDiscovery}; -use crate::download::handle::DownloadHandle; -use crate::download::worker::{distribute_work, download_chunks, ChunkResponse}; +use crate::client::downloader::body::Body; +use crate::client::downloader::discovery::{discover_obj, ObjectDiscovery}; +use crate::client::downloader::handle::DownloadHandle; +use crate::client::downloader::worker::{distribute_work, download_chunks, ChunkResponse}; use crate::error::TransferError; +use crate::operation::download::DownloadInput; use crate::types::{ConcurrencySetting, PartSize}; use crate::{DEFAULT_CONCURRENCY, MEBIBYTE}; -use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder}; use aws_types::SdkConfig; use context::DownloadContext; use tokio::sync::mpsc; @@ -28,28 +28,6 @@ use tracing::Instrument; // TODO(aws-sdk-rust#1159) - need to set User-Agent header value for SEP, e.g. `ft/hll#s3-transfer` -/// Request type for downloading a single object -#[derive(Debug)] -#[non_exhaustive] -pub struct DownloadRequest { - pub(crate) input: GetObjectInputBuilder, -} - -// FIXME - should probably be TryFrom since checksums may conflict? -impl From for DownloadRequest { - fn from(value: GetObjectFluentBuilder) -> Self { - Self { - input: value.as_input().clone(), - } - } -} - -impl From for DownloadRequest { - fn from(value: GetObjectInputBuilder) -> Self { - Self { input: value } - } -} - /// Fluent style builder for [Downloader] #[derive(Debug, Clone, Default)] pub struct Builder { @@ -131,7 +109,8 @@ impl Downloader { /// ```no_run /// use std::error::Error; /// use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; - /// use aws_s3_transfer_manager::download::{Downloader, DownloadRequest}; + /// use aws_s3_transfer_manager::client::Downloader; + /// use aws_s3_transfer_manager::operation::download::DownloadInput; /// /// async fn get_object(client: Downloader) -> Result<(), Box> { /// let request = GetObjectInputBuilder::default() @@ -144,7 +123,7 @@ impl Downloader { /// Ok(()) /// } /// ``` - pub async fn download(&self, req: DownloadRequest) -> Result { + pub async fn download(&self, req: DownloadInput) -> Result { // if there is a part number then just send the default request if req.input.get_part_number().is_some() { todo!("single part download not implemented") diff --git a/aws-s3-transfer-manager/src/download/body.rs b/aws-s3-transfer-manager/src/client/downloader/body.rs similarity index 98% rename from aws-s3-transfer-manager/src/download/body.rs rename to aws-s3-transfer-manager/src/client/downloader/body.rs index f759be4..762c0fa 100644 --- a/aws-s3-transfer-manager/src/download/body.rs +++ b/aws-s3-transfer-manager/src/client/downloader/body.rs @@ -2,7 +2,7 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ -use crate::download::worker::ChunkResponse; +use crate::client::downloader::worker::ChunkResponse; use crate::error::TransferError; use aws_smithy_types::byte_stream::AggregatedBytes; use std::cmp; @@ -170,7 +170,7 @@ impl UnorderedBody { #[cfg(test)] mod tests { - use crate::download::worker::ChunkResponse; + use crate::client::downloader::worker::ChunkResponse; use crate::error::TransferError; use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream}; use bytes::Bytes; diff --git a/aws-s3-transfer-manager/src/download/context.rs b/aws-s3-transfer-manager/src/client/downloader/context.rs similarity index 100% rename from aws-s3-transfer-manager/src/download/context.rs rename to aws-s3-transfer-manager/src/client/downloader/context.rs diff --git a/aws-s3-transfer-manager/src/download/discovery.rs b/aws-s3-transfer-manager/src/client/downloader/discovery.rs similarity index 96% rename from aws-s3-transfer-manager/src/download/discovery.rs rename to aws-s3-transfer-manager/src/client/downloader/discovery.rs index 8b0ac83..2220bf2 100644 --- a/aws-s3-transfer-manager/src/download/discovery.rs +++ b/aws-s3-transfer-manager/src/client/downloader/discovery.rs @@ -16,8 +16,8 @@ use crate::error; use super::header::{self, ByteRange}; use super::object_meta::ObjectMetadata; -use super::DownloadRequest; -use crate::download::context::DownloadContext; +use super::DownloadInput; +use crate::client::downloader::context::DownloadContext; #[derive(Debug, Clone, PartialEq)] enum ObjectDiscoveryStrategy { @@ -44,7 +44,7 @@ pub(super) struct ObjectDiscovery { impl ObjectDiscoveryStrategy { fn from_request( - request: &DownloadRequest, + request: &DownloadInput, ) -> Result { let strategy = match request.input.get_range() { Some(h) => { @@ -71,7 +71,7 @@ impl ObjectDiscoveryStrategy { /// to be fetched, and _(if available)_ the first chunk of data. pub(super) async fn discover_obj( ctx: &DownloadContext, - request: &DownloadRequest, + request: &DownloadInput, ) -> Result { let strategy = ObjectDiscoveryStrategy::from_request(request)?; match strategy { @@ -99,7 +99,7 @@ pub(super) async fn discover_obj( async fn discover_obj_with_head( ctx: &DownloadContext, - request: &DownloadRequest, + request: &DownloadInput, byte_range: Option, ) -> Result { let meta: ObjectMetadata = ctx @@ -166,11 +166,11 @@ async fn discover_obj_with_get( #[cfg(test)] mod tests { - use crate::download::context::DownloadContext; - use crate::download::discovery::{ + use crate::client::downloader::context::DownloadContext; + use crate::client::downloader::discovery::{ discover_obj, discover_obj_with_head, ObjectDiscoveryStrategy, }; - use crate::download::header::ByteRange; + use crate::client::downloader::header::ByteRange; use crate::MEBIBYTE; use aws_sdk_s3::operation::get_object::{GetObjectInput, GetObjectOutput}; use aws_sdk_s3::operation::head_object::HeadObjectOutput; diff --git a/aws-s3-transfer-manager/src/download/handle.rs b/aws-s3-transfer-manager/src/client/downloader/handle.rs similarity index 86% rename from aws-s3-transfer-manager/src/download/handle.rs rename to aws-s3-transfer-manager/src/client/downloader/handle.rs index 19b03f6..500ea81 100644 --- a/aws-s3-transfer-manager/src/download/handle.rs +++ b/aws-s3-transfer-manager/src/client/downloader/handle.rs @@ -2,8 +2,8 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ -use crate::download::body::Body; -use crate::download::object_meta::ObjectMetadata; +use crate::client::downloader::body::Body; +use crate::client::downloader::object_meta::ObjectMetadata; use tokio::task; /// Response type for a single download object request. diff --git a/aws-s3-transfer-manager/src/download/header.rs b/aws-s3-transfer-manager/src/client/downloader/header.rs similarity index 100% rename from aws-s3-transfer-manager/src/download/header.rs rename to aws-s3-transfer-manager/src/client/downloader/header.rs diff --git a/aws-s3-transfer-manager/src/download/object_meta.rs b/aws-s3-transfer-manager/src/client/downloader/object_meta.rs similarity index 100% rename from aws-s3-transfer-manager/src/download/object_meta.rs rename to aws-s3-transfer-manager/src/client/downloader/object_meta.rs diff --git a/aws-s3-transfer-manager/src/download/worker.rs b/aws-s3-transfer-manager/src/client/downloader/worker.rs similarity index 96% rename from aws-s3-transfer-manager/src/download/worker.rs rename to aws-s3-transfer-manager/src/client/downloader/worker.rs index edd8149..42da623 100644 --- a/aws-s3-transfer-manager/src/download/worker.rs +++ b/aws-s3-transfer-manager/src/client/downloader/worker.rs @@ -2,8 +2,8 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ -use crate::download::context::DownloadContext; -use crate::download::header; +use crate::client::downloader::context::DownloadContext; +use crate::client::downloader::header; use crate::error; use crate::error::TransferError; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; @@ -137,8 +137,8 @@ fn next_chunk( #[cfg(test)] mod tests { - use crate::download::header; - use crate::download::worker::distribute_work; + use crate::client::downloader::header; + use crate::client::downloader::worker::distribute_work; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; use std::ops::RangeInclusive; diff --git a/aws-s3-transfer-manager/src/config.rs b/aws-s3-transfer-manager/src/config.rs new file mode 100644 index 0000000..e375bdf --- /dev/null +++ b/aws-s3-transfer-manager/src/config.rs @@ -0,0 +1,143 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::types::{ConcurrencySetting, PartSize}; +use crate::MEBIBYTE; +use std::cmp; + +/// Minimum upload part size in bytes +const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * MEBIBYTE; + +/// Configuration for a [`Client`](crate::client::Client) +#[derive(Debug, Clone)] +pub struct Config { + multipart_threshold: PartSize, + target_part_size: PartSize, + concurrency: ConcurrencySetting, + client: aws_sdk_s3::client::Client, +} + +impl Config { + /// Create a new `Config` builder + pub fn builder() -> Builder { + Builder::default() + } + + /// Returns a reference to the multipart upload threshold part size + pub fn multipart_threshold(&self) -> &PartSize { + &self.multipart_threshold + } + + /// Returns a reference to the target part size to use for transfer operations + pub fn part_size(&self) -> &PartSize { + &self.target_part_size + } + + // TODO(design) - should we separate upload/download part size and concurrency settings? + // + // FIXME - this setting is wrong, we don't use it right. This should feed into scheduling and + // whether an individual operation can execute an SDK/HTTP request. We should be free to spin + // however many tasks we want per transfer operation OR have separate config for task + // concurrency. + /// Returns the concurrency setting to use for individual transfer operations. + pub fn concurrency(&self) -> &ConcurrencySetting { + &self.concurrency + } + + /// The Amazon S3 client instance that will be used to send requests to S3. + pub fn client(&self) -> &aws_sdk_s3::Client { + &self.client + } +} + +/// Fluent style builder for [Config] +#[derive(Debug, Clone, Default)] +pub struct Builder { + multipart_threshold_part_size: PartSize, + target_part_size: PartSize, + concurrency: ConcurrencySetting, + client: Option, +} + +impl Builder { + /// Minimum object size that should trigger a multipart upload. + /// + /// The minimum part size is 5 MiB, any part size less than that will be rounded up. + /// Default is [PartSize::Auto] + pub fn multipart_threshold(self, threshold: PartSize) -> Self { + let threshold = match threshold { + PartSize::Target(part_size) => { + PartSize::Target(cmp::max(part_size, MIN_MULTIPART_PART_SIZE_BYTES)) + } + tps => tps, + }; + + self.set_multipart_threshold(threshold) + } + + /// The target size of each part when using a multipart upload to complete the request. + /// + /// When a request's content length is les than [`multipart_threshold`], + /// this setting is ignored and a single [`PutObject`] request will be made instead. + /// + /// NOTE: The actual part size used may be larger than the configured part size if + /// the current value would result in more than 10,000 parts for an upload request. + /// + /// Default is [PartSize::Auto] + /// + /// [`multipart_threshold`]: method@Self::multipart_threshold + /// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + pub fn part_size(self, part_size: PartSize) -> Self { + let threshold = match part_size { + PartSize::Target(part_size) => { + PartSize::Target(cmp::max(part_size, MIN_MULTIPART_PART_SIZE_BYTES)) + } + tps => tps, + }; + + self.set_target_part_size(threshold) + } + + /// Minimum object size that should trigger a multipart upload. + /// + /// NOTE: This does not validate the setting and is meant for internal use only. + pub(crate) fn set_multipart_threshold(mut self, threshold: PartSize) -> Self { + self.multipart_threshold_part_size = threshold; + self + } + + /// Target part size for a multipart upload. + /// + /// NOTE: This does not validate the setting and is meant for internal use only. + pub(crate) fn set_target_part_size(mut self, threshold: PartSize) -> Self { + self.target_part_size = threshold; + self + } + + /// Set the concurrency level this component is allowed to use. + /// + /// This sets the maximum number of concurrent in-flight requests. + /// Default is [ConcurrencySetting::Auto]. + pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self { + self.concurrency = concurrency; + self + } + + /// Consumes the builder and constructs a [`Config`](crate::config::Config) + pub fn build(self) -> Config { + Config { + multipart_threshold: self.multipart_threshold_part_size, + target_part_size: self.target_part_size, + concurrency: self.concurrency, + client: self.client.expect("client set"), + } + } + + /// Set an explicit S3 client to use. + pub fn client(mut self, client: aws_sdk_s3::Client) -> Self { + self.client = Some(client); + self + } +} diff --git a/aws-s3-transfer-manager/src/lib.rs b/aws-s3-transfer-manager/src/lib.rs index e507bbd..9846a23 100644 --- a/aws-s3-transfer-manager/src/lib.rs +++ b/aws-s3-transfer-manager/src/lib.rs @@ -25,9 +25,6 @@ pub(crate) const MEBIBYTE: u64 = 1024 * 1024; pub(crate) const DEFAULT_CONCURRENCY: usize = 8; -/// Abstractions for downloading objects from S3 -pub mod download; - /// Error types emitted by `aws-s3-transfer-manager` pub mod error; @@ -37,5 +34,14 @@ pub mod types; /// Types and helpers for I/O pub mod io; -/// Abstractions for downloading objects from Amazon S3 -pub mod upload; +/// Transfer manager client +pub mod client; + +/// Transfer manager operations +pub mod operation; + +/// Transfer manager configuration +pub mod config; + +pub use self::client::Client; +pub use self::config::Config; diff --git a/aws-s3-transfer-manager/src/operation.rs b/aws-s3-transfer-manager/src/operation.rs new file mode 100644 index 0000000..ca0a166 --- /dev/null +++ b/aws-s3-transfer-manager/src/operation.rs @@ -0,0 +1,10 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/// Types for single object upload operation +pub mod upload; + +/// Types for single object download operation +pub mod download; diff --git a/aws-s3-transfer-manager/src/operation/download.rs b/aws-s3-transfer-manager/src/operation/download.rs new file mode 100644 index 0000000..b42708b --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/download.rs @@ -0,0 +1,11 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +mod input; + +/// Request type for uploads to Amazon S3 +pub use self::input::DownloadInput; + +mod output; diff --git a/aws-s3-transfer-manager/src/operation/download/input.rs b/aws-s3-transfer-manager/src/operation/download/input.rs new file mode 100644 index 0000000..831f29e --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/download/input.rs @@ -0,0 +1,30 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder}; + +// FIXME - replace with our own types like upload + +/// Request type for downloading a single object +#[derive(Debug)] +#[non_exhaustive] +pub struct DownloadInput { + pub(crate) input: GetObjectInputBuilder, +} + +// FIXME - should probably be TryFrom since checksums may conflict? +impl From for DownloadInput { + fn from(value: GetObjectFluentBuilder) -> Self { + Self { + input: value.as_input().clone(), + } + } +} + +impl From for DownloadInput { + fn from(value: GetObjectInputBuilder) -> Self { + Self { input: value } + } +} diff --git a/aws-s3-transfer-manager/src/operation/download/output.rs b/aws-s3-transfer-manager/src/operation/download/output.rs new file mode 100644 index 0000000..35d0f36 --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/download/output.rs @@ -0,0 +1,4 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ diff --git a/aws-s3-transfer-manager/src/operation/upload.rs b/aws-s3-transfer-manager/src/operation/upload.rs new file mode 100644 index 0000000..6fc909c --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/upload.rs @@ -0,0 +1,299 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/// Operation builders +pub mod builders; +mod input; +mod output; + +pub(crate) mod context; +pub(crate) mod handle; + +use crate::error::UploadError; +use crate::io::part_reader::{Builder as PartReaderBuilder, ReadPart}; +use crate::io::InputStream; +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::types::CompletedPart; +use bytes::Buf; +use context::UploadContext; +pub use handle::UploadHandle; +/// Request type for uploads to Amazon S3 +pub use input::{UploadInput, UploadInputBuilder}; +/// Response type for uploads to Amazon S3 +pub use output::{UploadOutput, UploadOutputBuilder}; +use std::cmp; +use std::sync::Arc; +use tracing::Instrument; + +/// Maximum number of parts that a single S3 multipart upload supports +const MAX_PARTS: u64 = 10_000; + +/// Operation struct for single object upload +#[derive(Clone, Default, Debug)] +pub(crate) struct Upload; + +impl Upload { + /// Execute a single `Upload` transfer operation + pub(crate) async fn orchestrate( + handle: Arc, + mut input: crate::operation::upload::UploadInput, + ) -> Result { + let min_mpu_threshold = handle.mpu_threshold_bytes(); + + let stream = input.take_body(); + let ctx = new_context(handle, input); + let mut handle = UploadHandle::new(ctx); + + // MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway) + let content_length = stream + .size_hint() + .upper() + .ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?; + + if content_length < min_mpu_threshold { + // TODO - adapt body to ByteStream and send request using `PutObject` for non mpu upload + // tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request"); + try_start_mpu_upload(&mut handle, stream, content_length).await? + } else { + try_start_mpu_upload(&mut handle, stream, content_length).await? + } + + Ok(handle) + } +} + +/// Start a multipart upload +/// +/// # Arguments +/// +/// * `handle` - The upload handle +/// * `stream` - The content to upload +/// * `content_length` - The upper bound on the content length +async fn try_start_mpu_upload( + handle: &mut UploadHandle, + stream: InputStream, + content_length: u64, +) -> Result<(), UploadError> { + let part_size = cmp::max( + handle.ctx.handle.upload_part_size_bytes(), + content_length / MAX_PARTS, + ); + tracing::trace!("upload request using multipart upload with part size: {part_size} bytes"); + + let mpu = start_mpu(handle).await?; + tracing::trace!( + "multipart upload started with upload id: {:?}", + mpu.upload_id + ); + + handle.set_response(mpu); + + let part_reader = Arc::new( + PartReaderBuilder::new() + .stream(stream) + .part_size(part_size.try_into().expect("valid part size")) + .build(), + ); + + let n_workers = handle.ctx.handle.num_workers(); + for i in 0..n_workers { + let worker = upload_parts(handle.ctx.clone(), part_reader.clone()) + .instrument(tracing::debug_span!("upload-part", worker = i)); + handle.tasks.spawn(worker); + } + + Ok(()) +} + +fn new_context(handle: Arc, req: UploadInput) -> UploadContext { + UploadContext { + handle, + request: Arc::new(req), + upload_id: None, + } +} + +/// start a new multipart upload by invoking `CreateMultipartUpload` +async fn start_mpu(handle: &UploadHandle) -> Result { + let req = handle.ctx.request(); + let client = handle.ctx.client(); + + let resp = client + .create_multipart_upload() + .set_acl(req.acl.clone()) + .set_bucket(req.bucket.clone()) + .set_cache_control(req.cache_control.clone()) + .set_content_disposition(req.content_disposition.clone()) + .set_content_encoding(req.content_encoding.clone()) + .set_content_language(req.content_language.clone()) + .set_content_type(req.content_type.clone()) + .set_expires(req.expires) + .set_grant_full_control(req.grant_full_control.clone()) + .set_grant_read(req.grant_read.clone()) + .set_grant_read_acp(req.grant_read_acp.clone()) + .set_grant_write_acp(req.grant_write_acp.clone()) + .set_key(req.key.clone()) + .set_metadata(req.metadata.clone()) + .set_server_side_encryption(req.server_side_encryption.clone()) + .set_storage_class(req.storage_class.clone()) + .set_website_redirect_location(req.website_redirect_location.clone()) + .set_sse_customer_algorithm(req.sse_customer_algorithm.clone()) + .set_sse_customer_key(req.sse_customer_key.clone()) + .set_sse_customer_key_md5(req.sse_customer_key_md5.clone()) + .set_ssekms_key_id(req.sse_kms_key_id.clone()) + .set_ssekms_encryption_context(req.sse_kms_encryption_context.clone()) + .set_bucket_key_enabled(req.bucket_key_enabled) + .set_request_payer(req.request_payer.clone()) + .set_tagging(req.tagging.clone()) + .set_object_lock_mode(req.object_lock_mode.clone()) + .set_object_lock_retain_until_date(req.object_lock_retain_until_date) + .set_object_lock_legal_hold_status(req.object_lock_legal_hold_status.clone()) + .set_expected_bucket_owner(req.expected_bucket_owner.clone()) + .set_checksum_algorithm(req.checksum_algorithm.clone()) + .send() + .await?; + + Ok(resp.into()) +} + +/// Worker function that pulls part data off the `reader` and uploads each part until the reader +/// is exhausted. If any part fails the worker will return the error and stop processing. If +/// the worker finishes successfully the completed parts uploaded by this worker are returned. +async fn upload_parts( + ctx: UploadContext, + reader: Arc, +) -> Result, UploadError> { + let mut completed_parts = Vec::new(); + loop { + let part_result = reader.next_part().await?; + let part_data = match part_result { + Some(part_data) => part_data, + None => break, + }; + + let part_number = part_data.part_number as i32; + tracing::trace!("recv'd part number {}", part_number); + + let content_length = part_data.data.remaining(); + let body = ByteStream::from(part_data.data); + + // TODO(aws-sdk-rust#1159): disable payload signing + // TODO(aws-sdk-rust#1159): set checksum fields if applicable + let resp = ctx + .client() + .upload_part() + .set_bucket(ctx.request.bucket.clone()) + .set_key(ctx.request.key.clone()) + .set_upload_id(ctx.upload_id.clone()) + .part_number(part_number) + .content_length(content_length as i64) + .body(body) + .set_sse_customer_algorithm(ctx.request.sse_customer_algorithm.clone()) + .set_sse_customer_key(ctx.request.sse_customer_key.clone()) + .set_sse_customer_key_md5(ctx.request.sse_customer_key_md5.clone()) + .set_request_payer(ctx.request.request_payer.clone()) + .set_expected_bucket_owner(ctx.request.expected_bucket_owner.clone()) + .send() + .await?; + + tracing::trace!("completed upload of part number {}", part_number); + let completed = CompletedPart::builder() + .part_number(part_number) + .set_e_tag(resp.e_tag.clone()) + .set_checksum_crc32(resp.checksum_crc32.clone()) + .set_checksum_crc32_c(resp.checksum_crc32_c.clone()) + .set_checksum_sha1(resp.checksum_sha1.clone()) + .set_checksum_sha256(resp.checksum_sha256.clone()) + .build(); + + completed_parts.push(completed); + } + + tracing::trace!("no more parts, worker finished"); + Ok(completed_parts) +} + +#[cfg(test)] +mod test { + use crate::io::InputStream; + use crate::operation::upload::UploadInput; + use crate::types::{ConcurrencySetting, PartSize}; + use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput; + use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput; + use aws_sdk_s3::operation::upload_part::UploadPartOutput; + use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; + use bytes::Bytes; + use std::ops::Deref; + use std::sync::Arc; + + #[tokio::test] + async fn test_basic_mpu() { + let expected_upload_id = Arc::new("test-upload".to_owned()); + let body = Bytes::from_static(b"every adolescent dog goes bonkers early"); + let stream = InputStream::from(body); + + let upload_id = expected_upload_id.clone(); + let create_mpu = + mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || { + CreateMultipartUploadOutput::builder() + .upload_id(upload_id.as_ref().to_owned()) + .build() + }); + + let upload_id = expected_upload_id.clone(); + let upload_1 = mock!(aws_sdk_s3::Client::upload_part) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(30) + }) + .then_output(|| UploadPartOutput::builder().build()); + + let upload_id = expected_upload_id.clone(); + let upload_2 = mock!(aws_sdk_s3::Client::upload_part) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(9) + }) + .then_output(|| UploadPartOutput::builder().build()); + + let expected_e_tag = Arc::new("test-e-tag".to_owned()); + let upload_id = expected_upload_id.clone(); + let e_tag = expected_e_tag.clone(); + let complete_mpu = mock!(aws_sdk_s3::Client::complete_multipart_upload) + .match_requests(move |r| { + r.upload_id.as_ref() == Some(&upload_id) + && r.multipart_upload.clone().unwrap().parts.unwrap().len() == 2 + }) + .then_output(move || { + CompleteMultipartUploadOutput::builder() + .e_tag(e_tag.as_ref().to_owned()) + .build() + }); + + let client = mock_client!( + aws_sdk_s3, + RuleMode::Sequential, + &[&create_mpu, &upload_1, &upload_2, &complete_mpu] + ); + + let tm_config = crate::Config::builder() + .concurrency(ConcurrencySetting::Explicit(1)) + .set_multipart_threshold(PartSize::Target(10)) + .set_target_part_size(PartSize::Target(30)) + .client(client) + .build(); + + let tm = crate::Client::new(tm_config); + + let request = UploadInput::builder() + .bucket("test-bucket") + .key("test-key") + .body(stream); + + let handle = request.send_with(&tm).await.unwrap(); + + let resp = handle.join().await.unwrap(); + assert_eq!(expected_upload_id.deref(), resp.upload_id.unwrap().deref()); + assert_eq!(expected_e_tag.deref(), resp.e_tag.unwrap().deref()); + } +} diff --git a/aws-s3-transfer-manager/src/operation/upload/builders.rs b/aws-s3-transfer-manager/src/operation/upload/builders.rs new file mode 100644 index 0000000..6ba7e30 --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/upload/builders.rs @@ -0,0 +1,915 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use std::sync::Arc; + +use crate::{error::UploadError, types::FailedMultipartUploadPolicy}; + +use super::{UploadHandle, UploadInputBuilder}; + +/// Fluent builder for constructing a single object upload transfer +#[derive(Debug)] +pub struct UploadFluentBuilder { + handle: Arc, + inner: UploadInputBuilder, +} + +impl UploadFluentBuilder { + pub(crate) fn new(handle: Arc) -> Self { + Self { + handle, + inner: ::std::default::Default::default(), + } + } + + /// Initiate an upload transfer for a single object + pub async fn send(self) -> Result { + // FIXME - need UploadError to support this conversion to remove expect() in favor of ? + let input = self.inner.build().expect("valid input"); + crate::operation::upload::Upload::orchestrate(self.handle, input).await + } + + ///

The canned ACL to apply to the object. For more information, see Canned ACL in the Amazon S3 User Guide.

+ ///

When adding a new object, you can use headers to grant ACL-based permissions to individual Amazon Web Services accounts or to predefined groups defined by Amazon S3. These permissions are then added to the ACL on the object. By default, all objects are private. Only the owner has full access control. For more information, see Access Control List (ACL) Overview and Managing ACLs Using the REST API in the Amazon S3 User Guide.

+ ///

If the bucket that you're uploading objects to uses the bucket owner enforced setting for S3 Object Ownership, ACLs are disabled and no longer affect permissions. Buckets that use this setting only accept PUT requests that don't specify an ACL or PUT requests that specify bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. PUT requests that contain other ACLs (for example, custom grants to certain Amazon Web Services accounts) fail and return a 400 error with the error code AccessControlListNotSupported. For more information, see Controlling ownership of objects and disabling ACLs in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn acl(mut self, input: aws_sdk_s3::types::ObjectCannedAcl) -> Self { + self.inner = self.inner.acl(input); + self + } + ///

The canned ACL to apply to the object. For more information, see Canned ACL in the Amazon S3 User Guide.

+ ///

When adding a new object, you can use headers to grant ACL-based permissions to individual Amazon Web Services accounts or to predefined groups defined by Amazon S3. These permissions are then added to the ACL on the object. By default, all objects are private. Only the owner has full access control. For more information, see Access Control List (ACL) Overview and Managing ACLs Using the REST API in the Amazon S3 User Guide.

+ ///

If the bucket that you're uploading objects to uses the bucket owner enforced setting for S3 Object Ownership, ACLs are disabled and no longer affect permissions. Buckets that use this setting only accept PUT requests that don't specify an ACL or PUT requests that specify bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. PUT requests that contain other ACLs (for example, custom grants to certain Amazon Web Services accounts) fail and return a 400 error with the error code AccessControlListNotSupported. For more information, see Controlling ownership of objects and disabling ACLs in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn set_acl(mut self, input: Option) -> Self { + self.inner = self.inner.set_acl(input); + self + } + ///

The canned ACL to apply to the object. For more information, see Canned ACL in the Amazon S3 User Guide.

+ ///

When adding a new object, you can use headers to grant ACL-based permissions to individual Amazon Web Services accounts or to predefined groups defined by Amazon S3. These permissions are then added to the ACL on the object. By default, all objects are private. Only the owner has full access control. For more information, see Access Control List (ACL) Overview and Managing ACLs Using the REST API in the Amazon S3 User Guide.

+ ///

If the bucket that you're uploading objects to uses the bucket owner enforced setting for S3 Object Ownership, ACLs are disabled and no longer affect permissions. Buckets that use this setting only accept PUT requests that don't specify an ACL or PUT requests that specify bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. PUT requests that contain other ACLs (for example, custom grants to certain Amazon Web Services accounts) fail and return a 400 error with the error code AccessControlListNotSupported. For more information, see Controlling ownership of objects and disabling ACLs in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn get_acl(&self) -> &Option { + self.inner.get_acl() + } + ///

Object data.

+ pub fn body(mut self, input: crate::io::InputStream) -> Self { + self.inner = self.inner.body(input); + self + } + ///

Object data.

+ pub fn set_body(mut self, input: Option) -> Self { + self.inner = self.inner.set_body(input); + self + } + + ///

Object data.

+ pub fn get_body(&self) -> &Option { + self.inner.get_body() + } + + ///

The bucket name to which the PUT action was initiated.

+ ///

Directory buckets - When you use this operation with a directory bucket, you must use virtual-hosted-style requests in the format Bucket_name.s3express-az_id.region.amazonaws.com. Path-style requests are not supported. Directory bucket names must be unique in the chosen Availability Zone. Bucket names must follow the format bucket_base_name--az-id--x-s3 (for example, DOC-EXAMPLE-BUCKET--usw2-az1--x-s3). For information about bucket naming restrictions, see Directory bucket naming rules in the Amazon S3 User Guide.

+ ///

Access points - When you use this action with an access point, you must provide the alias of the access point in place of the bucket name or specify the access point ARN. When using the access point ARN, you must direct requests to the access point hostname. The access point hostname takes the form AccessPointName-AccountId.s3-accesspoint.Region.amazonaws.com. When using this action with an access point through the Amazon Web Services SDKs, you provide the access point ARN in place of the bucket name. For more information about access point ARNs, see Using access points in the Amazon S3 User Guide.

+ ///

Access points and Object Lambda access points are not supported by directory buckets.

+ ///
+ ///

S3 on Outposts - When you use this action with Amazon S3 on Outposts, you must direct requests to the S3 on Outposts hostname. The S3 on Outposts hostname takes the form AccessPointName-AccountId.outpostID.s3-outposts.Region.amazonaws.com. When you use this action with S3 on Outposts through the Amazon Web Services SDKs, you provide the Outposts access point ARN in place of the bucket name. For more information about S3 on Outposts ARNs, see What is S3 on Outposts? in the Amazon S3 User Guide.

+ /// This field is required. + pub fn bucket(mut self, input: impl Into) -> Self { + self.inner = self.inner.bucket(input.into()); + self + } + ///

The bucket name to which the PUT action was initiated.

+ ///

Directory buckets - When you use this operation with a directory bucket, you must use virtual-hosted-style requests in the format Bucket_name.s3express-az_id.region.amazonaws.com. Path-style requests are not supported. Directory bucket names must be unique in the chosen Availability Zone. Bucket names must follow the format bucket_base_name--az-id--x-s3 (for example, DOC-EXAMPLE-BUCKET--usw2-az1--x-s3). For information about bucket naming restrictions, see Directory bucket naming rules in the Amazon S3 User Guide.

+ ///

Access points - When you use this action with an access point, you must provide the alias of the access point in place of the bucket name or specify the access point ARN. When using the access point ARN, you must direct requests to the access point hostname. The access point hostname takes the form AccessPointName-AccountId.s3-accesspoint.Region.amazonaws.com. When using this action with an access point through the Amazon Web Services SDKs, you provide the access point ARN in place of the bucket name. For more information about access point ARNs, see Using access points in the Amazon S3 User Guide.

+ ///

Access points and Object Lambda access points are not supported by directory buckets.

+ ///
+ ///

S3 on Outposts - When you use this action with Amazon S3 on Outposts, you must direct requests to the S3 on Outposts hostname. The S3 on Outposts hostname takes the form AccessPointName-AccountId.outpostID.s3-outposts.Region.amazonaws.com. When you use this action with S3 on Outposts through the Amazon Web Services SDKs, you provide the Outposts access point ARN in place of the bucket name. For more information about S3 on Outposts ARNs, see What is S3 on Outposts? in the Amazon S3 User Guide.

+ pub fn set_bucket(mut self, input: Option) -> Self { + self.inner = self.inner.set_bucket(input); + self + } + ///

The bucket name to which the PUT action was initiated.

+ ///

Directory buckets - When you use this operation with a directory bucket, you must use virtual-hosted-style requests in the format Bucket_name.s3express-az_id.region.amazonaws.com. Path-style requests are not supported. Directory bucket names must be unique in the chosen Availability Zone. Bucket names must follow the format bucket_base_name--az-id--x-s3 (for example, DOC-EXAMPLE-BUCKET--usw2-az1--x-s3). For information about bucket naming restrictions, see Directory bucket naming rules in the Amazon S3 User Guide.

+ ///

Access points - When you use this action with an access point, you must provide the alias of the access point in place of the bucket name or specify the access point ARN. When using the access point ARN, you must direct requests to the access point hostname. The access point hostname takes the form AccessPointName-AccountId.s3-accesspoint.Region.amazonaws.com. When using this action with an access point through the Amazon Web Services SDKs, you provide the access point ARN in place of the bucket name. For more information about access point ARNs, see Using access points in the Amazon S3 User Guide.

+ ///

Access points and Object Lambda access points are not supported by directory buckets.

+ ///
+ ///

S3 on Outposts - When you use this action with Amazon S3 on Outposts, you must direct requests to the S3 on Outposts hostname. The S3 on Outposts hostname takes the form AccessPointName-AccountId.outpostID.s3-outposts.Region.amazonaws.com. When you use this action with S3 on Outposts through the Amazon Web Services SDKs, you provide the Outposts access point ARN in place of the bucket name. For more information about S3 on Outposts ARNs, see What is S3 on Outposts? in the Amazon S3 User Guide.

+ pub fn get_bucket(&self) -> &Option { + self.inner.get_bucket() + } + ///

Can be used to specify caching behavior along the request/reply chain. For more information, see http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9.

+ pub fn cache_control(mut self, input: impl Into) -> Self { + self.inner = self.inner.cache_control(input); + self + } + ///

Can be used to specify caching behavior along the request/reply chain. For more information, see http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9.

+ pub fn set_cache_control(mut self, input: Option) -> Self { + self.inner = self.inner.set_cache_control(input); + self + } + ///

Can be used to specify caching behavior along the request/reply chain. For more information, see http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9.

+ pub fn get_cache_control(&self) -> &Option { + self.inner.get_cache_control() + } + ///

Specifies presentational information for the object. For more information, see https://www.rfc-editor.org/rfc/rfc6266#section-4.

+ pub fn content_disposition(mut self, input: impl Into) -> Self { + self.inner = self.inner.content_disposition(input); + self + } + ///

Specifies presentational information for the object. For more information, see https://www.rfc-editor.org/rfc/rfc6266#section-4.

+ pub fn set_content_disposition(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_disposition(input); + self + } + ///

Specifies presentational information for the object. For more information, see https://www.rfc-editor.org/rfc/rfc6266#section-4.

+ pub fn get_content_disposition(&self) -> &Option { + self.inner.get_content_disposition() + } + ///

Specifies what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding.

+ pub fn content_encoding(mut self, input: impl Into) -> Self { + self.inner = self.inner.content_encoding(input); + self + } + ///

Specifies what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding.

+ pub fn set_content_encoding(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_encoding(input); + self + } + ///

Specifies what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding.

+ pub fn get_content_encoding(&self) -> &Option { + self.inner.get_content_encoding() + } + ///

The language the content is in.

+ pub fn content_language(mut self, input: impl Into) -> Self { + self.inner = self.inner.content_language(input); + self + } + ///

The language the content is in.

+ pub fn set_content_language(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_language(input); + self + } + ///

The language the content is in.

+ pub fn get_content_language(&self) -> &Option { + self.inner.get_content_language() + } + ///

Size of the body in bytes. This parameter is useful when the size of the body cannot be determined automatically. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length.

+ pub fn content_length(mut self, input: i64) -> Self { + self.inner = self.inner.content_length(input); + self + } + ///

Size of the body in bytes. This parameter is useful when the size of the body cannot be determined automatically. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length.

+ pub fn set_content_length(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_length(input); + self + } + ///

Size of the body in bytes. This parameter is useful when the size of the body cannot be determined automatically. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length.

+ pub fn get_content_length(&self) -> &Option { + self.inner.get_content_length() + } + ///

The base64-encoded 128-bit MD5 digest of the message (without the headers) according to RFC 1864. This header can be used as a message integrity check to verify that the data is the same data that was originally sent. Although it is optional, we recommend using the Content-MD5 mechanism as an end-to-end integrity check. For more information about REST request authentication, see REST Authentication.

+ ///

The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview in the Amazon S3 User Guide.

+ ///
+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn content_md5(mut self, input: impl Into) -> Self { + self.inner = self.inner.content_md5(input); + self + } + ///

The base64-encoded 128-bit MD5 digest of the message (without the headers) according to RFC 1864. This header can be used as a message integrity check to verify that the data is the same data that was originally sent. Although it is optional, we recommend using the Content-MD5 mechanism as an end-to-end integrity check. For more information about REST request authentication, see REST Authentication.

+ ///

The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview in the Amazon S3 User Guide.

+ ///
+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_content_md5(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_md5(input); + self + } + ///

The base64-encoded 128-bit MD5 digest of the message (without the headers) according to RFC 1864. This header can be used as a message integrity check to verify that the data is the same data that was originally sent. Although it is optional, we recommend using the Content-MD5 mechanism as an end-to-end integrity check. For more information about REST request authentication, see REST Authentication.

+ ///

The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview in the Amazon S3 User Guide.

+ ///
+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_content_md5(&self) -> &Option { + self.inner.get_content_md5() + } + ///

A standard MIME type describing the format of the contents. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type.

+ pub fn content_type(mut self, input: impl Into) -> Self { + self.inner = self.inner.content_type(input); + self + } + ///

A standard MIME type describing the format of the contents. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type.

+ pub fn set_content_type(mut self, input: Option) -> Self { + self.inner = self.inner.set_content_type(input); + self + } + ///

A standard MIME type describing the format of the contents. For more information, see https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type.

+ pub fn get_content_type(&self) -> &Option { + self.inner.get_content_type() + } + ///

Indicates the algorithm used to create the checksum for the object when you use the SDK. This header will not provide any additional functionality if you don't use the SDK. When you send this header, there must be a corresponding x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 fails the request with the HTTP status code 400 Bad Request.

+ ///

For the x-amz-checksum-algorithm header, replace algorithm with the supported algorithm from the following list:

+ ///
    + ///
  • + ///

    CRC32

  • + ///
  • + ///

    CRC32C

  • + ///
  • + ///

    SHA1

  • + ///
  • + ///

    SHA256

  • + ///
+ ///

For more information, see Checking object integrity in the Amazon S3 User Guide.

+ ///

If the individual checksum value you provide through x-amz-checksum-algorithm doesn't match the checksum algorithm you set through x-amz-sdk-checksum-algorithm, Amazon S3 ignores any provided ChecksumAlgorithm parameter and uses the checksum algorithm that matches the provided value in x-amz-checksum-algorithm .

+ ///

For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the default checksum algorithm that's used for performance.

+ ///
+ pub fn checksum_algorithm(mut self, input: aws_sdk_s3::types::ChecksumAlgorithm) -> Self { + self.inner = self.inner.checksum_algorithm(input); + self + } + ///

Indicates the algorithm used to create the checksum for the object when you use the SDK. This header will not provide any additional functionality if you don't use the SDK. When you send this header, there must be a corresponding x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 fails the request with the HTTP status code 400 Bad Request.

+ ///

For the x-amz-checksum-algorithm header, replace algorithm with the supported algorithm from the following list:

+ ///
    + ///
  • + ///

    CRC32

  • + ///
  • + ///

    CRC32C

  • + ///
  • + ///

    SHA1

  • + ///
  • + ///

    SHA256

  • + ///
+ ///

For more information, see Checking object integrity in the Amazon S3 User Guide.

+ ///

If the individual checksum value you provide through x-amz-checksum-algorithm doesn't match the checksum algorithm you set through x-amz-sdk-checksum-algorithm, Amazon S3 ignores any provided ChecksumAlgorithm parameter and uses the checksum algorithm that matches the provided value in x-amz-checksum-algorithm .

+ ///

For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the default checksum algorithm that's used for performance.

+ ///
+ pub fn set_checksum_algorithm( + mut self, + input: Option, + ) -> Self { + self.inner = self.inner.set_checksum_algorithm(input); + self + } + ///

Indicates the algorithm used to create the checksum for the object when you use the SDK. This header will not provide any additional functionality if you don't use the SDK. When you send this header, there must be a corresponding x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 fails the request with the HTTP status code 400 Bad Request.

+ ///

For the x-amz-checksum-algorithm header, replace algorithm with the supported algorithm from the following list:

+ ///
    + ///
  • + ///

    CRC32

  • + ///
  • + ///

    CRC32C

  • + ///
  • + ///

    SHA1

  • + ///
  • + ///

    SHA256

  • + ///
+ ///

For more information, see Checking object integrity in the Amazon S3 User Guide.

+ ///

If the individual checksum value you provide through x-amz-checksum-algorithm doesn't match the checksum algorithm you set through x-amz-sdk-checksum-algorithm, Amazon S3 ignores any provided ChecksumAlgorithm parameter and uses the checksum algorithm that matches the provided value in x-amz-checksum-algorithm .

+ ///

For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the default checksum algorithm that's used for performance.

+ ///
+ pub fn get_checksum_algorithm(&self) -> &Option { + self.inner.get_checksum_algorithm() + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32 checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn checksum_crc32(mut self, input: impl Into) -> Self { + self.inner = self.inner.checksum_crc32(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32 checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn set_checksum_crc32(mut self, input: Option) -> Self { + self.inner = self.inner.set_checksum_crc32(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32 checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn get_checksum_crc32(&self) -> &Option { + self.inner.get_checksum_crc32() + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32C checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn checksum_crc32_c(mut self, input: impl Into) -> Self { + self.inner = self.inner.checksum_crc32_c(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32C checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn set_checksum_crc32_c(mut self, input: Option) -> Self { + self.inner = self.inner.set_checksum_crc32_c(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 32-bit CRC32C checksum of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn get_checksum_crc32_c(&self) -> &Option { + self.inner.get_checksum_crc32_c() + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 160-bit SHA-1 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn checksum_sha1(mut self, input: impl Into) -> Self { + self.inner = self.inner.checksum_sha1(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 160-bit SHA-1 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn set_checksum_sha1(mut self, input: Option) -> Self { + self.inner = self.inner.set_checksum_sha1(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 160-bit SHA-1 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn get_checksum_sha1(&self) -> &Option { + self.inner.get_checksum_sha1() + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 256-bit SHA-256 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn checksum_sha256(mut self, input: impl Into) -> Self { + self.inner = self.inner.checksum_sha256(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 256-bit SHA-256 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn set_checksum_sha256(mut self, input: Option) -> Self { + self.inner = self.inner.set_checksum_sha256(input); + self + } + ///

This header can be used as a data integrity check to verify that the data received is the same data that was originally sent. This header specifies the base64-encoded, 256-bit SHA-256 digest of the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub fn get_checksum_sha256(&self) -> &Option { + self.inner.get_checksum_sha256() + } + ///

The date and time at which the object is no longer cacheable. For more information, see https://www.rfc-editor.org/rfc/rfc7234#section-5.3.

+ pub fn expires(mut self, input: ::aws_smithy_types::DateTime) -> Self { + self.inner = self.inner.expires(input); + self + } + ///

The date and time at which the object is no longer cacheable. For more information, see https://www.rfc-editor.org/rfc/rfc7234#section-5.3.

+ pub fn set_expires(mut self, input: Option<::aws_smithy_types::DateTime>) -> Self { + self.inner = self.inner.set_expires(input); + self + } + ///

The date and time at which the object is no longer cacheable. For more information, see https://www.rfc-editor.org/rfc/rfc7234#section-5.3.

+ pub fn get_expires(&self) -> &Option<::aws_smithy_types::DateTime> { + self.inner.get_expires() + } + ///

Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn grant_full_control(mut self, input: impl Into) -> Self { + self.inner = self.inner.grant_full_control(input); + self + } + ///

Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn set_grant_full_control(mut self, input: Option) -> Self { + self.inner = self.inner.set_grant_full_control(input); + self + } + ///

Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn get_grant_full_control(&self) -> &Option { + self.inner.get_grant_full_control() + } + ///

Allows grantee to read the object data and its metadata.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn grant_read(mut self, input: impl Into) -> Self { + self.inner = self.inner.grant_read(input); + self + } + ///

Allows grantee to read the object data and its metadata.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn set_grant_read(mut self, input: Option) -> Self { + self.inner = self.inner.set_grant_read(input); + self + } + ///

Allows grantee to read the object data and its metadata.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn get_grant_read(&self) -> &Option { + self.inner.get_grant_read() + } + ///

Allows grantee to read the object ACL.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn grant_read_acp(mut self, input: impl Into) -> Self { + self.inner = self.inner.grant_read_acp(input); + self + } + ///

Allows grantee to read the object ACL.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn set_grant_read_acp(mut self, input: Option) -> Self { + self.inner = self.inner.set_grant_read_acp(input); + self + } + ///

Allows grantee to read the object ACL.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn get_grant_read_acp(&self) -> &Option { + self.inner.get_grant_read_acp() + } + ///

Allows grantee to write the ACL for the applicable object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn grant_write_acp(mut self, input: impl Into) -> Self { + self.inner = self.inner.grant_write_acp(input); + self + } + ///

Allows grantee to write the ACL for the applicable object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn set_grant_write_acp(mut self, input: Option) -> Self { + self.inner = self.inner.set_grant_write_acp(input); + self + } + ///

Allows grantee to write the ACL for the applicable object.

+ ///
    + ///
  • + ///

    This functionality is not supported for directory buckets.

  • + ///
  • + ///

    This functionality is not supported for Amazon S3 on Outposts.

  • + ///
+ ///
+ pub fn get_grant_write_acp(&self) -> &Option { + self.inner.get_grant_write_acp() + } + ///

Object key for which the PUT action was initiated.

+ /// This field is required. + pub fn key(mut self, input: impl Into) -> Self { + self.inner = self.inner.key(input); + self + } + ///

Object key for which the PUT action was initiated.

+ pub fn set_key(mut self, input: Option) -> Self { + self.inner = self.inner.set_key(input); + self + } + ///

Object key for which the PUT action was initiated.

+ pub fn get_key(&self) -> &Option { + self.inner.get_key() + } + /// Adds a key-value pair to `metadata`. + /// + /// To override the contents of this collection use [`set_metadata`](Self::set_metadata). + /// + ///

A map of metadata to store with the object in S3.

+ pub fn metadata(mut self, k: impl Into, v: impl Into) -> Self { + self.inner = self.inner.metadata(k, v); + self + } + ///

A map of metadata to store with the object in S3.

+ pub fn set_metadata( + mut self, + input: Option<::std::collections::HashMap>, + ) -> Self { + self.inner = self.inner.set_metadata(input); + self + } + ///

A map of metadata to store with the object in S3.

+ pub fn get_metadata(&self) -> &Option<::std::collections::HashMap> { + self.inner.get_metadata() + } + ///

The server-side encryption algorithm that was used when you store this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse).

+ ///

General purpose buckets - You have four mutually exclusive options to protect data using server-side encryption in Amazon S3, depending on how you choose to manage the encryption keys. Specifically, the encryption key options are Amazon S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side encryption by using Amazon S3 managed keys (SSE-S3) by default. You can optionally tell Amazon S3 to encrypt data at rest by using server-side encryption with other key options. For more information, see Using Server-Side Encryption in the Amazon S3 User Guide.

+ ///

Directory buckets - For directory buckets, only the server-side encryption with Amazon S3 managed keys (SSE-S3) (AES256) value is supported.

+ pub fn server_side_encryption( + mut self, + input: aws_sdk_s3::types::ServerSideEncryption, + ) -> Self { + self.inner = self.inner.server_side_encryption(input); + self + } + ///

The server-side encryption algorithm that was used when you store this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse).

+ ///

General purpose buckets - You have four mutually exclusive options to protect data using server-side encryption in Amazon S3, depending on how you choose to manage the encryption keys. Specifically, the encryption key options are Amazon S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side encryption by using Amazon S3 managed keys (SSE-S3) by default. You can optionally tell Amazon S3 to encrypt data at rest by using server-side encryption with other key options. For more information, see Using Server-Side Encryption in the Amazon S3 User Guide.

+ ///

Directory buckets - For directory buckets, only the server-side encryption with Amazon S3 managed keys (SSE-S3) (AES256) value is supported.

+ pub fn set_server_side_encryption( + mut self, + input: Option, + ) -> Self { + self.inner = self.inner.set_server_side_encryption(input); + self + } + ///

The server-side encryption algorithm that was used when you store this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse).

+ ///

General purpose buckets - You have four mutually exclusive options to protect data using server-side encryption in Amazon S3, depending on how you choose to manage the encryption keys. Specifically, the encryption key options are Amazon S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side encryption by using Amazon S3 managed keys (SSE-S3) by default. You can optionally tell Amazon S3 to encrypt data at rest by using server-side encryption with other key options. For more information, see Using Server-Side Encryption in the Amazon S3 User Guide.

+ ///

Directory buckets - For directory buckets, only the server-side encryption with Amazon S3 managed keys (SSE-S3) (AES256) value is supported.

+ pub fn get_server_side_encryption(&self) -> &Option { + self.inner.get_server_side_encryption() + } + ///

By default, Amazon S3 uses the STANDARD Storage Class to store newly created objects. The STANDARD storage class provides high durability and high availability. Depending on performance needs, you can specify a different Storage Class. For more information, see Storage Classes in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    For directory buckets, only the S3 Express One Zone storage class is supported to store newly created objects.

  • + ///
  • + ///

    Amazon S3 on Outposts only uses the OUTPOSTS Storage Class.

  • + ///
+ ///
+ pub fn storage_class(mut self, input: aws_sdk_s3::types::StorageClass) -> Self { + self.inner = self.inner.storage_class(input); + self + } + ///

By default, Amazon S3 uses the STANDARD Storage Class to store newly created objects. The STANDARD storage class provides high durability and high availability. Depending on performance needs, you can specify a different Storage Class. For more information, see Storage Classes in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    For directory buckets, only the S3 Express One Zone storage class is supported to store newly created objects.

  • + ///
  • + ///

    Amazon S3 on Outposts only uses the OUTPOSTS Storage Class.

  • + ///
+ ///
+ pub fn set_storage_class(mut self, input: Option) -> Self { + self.inner = self.inner.set_storage_class(input); + self + } + ///

By default, Amazon S3 uses the STANDARD Storage Class to store newly created objects. The STANDARD storage class provides high durability and high availability. Depending on performance needs, you can specify a different Storage Class. For more information, see Storage Classes in the Amazon S3 User Guide.

+ ///
    + ///
  • + ///

    For directory buckets, only the S3 Express One Zone storage class is supported to store newly created objects.

  • + ///
  • + ///

    Amazon S3 on Outposts only uses the OUTPOSTS Storage Class.

  • + ///
+ ///
+ pub fn get_storage_class(&self) -> &Option { + self.inner.get_storage_class() + } + ///

If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata. For information about object metadata, see Object Key and Metadata in the Amazon S3 User Guide.

+ ///

In the following example, the request header sets the redirect to an object (anotherPage.html) in the same bucket:

+ ///

x-amz-website-redirect-location: /anotherPage.html

+ ///

In the following example, the request header sets the object redirect to another website:

+ ///

x-amz-website-redirect-location: http://www.example.com/

+ ///

For more information about website hosting in Amazon S3, see Hosting Websites on Amazon S3 and How to Configure Website Page Redirects in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn website_redirect_location(mut self, input: impl Into) -> Self { + self.inner = self.inner.website_redirect_location(input); + self + } + ///

If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata. For information about object metadata, see Object Key and Metadata in the Amazon S3 User Guide.

+ ///

In the following example, the request header sets the redirect to an object (anotherPage.html) in the same bucket:

+ ///

x-amz-website-redirect-location: /anotherPage.html

+ ///

In the following example, the request header sets the object redirect to another website:

+ ///

x-amz-website-redirect-location: http://www.example.com/

+ ///

For more information about website hosting in Amazon S3, see Hosting Websites on Amazon S3 and How to Configure Website Page Redirects in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_website_redirect_location(mut self, input: Option) -> Self { + self.inner = self.inner.set_website_redirect_location(input); + self + } + ///

If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata. For information about object metadata, see Object Key and Metadata in the Amazon S3 User Guide.

+ ///

In the following example, the request header sets the redirect to an object (anotherPage.html) in the same bucket:

+ ///

x-amz-website-redirect-location: /anotherPage.html

+ ///

In the following example, the request header sets the object redirect to another website:

+ ///

x-amz-website-redirect-location: http://www.example.com/

+ ///

For more information about website hosting in Amazon S3, see Hosting Websites on Amazon S3 and How to Configure Website Page Redirects in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_website_redirect_location(&self) -> &Option { + self.inner.get_website_redirect_location() + } + ///

Specifies the algorithm to use when encrypting the object (for example, AES256).

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn sse_customer_algorithm(mut self, input: impl Into) -> Self { + self.inner = self.inner.sse_customer_algorithm(input); + self + } + ///

Specifies the algorithm to use when encrypting the object (for example, AES256).

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_sse_customer_algorithm(mut self, input: Option) -> Self { + self.inner = self.inner.set_sse_customer_algorithm(input); + self + } + ///

Specifies the algorithm to use when encrypting the object (for example, AES256).

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_sse_customer_algorithm(&self) -> &Option { + self.inner.get_sse_customer_algorithm() + } + ///

Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data. This value is used to store the object and then it is discarded; Amazon S3 does not store the encryption key. The key must be appropriate for use with the algorithm specified in the x-amz-server-side-encryption-customer-algorithm header.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn sse_customer_key(mut self, input: impl Into) -> Self { + self.inner = self.inner.sse_customer_key(input); + self + } + ///

Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data. This value is used to store the object and then it is discarded; Amazon S3 does not store the encryption key. The key must be appropriate for use with the algorithm specified in the x-amz-server-side-encryption-customer-algorithm header.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_sse_customer_key(mut self, input: Option) -> Self { + self.inner = self.inner.set_sse_customer_key(input); + self + } + ///

Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data. This value is used to store the object and then it is discarded; Amazon S3 does not store the encryption key. The key must be appropriate for use with the algorithm specified in the x-amz-server-side-encryption-customer-algorithm header.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_sse_customer_key(&self) -> &Option { + self.inner.get_sse_customer_key() + } + ///

Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321. Amazon S3 uses this header for a message integrity check to ensure that the encryption key was transmitted without error.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn sse_customer_key_md5(mut self, input: impl Into) -> Self { + self.inner = self.inner.sse_customer_key_md5(input); + self + } + ///

Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321. Amazon S3 uses this header for a message integrity check to ensure that the encryption key was transmitted without error.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_sse_customer_key_md5(mut self, input: Option) -> Self { + self.inner = self.inner.set_sse_customer_key_md5(input); + self + } + ///

Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321. Amazon S3 uses this header for a message integrity check to ensure that the encryption key was transmitted without error.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_sse_customer_key_md5(&self) -> &Option { + self.inner.get_sse_customer_key_md5() + } + ///

If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse, this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key Management Service (KMS) symmetric encryption customer managed key that was used for the object. If you specify x-amz-server-side-encryption:aws:kms or x-amz-server-side-encryption:aws:kms:dsse, but do not provide x-amz-server-side-encryption-aws-kms-key-id, Amazon S3 uses the Amazon Web Services managed key (aws/s3) to protect the data. If the KMS key does not exist in the same account that's issuing the command, you must use the full ARN and not just the ID.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn sse_kms_key_id(mut self, input: impl Into) -> Self { + self.inner = self.inner.sse_kms_key_id(input); + self + } + ///

If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse, this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key Management Service (KMS) symmetric encryption customer managed key that was used for the object. If you specify x-amz-server-side-encryption:aws:kms or x-amz-server-side-encryption:aws:kms:dsse, but do not provide x-amz-server-side-encryption-aws-kms-key-id, Amazon S3 uses the Amazon Web Services managed key (aws/s3) to protect the data. If the KMS key does not exist in the same account that's issuing the command, you must use the full ARN and not just the ID.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_sse_kms_key_id(mut self, input: Option) -> Self { + self.inner = self.inner.set_sse_kms_key_id(input); + self + } + ///

If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse, this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key Management Service (KMS) symmetric encryption customer managed key that was used for the object. If you specify x-amz-server-side-encryption:aws:kms or x-amz-server-side-encryption:aws:kms:dsse, but do not provide x-amz-server-side-encryption-aws-kms-key-id, Amazon S3 uses the Amazon Web Services managed key (aws/s3) to protect the data. If the KMS key does not exist in the same account that's issuing the command, you must use the full ARN and not just the ID.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_sse_kms_key_id(&self) -> &Option { + self.inner.get_sse_kms_key_id() + } + ///

Specifies the Amazon Web Services KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs. This value is stored as object metadata and automatically gets passed on to Amazon Web Services KMS for future GetObject or CopyObject operations on this object. This value must be explicitly added during CopyObject operations.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn sse_kms_encryption_context(mut self, input: impl Into) -> Self { + self.inner = self.inner.sse_kms_encryption_context(input); + self + } + ///

Specifies the Amazon Web Services KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs. This value is stored as object metadata and automatically gets passed on to Amazon Web Services KMS for future GetObject or CopyObject operations on this object. This value must be explicitly added during CopyObject operations.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_sse_kms_encryption_context(mut self, input: Option) -> Self { + self.inner = self.inner.set_sse_kms_encryption_context(input); + self + } + ///

Specifies the Amazon Web Services KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs. This value is stored as object metadata and automatically gets passed on to Amazon Web Services KMS for future GetObject or CopyObject operations on this object. This value must be explicitly added during CopyObject operations.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_sse_kms_encryption_context(&self) -> &Option { + self.inner.get_sse_kms_encryption_context() + } + ///

Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object encryption with SSE-KMS.

+ ///

Specifying this header with a PUT action doesn’t affect bucket-level settings for S3 Bucket Key.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn bucket_key_enabled(mut self, input: bool) -> Self { + self.inner = self.inner.bucket_key_enabled(input); + self + } + ///

Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object encryption with SSE-KMS.

+ ///

Specifying this header with a PUT action doesn’t affect bucket-level settings for S3 Bucket Key.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_bucket_key_enabled(mut self, input: Option) -> Self { + self.inner = self.inner.set_bucket_key_enabled(input); + self + } + ///

Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object encryption with SSE-KMS.

+ ///

Specifying this header with a PUT action doesn’t affect bucket-level settings for S3 Bucket Key.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_bucket_key_enabled(&self) -> &Option { + self.inner.get_bucket_key_enabled() + } + ///

Confirms that the requester knows that they will be charged for the request. Bucket owners need not specify this parameter in their requests. If either the source or destination S3 bucket has Requester Pays enabled, the requester will pay for corresponding charges to copy the object. For information about downloading objects from Requester Pays buckets, see Downloading Objects in Requester Pays Buckets in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn request_payer(mut self, input: aws_sdk_s3::types::RequestPayer) -> Self { + self.inner = self.inner.request_payer(input); + self + } + ///

Confirms that the requester knows that they will be charged for the request. Bucket owners need not specify this parameter in their requests. If either the source or destination S3 bucket has Requester Pays enabled, the requester will pay for corresponding charges to copy the object. For information about downloading objects from Requester Pays buckets, see Downloading Objects in Requester Pays Buckets in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_request_payer(mut self, input: Option) -> Self { + self.inner = self.inner.set_request_payer(input); + self + } + ///

Confirms that the requester knows that they will be charged for the request. Bucket owners need not specify this parameter in their requests. If either the source or destination S3 bucket has Requester Pays enabled, the requester will pay for corresponding charges to copy the object. For information about downloading objects from Requester Pays buckets, see Downloading Objects in Requester Pays Buckets in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_request_payer(&self) -> &Option { + self.inner.get_request_payer() + } + ///

The tag-set for the object. The tag-set must be encoded as URL Query parameters. (For example, "Key1=Value1")

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn tagging(mut self, input: impl Into) -> Self { + self.inner = self.inner.tagging(input); + self + } + ///

The tag-set for the object. The tag-set must be encoded as URL Query parameters. (For example, "Key1=Value1")

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_tagging(mut self, input: Option) -> Self { + self.inner = self.inner.set_tagging(input); + self + } + ///

The tag-set for the object. The tag-set must be encoded as URL Query parameters. (For example, "Key1=Value1")

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_tagging(&self) -> &Option { + self.inner.get_tagging() + } + ///

The Object Lock mode that you want to apply to this object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn object_lock_mode(mut self, input: aws_sdk_s3::types::ObjectLockMode) -> Self { + self.inner = self.inner.object_lock_mode(input); + self + } + ///

The Object Lock mode that you want to apply to this object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_object_lock_mode( + mut self, + input: Option, + ) -> Self { + self.inner = self.inner.set_object_lock_mode(input); + self + } + ///

The Object Lock mode that you want to apply to this object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_object_lock_mode(&self) -> &Option { + self.inner.get_object_lock_mode() + } + ///

The date and time when you want this object's Object Lock to expire. Must be formatted as a timestamp parameter.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn object_lock_retain_until_date(mut self, input: ::aws_smithy_types::DateTime) -> Self { + self.inner = self.inner.object_lock_retain_until_date(input); + self + } + ///

The date and time when you want this object's Object Lock to expire. Must be formatted as a timestamp parameter.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_object_lock_retain_until_date( + mut self, + input: Option<::aws_smithy_types::DateTime>, + ) -> Self { + self.inner = self.inner.set_object_lock_retain_until_date(input); + self + } + ///

The date and time when you want this object's Object Lock to expire. Must be formatted as a timestamp parameter.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_object_lock_retain_until_date(&self) -> &Option<::aws_smithy_types::DateTime> { + self.inner.get_object_lock_retain_until_date() + } + ///

Specifies whether a legal hold will be applied to this object. For more information about S3 Object Lock, see Object Lock in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn object_lock_legal_hold_status( + mut self, + input: aws_sdk_s3::types::ObjectLockLegalHoldStatus, + ) -> Self { + self.inner = self.inner.object_lock_legal_hold_status(input); + self + } + ///

Specifies whether a legal hold will be applied to this object. For more information about S3 Object Lock, see Object Lock in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn set_object_lock_legal_hold_status( + mut self, + input: Option, + ) -> Self { + self.inner = self.inner.set_object_lock_legal_hold_status(input); + self + } + ///

Specifies whether a legal hold will be applied to this object. For more information about S3 Object Lock, see Object Lock in the Amazon S3 User Guide.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub fn get_object_lock_legal_hold_status( + &self, + ) -> &Option { + self.inner.get_object_lock_legal_hold_status() + } + ///

The account ID of the expected bucket owner. If the account ID that you provide does not match the actual owner of the bucket, the request fails with the HTTP status code 403 Forbidden (access denied).

+ pub fn expected_bucket_owner(mut self, input: impl Into) -> Self { + self.inner = self.inner.expected_bucket_owner(input); + self + } + ///

The account ID of the expected bucket owner. If the account ID that you provide does not match the actual owner of the bucket, the request fails with the HTTP status code 403 Forbidden (access denied).

+ pub fn set_expected_bucket_owner(mut self, input: Option) -> Self { + self.inner = self.inner.set_expected_bucket_owner(input); + self + } + ///

The account ID of the expected bucket owner. If the account ID that you provide does not match the actual owner of the bucket, the request fails with the HTTP status code 403 Forbidden (access denied).

+ pub fn get_expected_bucket_owner(&self) -> &Option { + self.inner.get_expected_bucket_owner() + } + + /// The policy that describes how to handle a failed multipart upload. + pub fn failed_multipart_upload_policy(mut self, input: FailedMultipartUploadPolicy) -> Self { + self.inner = self.inner.failed_multipart_upload_policy(input); + self + } + + /// The policy that describes how to handle a failed multipart upload. + pub fn set_failed_multipart_upload_policy( + mut self, + input: Option, + ) -> Self { + self.inner = self.inner.set_failed_multipart_upload_policy(input); + self + } + + /// The policy that describes how to handle a failed multipart upload. + pub fn get_failed_multipart_upload_policy(&self) -> &Option { + self.inner.get_failed_multipart_upload_policy() + } +} + +impl crate::operation::upload::input::UploadInputBuilder { + /// Initiate an upload transfer for a single object with this input using the given client. + pub async fn send_with(self, client: &crate::Client) -> Result { + let mut fluent_builder = client.upload(); + fluent_builder.inner = self; + fluent_builder.send().await + } +} diff --git a/aws-s3-transfer-manager/src/upload/context.rs b/aws-s3-transfer-manager/src/operation/upload/context.rs similarity index 78% rename from aws-s3-transfer-manager/src/upload/context.rs rename to aws-s3-transfer-manager/src/operation/upload/context.rs index 6db8880..5fe5577 100644 --- a/aws-s3-transfer-manager/src/upload/context.rs +++ b/aws-s3-transfer-manager/src/operation/upload/context.rs @@ -3,29 +3,29 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::upload::UploadRequest; +use crate::operation::upload::UploadInput; use std::ops::Deref; use std::sync::Arc; -/// Internal context used to drive a single Upload request +/// Internal context used to drive a single Upload operation #[derive(Debug, Clone)] pub(crate) struct UploadContext { /// client used for SDK operations - pub(crate) client: aws_sdk_s3::Client, + pub(crate) handle: Arc, /// the multipart upload ID pub(crate) upload_id: Option, /// the original request (NOTE: the body will have been taken for processing, only the other fields remain) - pub(crate) request: Arc, + pub(crate) request: Arc, } impl UploadContext { /// The S3 client to use for SDK operations pub(crate) fn client(&self) -> &aws_sdk_s3::Client { - &self.client + self.handle.config.client() } /// The original request (sans the body as it will have been taken for processing) - pub(crate) fn request(&self) -> &UploadRequest { + pub(crate) fn request(&self) -> &UploadInput { self.request.deref() } diff --git a/aws-s3-transfer-manager/src/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs similarity index 82% rename from aws-s3-transfer-manager/src/upload/handle.rs rename to aws-s3-transfer-manager/src/operation/upload/handle.rs index 3c3b50f..1d9c583 100644 --- a/aws-s3-transfer-manager/src/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -4,10 +4,9 @@ */ use crate::error::UploadError; -use crate::upload::context::UploadContext; -use crate::upload::request::FailedMultipartUploadPolicy; -use crate::upload::response::UploadResponseBuilder; -use crate::upload::UploadResponse; +use crate::operation::upload::context::UploadContext; +use crate::operation::upload::{UploadOutput, UploadOutputBuilder}; +use crate::types::{AbortedUpload, FailedMultipartUploadPolicy}; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use tokio::task; @@ -20,7 +19,7 @@ pub struct UploadHandle { /// The context used to drive an upload to completion pub(crate) ctx: UploadContext, /// The response that will eventually be yielded to the caller. - response: Option, + response: Option, } impl UploadHandle { @@ -37,7 +36,7 @@ impl UploadHandle { /// /// This is usually after `CreateMultipartUpload` is initiated (or /// `PutObject` is invoked for uploads less than the required MPU threshold). - pub(crate) fn set_response(&mut self, builder: UploadResponseBuilder) { + pub(crate) fn set_response(&mut self, builder: UploadOutputBuilder) { if builder.upload_id.is_some() { let upload_id = builder.upload_id.clone().expect("upload ID present"); self.ctx.set_upload_id(upload_id); @@ -47,7 +46,7 @@ impl UploadHandle { } /// Consume the handle and wait for upload to complete - pub async fn join(self) -> Result { + pub async fn join(self) -> Result { complete_upload(self).await } @@ -79,34 +78,10 @@ impl UploadHandle { } } -/// Describes the result of aborting an in-progress upload. -#[derive(Debug, Default)] -pub struct AbortedUpload { - upload_id: Option, - request_charged: Option, -} - -impl AbortedUpload { - /// Get the multipart upload ID that was cancelled - /// - /// Not present for uploads that did not utilize a multipart upload - pub fn upload_id(&self) -> &Option { - &self.upload_id - } - - /// If present, indicates that the requester was successfully charged for the request. - /// - /// This functionality is not supported for directory buckets and is - /// not present for uploads that did not utilize a multipart upload - pub fn request_charged(&self) -> &Option { - &self.request_charged - } -} - async fn abort_upload(handle: &UploadHandle) -> Result { let abort_mpu_resp = handle .ctx - .client + .client() .abort_multipart_upload() .set_bucket(handle.ctx.request.bucket.clone()) .set_key(handle.ctx.request.key.clone()) @@ -124,7 +99,7 @@ async fn abort_upload(handle: &UploadHandle) -> Result Result { +async fn complete_upload(mut handle: UploadHandle) -> Result { if !handle.ctx.is_multipart_upload() { todo!("non mpu upload not implemented yet") } @@ -161,7 +136,7 @@ async fn complete_upload(mut handle: UploadHandle) -> ResultThe canned ACL to apply to the object. For more information, see Canned ACL in the Amazon S3 User Guide.

///

When adding a new object, you can use headers to grant ACL-based permissions to individual Amazon Web Services accounts or to predefined groups defined by Amazon S3. These permissions are then added to the ACL on the object. By default, all objects are private. Only the owner has full access control. For more information, see Access Control List (ACL) Overview and Managing ACLs Using the REST API in the Amazon S3 User Guide.

///

If the bucket that you're uploading objects to uses the bucket owner enforced setting for S3 Object Ownership, ACLs are disabled and no longer affect permissions. Buckets that use this setting only accept PUT requests that don't specify an ACL or PUT requests that specify bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. PUT requests that contain other ACLs (for example, custom grants to certain Amazon Web Services accounts) fail and return a 400 error with the error code AccessControlListNotSupported. For more information, see Controlling ownership of objects and disabling ACLs in the Amazon S3 User Guide.

@@ -189,10 +191,10 @@ pub struct UploadRequest { pub failed_multipart_upload_policy: Option, } -impl UploadRequest { - /// Create a new builder for `UploadRequest` - pub fn builder() -> UploadRequestBuilder { - UploadRequestBuilder::default() +impl UploadInput { + /// Create a new builder for `UploadInput` + pub fn builder() -> UploadInputBuilder { + UploadInputBuilder::default() } /// Split the body from the request by taking it and replacing it with the default. @@ -457,9 +459,9 @@ impl UploadRequest { } } -impl Debug for UploadRequest { +impl Debug for UploadInput { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - let mut formatter = f.debug_struct("UploadRequest"); + let mut formatter = f.debug_struct("UploadInput"); formatter.field("acl", &self.acl); formatter.field("body", &self.body); formatter.field("bucket", &self.bucket); @@ -517,10 +519,10 @@ impl Debug for UploadRequest { } } -/// A builder for [`UploadRequest`]. +/// A builder for [`UploadInput`]. #[non_exhaustive] #[derive(Default)] -pub struct UploadRequestBuilder { +pub struct UploadInputBuilder { pub(crate) acl: Option, pub(crate) body: Option, pub(crate) bucket: Option, @@ -561,7 +563,7 @@ pub struct UploadRequestBuilder { pub(crate) failed_multipart_upload_policy: Option, } -impl UploadRequestBuilder { +impl UploadInputBuilder { ///

The canned ACL to apply to the object. For more information, see Canned ACL in the Amazon S3 User Guide.

///

When adding a new object, you can use headers to grant ACL-based permissions to individual Amazon Web Services accounts or to predefined groups defined by Amazon S3. These permissions are then added to the ACL on the object. By default, all objects are private. Only the owner has full access control. For more information, see Access Control List (ACL) Overview and Managing ACLs Using the REST API in the Amazon S3 User Guide.

///

If the bucket that you're uploading objects to uses the bucket owner enforced setting for S3 Object Ownership, ACLs are disabled and no longer affect permissions. Buckets that use this setting only accept PUT requests that don't specify an ACL or PUT requests that specify bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. PUT requests that contain other ACLs (for example, custom grants to certain Amazon Web Services accounts) fail and return a 400 error with the error code AccessControlListNotSupported. For more information, see Controlling ownership of objects and disabling ACLs in the Amazon S3 User Guide.

@@ -1437,10 +1439,10 @@ impl UploadRequestBuilder { &self.failed_multipart_upload_policy } - /// Consumes the builder and constructs a [`UploadRequest`] + /// Consumes the builder and constructs a [`UploadInput`] // FIXME(aws-sdk-rust#1159): replace BuildError with our own type? - pub fn build(self) -> Result { - Ok(UploadRequest { + pub fn build(self) -> Result { + Ok(UploadInput { body: self.body.unwrap_or_default(), acl: self.acl, bucket: self.bucket, @@ -1483,9 +1485,9 @@ impl UploadRequestBuilder { } } -impl Debug for UploadRequestBuilder { +impl Debug for UploadInputBuilder { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - let mut formatter = f.debug_struct("UploadRequestBuilder"); + let mut formatter = f.debug_struct("UploadInputBuilder"); formatter.field("acl", &self.acl); formatter.field("body", &self.body); formatter.field("bucket", &self.bucket); @@ -1539,15 +1541,3 @@ impl Debug for UploadRequestBuilder { formatter.finish() } } - -/// Policy for how to handle a failed multipart upload -/// -/// Default is to abort the upload. -#[derive(Debug, Clone, Default)] -pub enum FailedMultipartUploadPolicy { - /// Abort the upload on any individual part failure - #[default] - AbortUpload, - /// Retain any uploaded parts. The upload ID will be available in the response. - Retain, -} diff --git a/aws-s3-transfer-manager/src/upload/response.rs b/aws-s3-transfer-manager/src/operation/upload/output.rs similarity index 98% rename from aws-s3-transfer-manager/src/upload/response.rs rename to aws-s3-transfer-manager/src/operation/upload/output.rs index 904f89c..5532678 100644 --- a/aws-s3-transfer-manager/src/upload/response.rs +++ b/aws-s3-transfer-manager/src/operation/upload/output.rs @@ -9,7 +9,7 @@ use std::fmt::{Debug, Formatter}; /// Common response fields for uploading an object to Amazon S3 #[non_exhaustive] #[derive(Clone, PartialEq)] -pub struct UploadResponse { +pub struct UploadOutput { ///

If the expiration is configured for the object (see PutBucketLifecycleConfiguration) in the Amazon S3 User Guide, the response includes this header. It includes the expiry-date and rule-id key-value pairs that provide information about object expiration. The value of the rule-id is URL-encoded.

///

This functionality is not supported for directory buckets.

///
@@ -78,7 +78,7 @@ pub struct UploadResponse { pub upload_id: Option, } -impl UploadResponse { +impl UploadOutput { ///

If the expiration is configured for the object (see PutBucketLifecycleConfiguration) in the Amazon S3 User Guide, the response includes this header. It includes the expiry-date and rule-id key-value pairs that provide information about object expiration. The value of the rule-id is URL-encoded.

///

This functionality is not supported for directory buckets.

///
@@ -164,9 +164,9 @@ impl UploadResponse { } } -impl Debug for UploadResponse { +impl Debug for UploadOutput { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut formatter = f.debug_struct("UploadResponse"); + let mut formatter = f.debug_struct("UploadOutput"); formatter.field("expiration", &self.expiration); formatter.field("e_tag", &self.e_tag); formatter.field("checksum_crc32", &self.checksum_crc32); @@ -189,10 +189,10 @@ impl Debug for UploadResponse { } } -/// A builder for [`UploadResponse`]. +/// A builder for [`UploadOutput`]. #[non_exhaustive] #[derive(Default)] -pub struct UploadResponseBuilder { +pub struct UploadOutputBuilder { pub(crate) expiration: Option, pub(crate) e_tag: Option, pub(crate) checksum_crc32: Option, @@ -210,7 +210,7 @@ pub struct UploadResponseBuilder { pub(crate) upload_id: Option, } -impl UploadResponseBuilder { +impl UploadOutputBuilder { ///

If the expiration is configured for the object (see PutBucketLifecycleConfiguration) in the Amazon S3 User Guide, the response includes this header. It includes the expiry-date and rule-id key-value pairs that provide information about object expiration. The value of the rule-id is URL-encoded.

///

This functionality is not supported for directory buckets.

///
@@ -492,10 +492,10 @@ impl UploadResponseBuilder { &self.upload_id } - /// Consumes the builder and constructs a [`UploadResponse`] + /// Consumes the builder and constructs a [`UploadOutput`] // FIXME(aws-sdk-rust#1159): replace BuildError with our own type? - pub fn build(self) -> Result { - Ok(UploadResponse { + pub fn build(self) -> Result { + Ok(UploadOutput { expiration: self.expiration, e_tag: self.e_tag, checksum_crc32: self.checksum_crc32, @@ -515,9 +515,9 @@ impl UploadResponseBuilder { } } -impl Debug for UploadResponseBuilder { +impl Debug for UploadOutputBuilder { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - let mut formatter = f.debug_struct("UploadResponseBuilder"); + let mut formatter = f.debug_struct("UploadOutputBuilder"); formatter.field("expiration", &self.expiration); formatter.field("e_tag", &self.e_tag); formatter.field("checksum_crc32", &self.checksum_crc32); @@ -540,9 +540,9 @@ impl Debug for UploadResponseBuilder { } } -impl From for UploadResponseBuilder { +impl From for UploadOutputBuilder { fn from(value: CreateMultipartUploadOutput) -> Self { - UploadResponseBuilder { + UploadOutputBuilder { upload_id: value.upload_id, server_side_encryption: value.server_side_encryption, sse_customer_algorithm: value.sse_customer_algorithm, diff --git a/aws-s3-transfer-manager/src/types.rs b/aws-s3-transfer-manager/src/types.rs index 266d8d8..da09839 100644 --- a/aws-s3-transfer-manager/src/types.rs +++ b/aws-s3-transfer-manager/src/types.rs @@ -27,3 +27,39 @@ pub enum ConcurrencySetting { /// Explicitly configured concurrency setting. Explicit(usize), } + +/// Policy for how to handle a failed multipart upload +/// +/// Default is to abort the upload. +#[derive(Debug, Clone, Default)] +pub enum FailedMultipartUploadPolicy { + /// Abort the upload on any individual part failure + #[default] + AbortUpload, + /// Retain any uploaded parts. The upload ID will be available in the response. + Retain, +} + +/// Describes the result of aborting an in-progress upload. +#[derive(Debug, Default)] +pub struct AbortedUpload { + pub(crate) upload_id: Option, + pub(crate) request_charged: Option, +} + +impl AbortedUpload { + /// Get the multipart upload ID that was cancelled + /// + /// Not present for uploads that did not utilize a multipart upload + pub fn upload_id(&self) -> &Option { + &self.upload_id + } + + /// If present, indicates that the requester was successfully charged for the request. + /// + /// This functionality is not supported for directory buckets and is + /// not present for uploads that did not utilize a multipart upload + pub fn request_charged(&self) -> &Option { + &self.request_charged + } +} diff --git a/aws-s3-transfer-manager/src/upload.rs b/aws-s3-transfer-manager/src/upload.rs deleted file mode 100644 index e22fee1..0000000 --- a/aws-s3-transfer-manager/src/upload.rs +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -pub use crate::upload::request::UploadRequest; -pub use crate::upload::response::UploadResponse; - -use crate::error::UploadError; -use crate::io::part_reader::{Builder as PartReaderBuilder, ReadPart}; -use crate::io::InputStream; -use crate::types::{ConcurrencySetting, PartSize}; -use crate::upload::context::UploadContext; -use crate::upload::handle::UploadHandle; -use crate::upload::response::UploadResponseBuilder; -use crate::{DEFAULT_CONCURRENCY, MEBIBYTE}; -use aws_sdk_s3::types::CompletedPart; -use aws_smithy_types::byte_stream::ByteStream; -use aws_types::SdkConfig; -use bytes::Buf; -use std::cmp; -use std::sync::Arc; -use tracing::Instrument; - -mod handle; - -/// Request types for uploads to Amazon S3 -pub mod request; - -mod context; -/// Response types for uploads to Amazon S3 -pub mod response; - -/// Minimum upload part size in bytes -const MIN_PART_SIZE_BYTES: u64 = 5 * MEBIBYTE; - -/// Maximum number of parts that a single S3 multipart upload supports -const MAX_PARTS: u64 = 10_000; - -/// Fluent style builder for [Uploader] -#[derive(Debug, Clone, Default)] -pub struct Builder { - multipart_threshold_part_size: PartSize, - target_part_size: PartSize, - concurrency: ConcurrencySetting, - sdk_config: Option, - client: Option, -} - -impl Builder { - fn new() -> Builder { - Builder::default() - } - - /// Minimum object size that should trigger a multipart upload. - /// - /// The minimum part size is 5 MiB, any part size less than that will be rounded up. - /// Default is [PartSize::Auto] - pub fn multipart_threshold(self, threshold: PartSize) -> Self { - let threshold = match threshold { - PartSize::Target(part_size) => { - PartSize::Target(cmp::max(part_size, MIN_PART_SIZE_BYTES)) - } - tps => tps, - }; - - self.set_multipart_threshold(threshold) - } - - /// The target size of each part when using a multipart upload to complete the request. - /// - /// When a request's content length is les than [`multipart_threshold_part_size`], - /// this setting is ignored and a single [`PutObject`] request will be made instead. - /// - /// NOTE: The actual part size used may be larger than the configured part size if - /// the current value would result in more than 10,000 parts for an upload request. - /// - /// Default is [PartSize::Auto] - /// - /// [`multipart_threshold_part_size`]: method@Self::multipart_threshold_part_size - /// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html - pub fn part_size(self, part_size: PartSize) -> Self { - let threshold = match part_size { - PartSize::Target(part_size) => { - PartSize::Target(cmp::max(part_size, MIN_PART_SIZE_BYTES)) - } - tps => tps, - }; - - self.set_target_part_size(threshold) - } - - /// Minimum object size that should trigger a multipart upload. - /// - /// NOTE: This does not validate the setting and is meant for internal use only. - pub(crate) fn set_multipart_threshold(mut self, threshold: PartSize) -> Self { - self.multipart_threshold_part_size = threshold; - self - } - - /// Target part size for a multipart upload. - /// - /// NOTE: This does not validate the setting and is meant for internal use only. - pub(crate) fn set_target_part_size(mut self, threshold: PartSize) -> Self { - self.target_part_size = threshold; - self - } - - /// Set the configuration used by the S3 client - pub fn sdk_config(mut self, config: SdkConfig) -> Self { - self.sdk_config = Some(config); - self - } - - /// Set the concurrency level this component is allowed to use. - /// - /// This sets the maximum number of concurrent in-flight requests. - /// Default is [ConcurrencySetting::Auto]. - pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self { - self.concurrency = concurrency; - self - } - - /// Consumes the builder and constructs a [Uploader] - pub fn build(self) -> Uploader { - self.into() - } - - /// Set an explicit client to use. This takes precedence over setting `sdk_config`. - #[cfg(test)] - pub(crate) fn client(mut self, client: aws_sdk_s3::Client) -> Self { - self.client = Some(client); - self - } -} - -impl From for Uploader { - fn from(value: Builder) -> Self { - let client = value.client.unwrap_or_else(|| { - let sdk_config = value - .sdk_config - .unwrap_or_else(|| SdkConfig::builder().build()); - aws_sdk_s3::Client::new(&sdk_config) - }); - - Self { - multipart_threshold_part_size: value.multipart_threshold_part_size, - target_part_size: value.target_part_size, - concurrency: value.concurrency, - client, - } - } -} - -/// Upload an object in the most efficient way possible by splitting the request into -/// concurrent requests (e.g. using multi-part uploads). -#[derive(Debug, Clone)] -pub struct Uploader { - multipart_threshold_part_size: PartSize, - target_part_size: PartSize, - concurrency: ConcurrencySetting, - client: aws_sdk_s3::client::Client, -} - -impl Uploader { - /// Create a new [Builder] - pub fn builder() -> Builder { - Builder::new() - } - - /// Upload a single object to Amazon S3. - /// - /// A single logical request may be split into many concurrent `UploadPart` requests - /// to improve throughput. - /// - /// # Examples - /// - /// ```no_run - /// use std::error::Error; - /// use std::path::Path; - /// use aws_s3_transfer_manager::io::InputStream; - /// use aws_s3_transfer_manager::upload::{Uploader, UploadRequest}; - /// - /// async fn upload_file(client: Uploader, path: impl AsRef) -> Result<(), Box> { - /// let stream = InputStream::from_path(path)?; - /// let request = UploadRequest::builder() - /// .bucket("my-bucket") - /// .key("my-key") - /// .body(stream) - /// .build()?; - /// - /// let handle = client.upload(request).await?; - /// - /// // upload() will return potentially before the request is complete. - /// // The handle given must be joined to drive the request to completion. - /// // It can also be used to get progress, pause or cancel the request, etc. - /// let response = handle.join().await?; - /// // ... do something with response - /// Ok(()) - /// } - /// ``` - pub async fn upload(&self, mut req: UploadRequest) -> Result { - let min_mpu_threshold = self.mpu_threshold_bytes(); - - let stream = req.take_body(); - let ctx = self.new_context(req); - let mut handle = UploadHandle::new(ctx); - - // MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway) - let content_length = stream - .size_hint() - .upper() - .ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?; - - if content_length < min_mpu_threshold { - // TODO - adapt body to ByteStream and send request using `PutObject` for non mpu upload - // tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request"); - self.try_start_mpu_upload(&mut handle, stream, content_length) - .await? - } else { - self.try_start_mpu_upload(&mut handle, stream, content_length) - .await? - } - - Ok(handle) - } - - fn new_context(&self, req: UploadRequest) -> UploadContext { - UploadContext { - client: self.client.clone(), - request: Arc::new(req), - upload_id: None, - } - } - - /// Upload the object via multipart upload - /// - /// # Arguments - /// - /// * `handle` - The upload handle - /// * `stream` - The content to upload - /// * `content_length` - The upper bound on the content length - async fn try_start_mpu_upload( - &self, - handle: &mut UploadHandle, - stream: InputStream, - content_length: u64, - ) -> Result<(), UploadError> { - let part_size = cmp::max(self.part_size_bytes(), content_length / MAX_PARTS); - tracing::trace!("upload request using multipart upload with part size: {part_size} bytes"); - - let mpu = start_mpu(handle).await?; - tracing::trace!( - "multipart upload started with upload id: {:?}", - mpu.upload_id - ); - - handle.set_response(mpu); - - let part_reader = Arc::new( - PartReaderBuilder::new() - .stream(stream) - .part_size(part_size.try_into().expect("valid part size")) - .build(), - ); - - let n_workers = self.num_workers(); - for i in 0..n_workers { - let worker = upload_parts(handle.ctx.clone(), part_reader.clone()) - .instrument(tracing::debug_span!("upload-part", worker = i)); - handle.tasks.spawn(worker); - } - - Ok(()) - } - - /// Get the concrete number of workers to use based on the concurrency setting. - fn num_workers(&self) -> usize { - match self.concurrency { - // TODO(aws-sdk-rust#1159): add logic for determining this - ConcurrencySetting::Auto => DEFAULT_CONCURRENCY, - ConcurrencySetting::Explicit(explicit) => explicit, - } - } - - /// Get the concrete minimum upload size in bytes to use to determine whether multipart uploads - /// are enabled for a given request. - fn mpu_threshold_bytes(&self) -> u64 { - match self.multipart_threshold_part_size { - // TODO(aws-sdk-rust#1159): add logic for determining this - PartSize::Auto => 16 * MEBIBYTE, - PartSize::Target(explicit) => explicit, - } - } - - fn part_size_bytes(&self) -> u64 { - match self.target_part_size { - PartSize::Auto => 8 * MEBIBYTE, - PartSize::Target(explicit) => explicit, - } - } -} - -/// start a new multipart upload by invoking `CreateMultipartUpload` -async fn start_mpu(handle: &UploadHandle) -> Result { - let req = handle.ctx.request(); - let client = handle.ctx.client(); - - let resp = client - .create_multipart_upload() - .set_acl(req.acl.clone()) - .set_bucket(req.bucket.clone()) - .set_cache_control(req.cache_control.clone()) - .set_content_disposition(req.content_disposition.clone()) - .set_content_encoding(req.content_encoding.clone()) - .set_content_language(req.content_language.clone()) - .set_content_type(req.content_type.clone()) - .set_expires(req.expires) - .set_grant_full_control(req.grant_full_control.clone()) - .set_grant_read(req.grant_read.clone()) - .set_grant_read_acp(req.grant_read_acp.clone()) - .set_grant_write_acp(req.grant_write_acp.clone()) - .set_key(req.key.clone()) - .set_metadata(req.metadata.clone()) - .set_server_side_encryption(req.server_side_encryption.clone()) - .set_storage_class(req.storage_class.clone()) - .set_website_redirect_location(req.website_redirect_location.clone()) - .set_sse_customer_algorithm(req.sse_customer_algorithm.clone()) - .set_sse_customer_key(req.sse_customer_key.clone()) - .set_sse_customer_key_md5(req.sse_customer_key_md5.clone()) - .set_ssekms_key_id(req.sse_kms_key_id.clone()) - .set_ssekms_encryption_context(req.sse_kms_encryption_context.clone()) - .set_bucket_key_enabled(req.bucket_key_enabled) - .set_request_payer(req.request_payer.clone()) - .set_tagging(req.tagging.clone()) - .set_object_lock_mode(req.object_lock_mode.clone()) - .set_object_lock_retain_until_date(req.object_lock_retain_until_date) - .set_object_lock_legal_hold_status(req.object_lock_legal_hold_status.clone()) - .set_expected_bucket_owner(req.expected_bucket_owner.clone()) - .set_checksum_algorithm(req.checksum_algorithm.clone()) - .send() - .await?; - - Ok(resp.into()) -} - -/// Worker function that pulls part data off the `reader` and uploads each part until the reader -/// is exhausted. If any part fails the worker will return the error and stop processing. If -/// the worker finishes successfully the completed parts uploaded by this worker are returned. -async fn upload_parts( - ctx: UploadContext, - reader: Arc, -) -> Result, UploadError> { - let mut completed_parts = Vec::new(); - loop { - let part_result = reader.next_part().await?; - let part_data = match part_result { - Some(part_data) => part_data, - None => break, - }; - - let part_number = part_data.part_number as i32; - tracing::trace!("recv'd part number {}", part_number); - - let content_length = part_data.data.remaining(); - let body = ByteStream::from(part_data.data); - - // TODO(aws-sdk-rust#1159): disable payload signing - // TODO(aws-sdk-rust#1159): set checksum fields if applicable - let resp = ctx - .client - .upload_part() - .set_bucket(ctx.request.bucket.clone()) - .set_key(ctx.request.key.clone()) - .set_upload_id(ctx.upload_id.clone()) - .part_number(part_number) - .content_length(content_length as i64) - .body(body) - .set_sse_customer_algorithm(ctx.request.sse_customer_algorithm.clone()) - .set_sse_customer_key(ctx.request.sse_customer_key.clone()) - .set_sse_customer_key_md5(ctx.request.sse_customer_key_md5.clone()) - .set_request_payer(ctx.request.request_payer.clone()) - .set_expected_bucket_owner(ctx.request.expected_bucket_owner.clone()) - .send() - .await?; - - tracing::trace!("completed upload of part number {}", part_number); - let completed = CompletedPart::builder() - .part_number(part_number) - .set_e_tag(resp.e_tag.clone()) - .set_checksum_crc32(resp.checksum_crc32.clone()) - .set_checksum_crc32_c(resp.checksum_crc32_c.clone()) - .set_checksum_sha1(resp.checksum_sha1.clone()) - .set_checksum_sha256(resp.checksum_sha256.clone()) - .build(); - - completed_parts.push(completed); - } - - tracing::trace!("no more parts, worker finished"); - Ok(completed_parts) -} - -#[cfg(test)] -mod test { - use crate::io::InputStream; - use crate::types::{ConcurrencySetting, PartSize}; - use crate::upload::{UploadRequest, Uploader}; - use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput; - use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput; - use aws_sdk_s3::operation::upload_part::UploadPartOutput; - use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; - use bytes::Bytes; - use std::ops::Deref; - use std::sync::Arc; - - #[tokio::test] - async fn test_basic_mpu() { - let expected_upload_id = Arc::new("test-upload".to_owned()); - let body = Bytes::from_static(b"every adolescent dog goes bonkers early"); - let stream = InputStream::from(body); - - let upload_id = expected_upload_id.clone(); - let create_mpu = - mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || { - CreateMultipartUploadOutput::builder() - .upload_id(upload_id.as_ref().to_owned()) - .build() - }); - - let upload_id = expected_upload_id.clone(); - let upload_1 = mock!(aws_sdk_s3::Client::upload_part) - .match_requests(move |r| { - r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(30) - }) - .then_output(|| UploadPartOutput::builder().build()); - - let upload_id = expected_upload_id.clone(); - let upload_2 = mock!(aws_sdk_s3::Client::upload_part) - .match_requests(move |r| { - r.upload_id.as_ref() == Some(&upload_id) && r.content_length == Some(9) - }) - .then_output(|| UploadPartOutput::builder().build()); - - let expected_e_tag = Arc::new("test-e-tag".to_owned()); - let upload_id = expected_upload_id.clone(); - let e_tag = expected_e_tag.clone(); - let complete_mpu = mock!(aws_sdk_s3::Client::complete_multipart_upload) - .match_requests(move |r| { - r.upload_id.as_ref() == Some(&upload_id) - && r.multipart_upload.clone().unwrap().parts.unwrap().len() == 2 - }) - .then_output(move || { - CompleteMultipartUploadOutput::builder() - .e_tag(e_tag.as_ref().to_owned()) - .build() - }); - - let client = mock_client!( - aws_sdk_s3, - RuleMode::Sequential, - &[&create_mpu, &upload_1, &upload_2, &complete_mpu] - ); - - let uploader = Uploader::builder() - .concurrency(ConcurrencySetting::Explicit(1)) - .set_multipart_threshold(PartSize::Target(10)) - .set_target_part_size(PartSize::Target(30)) - .client(client) - .build(); - - let request = UploadRequest::builder() - .bucket("test-bucket") - .key("test-key") - .body(stream) - .build() - .unwrap(); - - let handle = uploader.upload(request).await.unwrap(); - - let resp = handle.join().await.unwrap(); - assert_eq!(expected_upload_id.deref(), resp.upload_id.unwrap().deref()); - assert_eq!(expected_e_tag.deref(), resp.e_tag.unwrap().deref()); - } -}