cleanup
aajtodd committed Jul 17, 2024
1 parent 4397180 commit 3767f5f
Showing 9 changed files with 106 additions and 54 deletions.
20 changes: 19 additions & 1 deletion README.md
@@ -33,10 +33,19 @@ cargo run --example cp -- -h
```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<my-bucket>/<my-key> /local/path/<filename>
```
NOTE: To run in release mode, add `--release/-r` to the command; see `cargo run -h`.
NOTE: `trace` may be too verbose; you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`.

**Upload a file to S3**

```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp /local/path/<filename> s3://<my-bucket>/<my-key>
```

NOTE: To run in release mode, add `--release/-r` to the command; see `cargo run -h`.
NOTE: `trace` may be too verbose; you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`.


#### Flamegraphs

See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information.
@@ -49,9 +58,18 @@ sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flam

#### Using tokio-console

Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber) which allows you to run them with
By default, examples use the `tracing` crate for logging. You can pass the `--tokio-console` flag to an example to
use [`console-subscriber`](https://crates.io/crates/console-subscriber) instead. This allows you to run them with
[tokio-console](https://github.com/tokio-rs/console) to help debug task execution.

NOTE: This requires building the examples with `RUSTFLAGS="--cfg tokio_unstable"` or setting the equivalent in
your cargo config (see the config sketch below).


```sh
RUSTFLAGS="--cfg tokio_unstable" AWS_PROFILE=<profile-name> RUST_LOG=debug cargo run --example cp -- --tokio-console ...
```
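
For reference, the cargo config mentioned in the NOTE above could look like the following sketch — the `[build] rustflags` key is standard Cargo configuration, not something this repository defines:

```toml
# .cargo/config.toml (sketch): apply --cfg tokio_unstable to every build in the workspace,
# so the RUSTFLAGS environment variable can be omitted from the command above.
[build]
rustflags = ["--cfg", "tokio_unstable"]
```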


Follow installation instructions for [tokio-console](https://github.com/tokio-rs/console) and then run the
example with `tokio-console` running.
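
If the `tokio-console` CLI is not installed yet, it is distributed on crates.io (per the tokio-console project's own docs), so installation is typically:

```sh
cargo install --locked tokio-console
tokio-console   # connects to the default console port of an instrumented app
```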
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/Cargo.toml
@@ -30,3 +30,6 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
jemallocator = "0.5.4"
39 changes: 26 additions & 13 deletions aws-s3-transfer-manager/examples/cp.rs
@@ -25,6 +25,13 @@ type BoxError = Box<dyn Error + Send + Sync>;

const ONE_MEGABYTE: u64 = 1000 * 1000;

#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Debug, Clone, clap::Parser)]
#[command(name = "cp")]
#[command(about = "Copies a local file or S3 object to another location locally or in S3.")]
@@ -44,6 +51,10 @@ pub struct Args {
/// Part size to use
#[arg(long, default_value_t = 8388608)]
part_size: u64,

/// Enable tokio console (requires RUSTFLAGS="--cfg tokio_unstable")
#[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)]
tokio_console: bool,
}

#[derive(Clone, Debug)]
@@ -105,14 +116,14 @@ fn invalid_arg(message: &str) -> ! {
.exit()
}

async fn do_download(args: Args) -> Result<(), Box<dyn Error>> {
async fn do_download(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;
warmup(&config).await?;

let tm = Downloader::builder()
.sdk_config(config)
.concurrency(args.concurrency)
.target_part_size(args.part_size)
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.target_part_size(TargetPartSize::Explicit(args.part_size))
.build();

let (bucket, key) = args.source.expect_s3().parts();
@@ -146,7 +157,7 @@ async fn do_download(args: Args) -> Result<(), Box<dyn Error>> {
Ok(())
}

async fn do_upload(args: Args) -> Result<(), Box<dyn Error>> {
async fn do_upload(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;
warmup(&config).await?;

@@ -188,14 +199,16 @@ async fn do_upload(args: Args) -> Result<(), Box<dyn Error>> {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_thread_ids(true)
.init();
// FIXME - requires --cfg tokio_unstable flag, make opt-in via cmdline
// console_subscriber::init();
async fn main() -> Result<(), BoxError> {
let args = dbg!(Args::parse());
if args.tokio_console {
console_subscriber::init();
} else {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_thread_ids(true)
.init();
}

use TransferUri::*;
match (&args.source, &args.dest) {
@@ -208,7 +221,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
@@ -223,7 +236,7 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Er
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), Box<dyn Error>> {
async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
println!("warming up client...");
let s3 = aws_sdk_s3::Client::new(config);
s3.list_buckets().send().await?;
69 changes: 44 additions & 25 deletions aws-s3-transfer-manager/src/download.rs
@@ -17,6 +17,7 @@ use crate::download::discovery::{discover_obj, ObjectDiscovery};
use crate::download::handle::DownloadHandle;
use crate::download::worker::{distribute_work, download_chunks, ChunkResponse};
use crate::error::TransferError;
use crate::types::{ConcurrencySetting, TargetPartSize};
use crate::MEBIBYTE;
use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder};
use aws_types::SdkConfig;
@@ -50,29 +51,23 @@ impl From<GetObjectInputBuilder> for DownloadRequest {
}

/// Fluent style builder for [Downloader]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct Builder {
target_part_size_bytes: u64,
// TODO(design): should we instead consider an enum here allows for not only explicit but also
// an "Auto" mode that allows us to control the concurrency actually used based on overall transfer and part size?
concurrency: usize,
target_part_size_bytes: TargetPartSize,
concurrency: ConcurrencySetting,
sdk_config: Option<SdkConfig>,
}

impl Builder {
fn new() -> Self {
Self {
target_part_size_bytes: 8 * MEBIBYTE,
concurrency: 8,
sdk_config: None,
}
fn new() -> Builder {
Builder::default()
}

/// Size of parts the object will be downloaded in, in bytes.
///
/// Defaults is 8 MiB.
pub fn target_part_size(mut self, size_bytes: u64) -> Self {
self.target_part_size_bytes = size_bytes;
    /// Default is [TargetPartSize::Auto].
pub fn target_part_size(mut self, target_size: TargetPartSize) -> Self {
self.target_part_size_bytes = target_size;
self
}

@@ -85,8 +80,8 @@ impl Builder {
/// Set the concurrency level this component is allowed to use.
///
/// This sets the maximum number of concurrent in-flight requests.
/// Default is 8.
pub fn concurrency(mut self, concurrency: usize) -> Self {
/// Default is [ConcurrencySetting::Auto].
pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self {
self.concurrency = concurrency;
self
}
@@ -104,7 +99,7 @@ impl From<Builder> for Downloader {
.unwrap_or_else(|| SdkConfig::builder().build());
let client = aws_sdk_s3::Client::new(&sdk_config);
Self {
target_part_size_bytes: value.target_part_size_bytes,
target_part_size: value.target_part_size_bytes,
concurrency: value.concurrency,
client,
}
@@ -115,8 +110,8 @@
/// concurrent requests (e.g. using ranged GET or part number).
#[derive(Debug, Clone)]
pub struct Downloader {
target_part_size_bytes: u64,
concurrency: usize,
target_part_size: TargetPartSize,
concurrency: ConcurrencySetting,
client: aws_sdk_s3::client::Client,
}

@@ -155,34 +150,42 @@ impl Downloader {
todo!("single part download not implemented")
}

let target_part_size_bytes = self.target_part_size();
let ctx = DownloadContext {
client: self.client.clone(),
target_part_size: self.target_part_size_bytes,
target_part_size_bytes,
};

let concurrency = self.concurrency();

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &req).await?;
let (comp_tx, comp_rx) = mpsc::channel(self.concurrency);
let (comp_tx, comp_rx) = mpsc::channel(concurrency);
let start_seq = handle_discovery_chunk(&mut discovery, &comp_tx).await;

// spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled.
let mut tasks = JoinSet::new();

if !discovery.remaining.is_empty() {
// start assigning work
let (work_tx, work_rx) = async_channel::bounded(self.concurrency);
let (work_tx, work_rx) = async_channel::bounded(concurrency);
let input = req.input.clone();
let part_size = self.target_part_size_bytes;
let rem = discovery.remaining.clone();

// TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once,
// the downside is controlling memory usage as a large download may result in
// quite a few futures created. If more performant could be enabled for
// objects less than some size.

tasks.spawn(distribute_work(rem, input, part_size, start_seq, work_tx));
tasks.spawn(distribute_work(
rem,
input,
target_part_size_bytes,
start_seq,
work_tx,
));

for i in 0..self.concurrency {
for i in 0..concurrency {
let worker = download_chunks(ctx.clone(), work_rx.clone(), comp_tx.clone())
.instrument(tracing::debug_span!("chunk-downloader", worker = i));
tasks.spawn(worker);
@@ -202,6 +205,22 @@

Ok(handle)
}

/// Get the concrete concurrency setting
fn concurrency(&self) -> usize {
match self.concurrency {
ConcurrencySetting::Auto => 8,
ConcurrencySetting::Explicit(explicit) => explicit,
}
}

// Get the concrete part size to use in bytes
fn target_part_size(&self) -> u64 {
match self.target_part_size {
TargetPartSize::Auto => 8 * MEBIBYTE,
TargetPartSize::Explicit(explicit) => explicit,
}
}
}
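
As an aside, here is a minimal sketch of how a caller would drive this updated builder API, modeled on `examples/cp.rs` above. The import paths and crate-root exports are assumptions based on the module layout in this diff, and the `Auto` defaults resolve to 8 concurrent requests and 8 MiB parts per the helpers just added:

```rust
use aws_s3_transfer_manager::download::Downloader;
use aws_s3_transfer_manager::types::{ConcurrencySetting, TargetPartSize};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load AWS credentials/region the same way examples/cp.rs does.
    let config = aws_config::from_env().load().await;

    // Omitting concurrency()/target_part_size() uses the Default derive added in
    // this commit, i.e. ConcurrencySetting::Auto / TargetPartSize::Auto.
    let tm = Downloader::builder()
        .sdk_config(config)
        .concurrency(ConcurrencySetting::Explicit(8))
        .target_part_size(TargetPartSize::Explicit(8 * 1024 * 1024))
        .build();

    // Build a DownloadRequest (e.g. from a GetObjectInputBuilder, see the From impl
    // above) and pass it to the downloader to start a transfer.
    let _ = tm;
    Ok(())
}
```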

/// Handle possibly sending the first chunk of data received through discovery. Returns
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/download/context.rs
@@ -7,5 +7,5 @@
#[derive(Debug, Clone)]
pub(crate) struct DownloadContext {
pub(crate) client: aws_sdk_s3::Client,
pub(crate) target_part_size: u64,
pub(crate) target_part_size_bytes: u64,
}
10 changes: 5 additions & 5 deletions aws-s3-transfer-manager/src/download/discovery.rs
@@ -82,9 +82,9 @@ pub(super) async fn discover_obj(
let byte_range = match range.as_ref() {
Some(r) => ByteRange::Inclusive(
*r.start(),
cmp::min(*r.start() + ctx.target_part_size - 1, *r.end()),
cmp::min(*r.start() + ctx.target_part_size_bytes - 1, *r.end()),
),
None => ByteRange::Inclusive(0, ctx.target_part_size - 1),
None => ByteRange::Inclusive(0, ctx.target_part_size_bytes - 1),
};
let r = request
.input
@@ -216,7 +216,7 @@ mod tests {

let ctx = DownloadContext {
client,
target_part_size: 5 * MEBIBYTE,
target_part_size_bytes: 5 * MEBIBYTE,
};
let request = GetObjectInput::builder()
.bucket("test-bucket")
@@ -266,7 +266,7 @@ mod tests {

let ctx = DownloadContext {
client,
target_part_size,
target_part_size_bytes: target_part_size,
};

let request = GetObjectInput::builder()
@@ -300,7 +300,7 @@ mod tests {

let ctx = DownloadContext {
client,
target_part_size,
target_part_size_bytes: target_part_size,
};

let request = GetObjectInput::builder()
6 changes: 4 additions & 2 deletions aws-s3-transfer-manager/src/types.rs
@@ -4,19 +4,21 @@
*/

/// The target part size for an upload or download request.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub enum TargetPartSize {
/// Automatically configure an optimal target part size based on the execution environment.
#[default]
Auto,

/// Explicitly configured part size.
Explicit(u64),
}

/// The concurrency settings to use for a single upload or download request.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub enum ConcurrencySetting {
/// Automatically configure an optimal concurrency setting based on the execution environment.
#[default]
Auto,

/// Explicitly configured concurrency setting.
3 changes: 2 additions & 1 deletion aws-s3-transfer-manager/src/upload.rs
@@ -103,7 +103,8 @@ impl Builder {
}

/// Set an explicit client to use. This takes precedence over setting `sdk_config`.
#[allow(unused)] /// FIXME - this is currently only used for tests...
#[allow(unused)]
/// FIXME - this is currently only used for tests...
pub(crate) fn client(mut self, client: aws_sdk_s3::Client) -> Self {
self.client = Some(client);
self
8 changes: 2 additions & 6 deletions aws-s3-transfer-manager/src/upload/handle.rs
@@ -76,8 +76,7 @@ impl UploadHandle {
FailedMultipartUploadPolicy::AbortUpload => abort_upload(self).await,
FailedMultipartUploadPolicy::Retain => Ok(AbortedUpload::default()),
}

}
}
}

/// Describes the result of aborting an in-progress upload.
@@ -130,10 +129,7 @@ async fn complete_upload(mut handle: UploadHandle) -> Result<UploadResponse, Upl
todo!("non mpu upload not implemented yet")
}

tracing::trace!(
"joining upload_id={:?}",
handle.ctx.upload_id
);
tracing::trace!("joining upload_id={:?}", handle.ctx.upload_id);

let mut all_parts = Vec::new();

