diff --git a/README.md b/README.md
index 56405a8..d6db43f 100644
--- a/README.md
+++ b/README.md
@@ -33,10 +33,19 @@ cargo run --example cp -- -h
 ```sh
 AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<bucket>/<key> /local/path/
 ```
+NOTE: To run in release mode add `--release/-r` to the command; see `cargo run -h`.
+NOTE: `trace` may be too verbose; you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`
+
+**Upload a file to S3**
+
+```sh
+AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp /local/path/ s3://<bucket>/<key>
+```
 NOTE: To run in release mode add `--release/-r` to the command, see `cargo run -h`.
 NOTE: `trace` may be too verbose, you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`
+
 
 #### Flamegraphs
 
 See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information.
@@ -49,9 +58,18 @@ sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flam
 
 #### Using tokio-console
 
-Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber) which allows you to run them with
+By default, examples use the `tracing` crate for logging. You can pass the `--tokio-console` flag to examples to
+use [`console-subscriber`](https://crates.io/crates/console-subscriber) instead. This allows you to run them with
 [tokio-console](https://github.com/tokio-rs/console) to help debug task execution.
 
+NOTE: This requires building the examples with `RUSTFLAGS="--cfg tokio_unstable"` or setting the equivalent in
+your cargo config.
+
+
+```sh
+RUSTFLAGS="--cfg tokio_unstable" AWS_PROFILE=<profile-name> RUST_LOG=debug cargo run --example cp -- --tokio-console ...
+```
+
 Follow installation instructions for [tokio-console](https://github.com/tokio-rs/console) and then run the example
 with `tokio-console` running.
diff --git a/aws-s3-transfer-manager/Cargo.toml b/aws-s3-transfer-manager/Cargo.toml
index d6570a1..7bdcf51 100644
--- a/aws-s3-transfer-manager/Cargo.toml
+++ b/aws-s3-transfer-manager/Cargo.toml
@@ -30,3 +30,6 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std
 console-subscriber = "0.3.0"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 tempfile = "3.10.1"
+
+[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
+jemallocator = "0.5.4"
diff --git a/aws-s3-transfer-manager/examples/cp.rs b/aws-s3-transfer-manager/examples/cp.rs
index 95a97c6..a1dbca9 100644
--- a/aws-s3-transfer-manager/examples/cp.rs
+++ b/aws-s3-transfer-manager/examples/cp.rs
@@ -25,6 +25,13 @@ type BoxError = Box<dyn Error>;
 
 const ONE_MEGABYTE: u64 = 1000 * 1000;
 
+#[cfg(not(target_env = "msvc"))]
+use jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 #[derive(Debug, Clone, clap::Parser)]
 #[command(name = "cp")]
 #[command(about = "Copies a local file or S3 object to another location locally or in S3.")]
@@ -44,6 +51,10 @@ pub struct Args {
     /// Part size to use
     #[arg(long, default_value_t = 8388608)]
     part_size: u64,
+
+    /// Enable tokio console (requires RUSTFLAGS="--cfg tokio_unstable")
+    #[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)]
+    tokio_console: bool,
 }
 
 #[derive(Clone, Debug)]
@@ -105,14 +116,14 @@ fn invalid_arg(message: &str) -> !
 {
         .exit()
 }
 
-async fn do_download(args: Args) -> Result<(), Box<dyn Error>> {
+async fn do_download(args: Args) -> Result<(), BoxError> {
     let config = aws_config::from_env().load().await;
     warmup(&config).await?;
 
     let tm = Downloader::builder()
         .sdk_config(config)
-        .concurrency(args.concurrency)
-        .target_part_size(args.part_size)
+        .concurrency(ConcurrencySetting::Explicit(args.concurrency))
+        .target_part_size(TargetPartSize::Explicit(args.part_size))
         .build();
 
     let (bucket, key) = args.source.expect_s3().parts();
@@ -146,7 +157,7 @@ async fn do_download(args: Args) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-async fn do_upload(args: Args) -> Result<(), Box<dyn Error>> {
+async fn do_upload(args: Args) -> Result<(), BoxError> {
     let config = aws_config::from_env().load().await;
     warmup(&config).await?;
 
@@ -188,14 +199,16 @@ async fn do_upload(args: Args) -> Result<(), Box<dyn Error>> {
 }
 
 #[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    tracing_subscriber::fmt()
-        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-        .with_thread_ids(true)
-        .init();
-    // FIXME - requires --cfg tokio_unstable flag, make opt-in via cmdline
-    // console_subscriber::init();
+async fn main() -> Result<(), BoxError> {
     let args = dbg!(Args::parse());
+    if args.tokio_console {
+        console_subscriber::init();
+    } else {
+        tracing_subscriber::fmt()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .with_thread_ids(true)
+            .init();
+    }
 
     use TransferUri::*;
     match (&args.source, &args.dest) {
@@ -208,7 +221,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
+async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError> {
     while let Some(chunk) = body.next().await {
         let chunk = chunk.unwrap();
         tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
@@ -223,7 +236,7 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box
 
-async fn warmup(config: &SdkConfig) -> Result<(), Box<dyn Error>> {
+async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
     println!("warming up client...");
     let s3 = aws_sdk_s3::Client::new(config);
     s3.list_buckets().send().await?;
diff --git a/aws-s3-transfer-manager/src/download.rs b/aws-s3-transfer-manager/src/download.rs
index 85faed5..34b0dab 100644
--- a/aws-s3-transfer-manager/src/download.rs
+++ b/aws-s3-transfer-manager/src/download.rs
@@ -17,6 +17,7 @@ use crate::download::discovery::{discover_obj, ObjectDiscovery};
 use crate::download::handle::DownloadHandle;
 use crate::download::worker::{distribute_work, download_chunks, ChunkResponse};
 use crate::error::TransferError;
+use crate::types::{ConcurrencySetting, TargetPartSize};
 use crate::MEBIBYTE;
 use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder};
 use aws_types::SdkConfig;
@@ -50,29 +51,23 @@ impl From for DownloadRequest {
 }
 
 /// Fluent style builder for [Downloader]
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct Builder {
-    target_part_size_bytes: u64,
-    // TODO(design): should we instead consider an enum here allows for not only explicit but also
-    // an "Auto" mode that allows us to control the concurrency actually used based on overall transfer and part size?
-    concurrency: usize,
+    target_part_size_bytes: TargetPartSize,
+    concurrency: ConcurrencySetting,
     sdk_config: Option<SdkConfig>,
 }
 
 impl Builder {
-    fn new() -> Self {
-        Self {
-            target_part_size_bytes: 8 * MEBIBYTE,
-            concurrency: 8,
-            sdk_config: None,
-        }
+    fn new() -> Builder {
+        Builder::default()
     }
 
     /// Size of parts the object will be downloaded in, in bytes.
     ///
-    /// Defaults is 8 MiB.
-    pub fn target_part_size(mut self, size_bytes: u64) -> Self {
-        self.target_part_size_bytes = size_bytes;
+    /// Default is [TargetPartSize::Auto].
+    pub fn target_part_size(mut self, target_size: TargetPartSize) -> Self {
+        self.target_part_size_bytes = target_size;
         self
     }
 
@@ -85,8 +80,8 @@ impl Builder {
     /// Set the concurrency level this component is allowed to use.
     ///
     /// This sets the maximum number of concurrent in-flight requests.
-    /// Default is 8.
-    pub fn concurrency(mut self, concurrency: usize) -> Self {
+    /// Default is [ConcurrencySetting::Auto].
+    pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self {
         self.concurrency = concurrency;
         self
     }
@@ -104,7 +99,7 @@ impl From<Builder> for Downloader {
             .unwrap_or_else(|| SdkConfig::builder().build());
         let client = aws_sdk_s3::Client::new(&sdk_config);
         Self {
-            target_part_size_bytes: value.target_part_size_bytes,
+            target_part_size: value.target_part_size_bytes,
             concurrency: value.concurrency,
             client,
         }
@@ -115,8 +110,8 @@ impl From<Builder> for Downloader {
 /// concurrent requests (e.g. using ranged GET or part number).
 #[derive(Debug, Clone)]
 pub struct Downloader {
-    target_part_size_bytes: u64,
-    concurrency: usize,
+    target_part_size: TargetPartSize,
+    concurrency: ConcurrencySetting,
     client: aws_sdk_s3::client::Client,
 }
 
@@ -155,14 +150,17 @@ impl Downloader {
             todo!("single part download not implemented")
         }
 
+        let target_part_size_bytes = self.target_part_size();
         let ctx = DownloadContext {
             client: self.client.clone(),
-            target_part_size: self.target_part_size_bytes,
+            target_part_size_bytes,
         };
 
+        let concurrency = self.concurrency();
+
         // make initial discovery about the object size, metadata, possibly first chunk
         let mut discovery = discover_obj(&ctx, &req).await?;
-        let (comp_tx, comp_rx) = mpsc::channel(self.concurrency);
+        let (comp_tx, comp_rx) = mpsc::channel(concurrency);
         let start_seq = handle_discovery_chunk(&mut discovery, &comp_tx).await;
 
         // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled.
@@ -170,9 +168,8 @@ impl Downloader {
 
         if !discovery.remaining.is_empty() {
             // start assigning work
-            let (work_tx, work_rx) = async_channel::bounded(self.concurrency);
+            let (work_tx, work_rx) = async_channel::bounded(concurrency);
             let input = req.input.clone();
-            let part_size = self.target_part_size_bytes;
             let rem = discovery.remaining.clone();
 
             // TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once,
             // quite a few futures created. If more performant could be enabled for
             // objects less than some size.
-            tasks.spawn(distribute_work(rem, input, part_size, start_seq, work_tx));
+            tasks.spawn(distribute_work(
+                rem,
+                input,
+                target_part_size_bytes,
+                start_seq,
+                work_tx,
+            ));
 
-            for i in 0..self.concurrency {
+            for i in 0..concurrency {
                 let worker = download_chunks(ctx.clone(), work_rx.clone(), comp_tx.clone())
                     .instrument(tracing::debug_span!("chunk-downloader", worker = i));
                 tasks.spawn(worker);
@@ -202,6 +205,22 @@ impl Downloader {
 
         Ok(handle)
     }
+
+    /// Get the concrete concurrency setting
+    fn concurrency(&self) -> usize {
+        match self.concurrency {
+            ConcurrencySetting::Auto => 8,
+            ConcurrencySetting::Explicit(explicit) => explicit,
+        }
+    }
+
+    /// Get the concrete part size to use, in bytes
+    fn target_part_size(&self) -> u64 {
+        match self.target_part_size {
+            TargetPartSize::Auto => 8 * MEBIBYTE,
+            TargetPartSize::Explicit(explicit) => explicit,
+        }
+    }
 }
 
 /// Handle possibly sending the first chunk of data received through discovery. Returns
diff --git a/aws-s3-transfer-manager/src/download/context.rs b/aws-s3-transfer-manager/src/download/context.rs
index d8f3167..15a6ce8 100644
--- a/aws-s3-transfer-manager/src/download/context.rs
+++ b/aws-s3-transfer-manager/src/download/context.rs
@@ -7,5 +7,5 @@
 #[derive(Debug, Clone)]
 pub(crate) struct DownloadContext {
     pub(crate) client: aws_sdk_s3::Client,
-    pub(crate) target_part_size: u64,
+    pub(crate) target_part_size_bytes: u64,
 }
diff --git a/aws-s3-transfer-manager/src/download/discovery.rs b/aws-s3-transfer-manager/src/download/discovery.rs
index 1f54a7a..8b0ac83 100644
--- a/aws-s3-transfer-manager/src/download/discovery.rs
+++ b/aws-s3-transfer-manager/src/download/discovery.rs
@@ -82,9 +82,9 @@ pub(super) async fn discover_obj(
     let byte_range = match range.as_ref() {
         Some(r) => ByteRange::Inclusive(
             *r.start(),
-            cmp::min(*r.start() + ctx.target_part_size - 1, *r.end()),
+            cmp::min(*r.start() + ctx.target_part_size_bytes - 1, *r.end()),
         ),
-        None => ByteRange::Inclusive(0, ctx.target_part_size - 1),
+        None => ByteRange::Inclusive(0, ctx.target_part_size_bytes - 1),
     };
     let r = request
         .input
@@ -216,7 +216,7 @@ mod tests {
 
         let ctx = DownloadContext {
             client,
-            target_part_size: 5 * MEBIBYTE,
+            target_part_size_bytes: 5 * MEBIBYTE,
         };
         let request = GetObjectInput::builder()
             .bucket("test-bucket")
@@ -266,7 +266,7 @@ mod tests {
 
         let ctx = DownloadContext {
             client,
-            target_part_size,
+            target_part_size_bytes: target_part_size,
         };
 
         let request = GetObjectInput::builder()
@@ -300,7 +300,7 @@ mod tests {
 
         let ctx = DownloadContext {
             client,
-            target_part_size,
+            target_part_size_bytes: target_part_size,
         };
 
         let request = GetObjectInput::builder()
diff --git a/aws-s3-transfer-manager/src/types.rs b/aws-s3-transfer-manager/src/types.rs
index 9e244b3..78b5e55 100644
--- a/aws-s3-transfer-manager/src/types.rs
+++ b/aws-s3-transfer-manager/src/types.rs
@@ -4,9 +4,10 @@
  */
 
 /// The target part size for an upload or download request.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub enum TargetPartSize {
     /// Automatically configure an optimal target part size based on the execution environment.
+    #[default]
     Auto,
 
     /// Explicitly configured part size.
@@ -14,9 +15,10 @@
 }
 
 /// The concurrency settings to use for a single upload or download request.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub enum ConcurrencySetting {
     /// Automatically configure an optimal concurrency setting based on the execution environment.
+    #[default]
     Auto,
 
     /// Explicitly configured concurrency setting.
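An aside on the `Auto`/`Explicit` pattern above: the setting only resolves to a concrete value inside `Downloader`. Below is a minimal standalone sketch of that resolution logic, compilable in isolation. The enum mirrors `src/types.rs` and the fallback value of 8 matches `Downloader::concurrency()` from this diff; the `resolve` name and the asserts in `main` are illustrative assumptions, not part of the crate.

```rust
// Standalone sketch of the Auto/Explicit resolution pattern from this diff.
// `ConcurrencySetting` mirrors src/types.rs; `resolve` is a hypothetical name
// for what Downloader::concurrency() does internally.
#[derive(Debug, Clone, Default)]
enum ConcurrencySetting {
    /// Automatically choose a concurrency setting (the default).
    #[default]
    Auto,
    /// Explicitly configured concurrency setting.
    Explicit(usize),
}

impl ConcurrencySetting {
    /// Resolve the setting to a concrete value: `Auto` falls back to 8,
    /// matching the default used by Downloader::concurrency() above.
    fn resolve(&self) -> usize {
        match self {
            ConcurrencySetting::Auto => 8,
            ConcurrencySetting::Explicit(explicit) => *explicit,
        }
    }
}

fn main() {
    assert_eq!(ConcurrencySetting::default().resolve(), 8);
    assert_eq!(ConcurrencySetting::Explicit(16).resolve(), 16);
}
```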
diff --git a/aws-s3-transfer-manager/src/upload.rs b/aws-s3-transfer-manager/src/upload.rs
index 19209fc..79b90a3 100644
--- a/aws-s3-transfer-manager/src/upload.rs
+++ b/aws-s3-transfer-manager/src/upload.rs
@@ -103,7 +103,8 @@ impl Builder {
     }
 
     /// Set an explicit client to use. This takes precedence over setting `sdk_config`.
-    #[allow(unused)] /// FIXME - this is currently only used for tests...
+    #[allow(unused)]
+    /// FIXME - this is currently only used for tests...
     pub(crate) fn client(mut self, client: aws_sdk_s3::Client) -> Self {
         self.client = Some(client);
         self
diff --git a/aws-s3-transfer-manager/src/upload/handle.rs b/aws-s3-transfer-manager/src/upload/handle.rs
index bdc5147..3c3b50f 100644
--- a/aws-s3-transfer-manager/src/upload/handle.rs
+++ b/aws-s3-transfer-manager/src/upload/handle.rs
@@ -76,8 +76,7 @@ impl UploadHandle {
             FailedMultipartUploadPolicy::AbortUpload => abort_upload(self).await,
             FailedMultipartUploadPolicy::Retain => Ok(AbortedUpload::default()),
         }
-
-    }
+    }
 }
 
 /// Describes the result of aborting an in-progress upload.
@@ -130,10 +129,7 @@ async fn complete_upload(mut handle: UploadHandle) -> Result
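Taken together, the builder half of this change reads like the following at a call site. A hedged sketch, assuming the crate/module paths implied by this diff's layout (`src/download.rs`, `src/types.rs`); the `8 * 1024 * 1024` part size matches the `8388608` default in `examples/cp.rs`.

```rust
// Sketch only: import paths are assumed from this diff's crate layout and
// may differ in the published crate.
use aws_s3_transfer_manager::download::Downloader;
use aws_s3_transfer_manager::types::{ConcurrencySetting, TargetPartSize};

async fn build_downloader() -> Downloader {
    let config = aws_config::from_env().load().await;
    // Auto is now the default for both settings; Explicit mirrors the
    // --concurrency / --part-size flags in examples/cp.rs.
    Downloader::builder()
        .sdk_config(config)
        .concurrency(ConcurrencySetting::Explicit(8))
        .target_part_size(TargetPartSize::Explicit(8 * 1024 * 1024))
        .build()
}
```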