Skip to content

Commit

Permalink
add config loader and hyper 1x http client
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd committed Aug 19, 2024
1 parent f720ca0 commit 52821d9
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 27 deletions.
3 changes: 2 additions & 1 deletion aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ repository = "https://github.com/smithy-lang/smithy-rs"
publish = false

[dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
async-channel = "2.3.1"
async-trait = "0.1.81"
aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-http = "0.60.9"
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.0"
Expand All @@ -24,7 +26,6 @@ tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync",
tracing = "0.1"

[dev-dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-smithy-mocks-experimental = "0.2.1"
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
Expand Down
23 changes: 11 additions & 12 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::config::StalledStreamProtectionConfig;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_types::SdkConfig;
use bytes::Buf;
use clap::{CommandFactory, Parser};
use tokio::fs;
Expand Down Expand Up @@ -149,11 +148,12 @@ async fn do_recursive_download(
}

async fn do_download(args: Args) -> Result<(), BoxError> {
// FIXME - move to aws_s3_transfer_manager::from_env(), need to figure out stalled stream issues first
let config = aws_config::from_env()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.http_client(aws_s3_transfer_manager::http::default_client())
.load()
.await;
warmup(&config).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

Expand All @@ -163,6 +163,8 @@ async fn do_download(args: Args) -> Result<(), BoxError> {
.client(s3_client)
.build();

warmup(&tm_config).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

if args.recursive {
Expand Down Expand Up @@ -204,16 +206,13 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
unimplemented!("recursive upload not supported yet")
}

let config = aws_config::from_env().load().await;
warmup(&config).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
let tm_config = aws_s3_transfer_manager::from_env()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();
.load()
.await;

warmup(&tm_config).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

Expand Down Expand Up @@ -291,9 +290,9 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError>
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
async fn warmup(config: &aws_s3_transfer_manager::Config) -> Result<(), BoxError> {
println!("warming up client...");
let s3 = aws_sdk_s3::Client::new(config);
let s3 = config.client();
s3.list_buckets().send().await?;
println!("warming up complete");
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Builder {
self
}

/// Consumes the builder and constructs a [`Config`](crate::config::Config)
/// Consumes the builder and constructs a [`Config`]
pub fn build(self) -> Config {
Config {
multipart_threshold: self.multipart_threshold_part_size,
Expand Down
14 changes: 14 additions & 0 deletions aws-s3-transfer-manager/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder};
use aws_smithy_runtime_api::client::http::SharedHttpClient;

/// The default HTTP client used by a transfer manager when not explicitly configured.
pub fn default_client() -> SharedHttpClient {
HyperClientBuilder::new()
.crypto_mode(CryptoMode::AwsLc)
.build_https()
}
130 changes: 123 additions & 7 deletions aws-s3-transfer-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
/* Automatically managed default lints */
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
/* End of automatically managed default lints */

//! AWS S3 Transfer Manager
//!
//! # Crate Features
//!
//! - `test-util`: Enables utilities for unit tests. DO NOT ENABLE IN PRODUCTION.
#![warn(
missing_debug_implementations,
missing_docs,
Expand All @@ -21,6 +14,52 @@
rust_2018_idioms
)]

//! An Amazon S3 client focused on maximizing throughput and network utilization.
//!
//! AWS S3 Transfer Manager is a high level abstraction over the base Amazon S3
//! [service API]. Transfer operations such as upload or download are automatically
//! split into concurrent requests to accelerate performance.
//!
//! [service API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations_Amazon_Simple_Storage_Service.html
//!
//! # Examples
//!
//! Load the default configuration:
//!
//! ```no_run
//! # async fn main() {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//! # }
//! ```
//!
//! Download a bucket to a local directory:
//!
//! ```no_run
//! # async fn main() {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//!
//! let handle = client
//! .download_objects()
//! .bucket("my-bucket")
//! .destination("/tmp/my-bucket")
//! .send()
//! .await?;
//!
//! // wait for transfer to complete
//! handle.join().await?;
//!
//! # }
//!
//! ```
//!
//! See the documentation for each client operation for more information:
//!
//! * [`download`](crate::Client::download) - download a single object
//! * [`upload`](crate::Client::upload) - upload a single object
//! * [`download_objects`](crate::Client::download_objects) - download an entire bucket or prefix to a local directory
pub(crate) const MEBIBYTE: u64 = 1024 * 1024;

pub(crate) const DEFAULT_CONCURRENCY: usize = 8;
Expand All @@ -45,3 +84,80 @@ pub mod config;

pub use self::client::Client;
pub use self::config::Config;

/// HTTP abstractions used by `aws-s3-transfer-manager`
pub mod http;

/// Load client configuration from the environment
pub use loader::ConfigLoader;

/// Create a config loader
pub fn from_env() -> ConfigLoader {
ConfigLoader::default()
}

mod loader {
use crate::config::Builder;
use crate::{
http,
types::{ConcurrencySetting, PartSize},
Config,
};

/// Load transfer manager [`Config`] from the environment.
#[derive(Default, Debug)]
pub struct ConfigLoader {
builder: Builder,
}

impl ConfigLoader {
/// Minimum object size that should trigger a multipart upload.
///
/// The minimum part size is 5 MiB, any part size less than that will be rounded up.
/// Default is [PartSize::Auto]
pub fn multipart_threshold(mut self, threshold: PartSize) -> Self {
self.builder = self.builder.multipart_threshold(threshold);
self
}

/// The target size of each part when using a multipart upload to complete the request.
///
/// When a request's content length is les than [`multipart_threshold`],
/// this setting is ignored and a single [`PutObject`] request will be made instead.
///
/// NOTE: The actual part size used may be larger than the configured part size if
/// the current value would result in more than 10,000 parts for an upload request.
///
/// Default is [PartSize::Auto]
///
/// [`multipart_threshold`]: method@Self::multipart_threshold
/// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
pub fn part_size(mut self, part_size: PartSize) -> Self {
self.builder = self.builder.part_size(part_size);
self
}

/// Set the concurrency level this component is allowed to use.
///
/// This sets the maximum number of concurrent in-flight requests.
/// Default is [ConcurrencySetting::Auto].
pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self {
self.builder = self.builder.concurrency(concurrency);
self
}

/// Load the default configuration
///
/// If fields have been overridden during builder construction, the override values will be
/// used. Otherwise, the default values for each field will be provided.
pub async fn load(self) -> Config {
let shared_config = aws_config::from_env()
.http_client(http::default_client())
.load()
.await;
let s3_client = aws_sdk_s3::Client::new(&shared_config);
let builder = self.builder.client(s3_client);
builder.build()
}
}
}
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/operation/download/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ impl fmt::Debug for DownloadInput {
}

impl DownloadInput {
/// Creates a new builder-style object to manufacture [`DownloadInput`](crate::operation::download::DownloadInput).
/// Creates a new builder-style object to manufacture [`DownloadInput`].
pub fn builder() -> DownloadInputBuilder {
DownloadInputBuilder::default()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct DownloadObjectsInput {
}

impl DownloadObjectsInput {
/// Creates a new builder-style object to manufacture [`DownloadObjectsInput`](crate::operation::download_objects::DownloadObjectsInput).
/// Creates a new builder-style object to manufacture [`DownloadObjectsInput`].
pub fn builder() -> DownloadObjectsInputBuilder {
DownloadObjectsInputBuilder::default()
}
Expand Down Expand Up @@ -84,7 +84,7 @@ impl fmt::Debug for DownloadObjectsInput {
}
}

/// A builder for [`DownloadObjectsInput`](crate::operation::download_objects::DownloadObjectsInput).
/// A builder for [`DownloadObjectsInput`].
#[non_exhaustive]
#[derive(Clone, Default)]
pub struct DownloadObjectsInputBuilder {
Expand Down Expand Up @@ -196,7 +196,7 @@ impl DownloadObjectsInputBuilder {
&self.filter
}

/// Consumes the builder and constructs a [`DownloadObjectsInput`](crate::operation::download_objects::DownloadObjectsInput).
/// Consumes the builder and constructs a [`DownloadObjectsInput`].
pub fn build(self) -> Result<DownloadObjectsInput, BuildError> {
if self.bucket.is_none() {
return Err(BuildError::missing_field("bucket", "A bucket is required"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct DownloadObjectsOutput {
}

impl DownloadObjectsOutput {
/// Creates a new builder-style object to manufacture [`DownloadObjectsOutput`](crate::operation::download_objects::DownloadObjectsOutput).
/// Creates a new builder-style object to manufacture [`DownloadObjectsOutput`].
pub fn builder() -> DownloadObjectsOutputBuilder {
DownloadObjectsOutputBuilder::default()
}
Expand All @@ -45,7 +45,7 @@ impl DownloadObjectsOutput {
}
}

/// A builder for [`DownloadObjectsOutput`](crate::operation::download_objects::DownloadObjectsOutput).
/// A builder for [`DownloadObjectsOutput`].
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct DownloadObjectsOutputBuilder {
Expand Down

0 comments on commit 52821d9

Please sign in to comment.