Skip to content

Commit

Permalink
naive mpu (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd authored Jul 18, 2024
1 parent 78f3a44 commit fc227cc
Show file tree
Hide file tree
Showing 16 changed files with 1,026 additions and 102 deletions.
4 changes: 4 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[build]
# Share one `target` directory at the project root for all Cargo projects and workspaces in aws-s3-transfer-manager-rs
target-dir = "target"

[profile.profiling]
inherits = "release"
debug = true
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ cargo run --example cp -- -h
```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<my-bucket>/<my-key> /local/path/<filename>
```
NOTE: To run in release mode add `--release/-r` to the command, see `cargo run -h`.
NOTE: `trace` may be too verbose, you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`

**Upload a file to S3**

```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp /local/path/<filename> s3://<my-bucket>/<my-key>
```

NOTE: To run in release mode add `--release/-r` to the command, see `cargo run -h`.
NOTE: `trace` may be too verbose, you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`


#### Flamegraphs

See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information.
Expand All @@ -49,9 +58,18 @@ sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flam

#### Using tokio-console

Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber) which allows you to run them with
By default, examples use the `tracing` crate for logging. You can pass the `--tokio-console` flag to an example to
use [`console-subscriber`](https://crates.io/crates/console-subscriber) instead. This allows you to run them with
[tokio-console](https://github.com/tokio-rs/console) to help debug task execution.

NOTE: This requires building the examples with `RUSTFLAGS="--cfg tokio_unstable"` or setting the equivalent in
your cargo config.


```sh
RUSTFLAGS="--cfg tokio_unstable" AWS_PROFILE=<profile-name> RUST_LOG=debug cargo run --example cp -- --tokio-console ...
```


Follow installation instructions for [tokio-console](https://github.com/tokio-rs/console) and then run the
example with `tokio-console` running.
Expand Down
6 changes: 0 additions & 6 deletions aws-s3-transfer-manager/.cargo/config.toml

This file was deleted.

3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
jemallocator = "0.5.4"
151 changes: 92 additions & 59 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::{mem, time};
use aws_s3_transfer_manager::download::Downloader;

use aws_s3_transfer_manager::download::body::Body;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_s3_transfer_manager::upload::{UploadRequest, Uploader};
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_types::SdkConfig;
use bytes::Buf;
Expand All @@ -20,7 +23,14 @@ use tracing::{debug_span, Instrument};

type BoxError = Box<dyn Error + Send + Sync>;

const ONE_MEBIBYTE: u64 = 1024 * 1024;
const ONE_MEGABYTE: u64 = 1000 * 1000;

#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Debug, Clone, clap::Parser)]
#[command(name = "cp")]
Expand All @@ -41,6 +51,10 @@ pub struct Args {
/// Part size to use
#[arg(long, default_value_t = 8388608)]
part_size: u64,

/// Enable tokio console (requires RUSTFLAGS="--cfg tokio_unstable")
#[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)]
tokio_console: bool,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -102,29 +116,14 @@ fn invalid_arg(message: &str) -> ! {
.exit()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
console_subscriber::init();
let args = dbg!(Args::parse());

use TransferUri::*;
match (&args.source, &args.dest) {
(Local(_), S3(_)) => todo!("upload not implemented yet"),
(Local(_), Local(_)) => invalid_arg("local to local transfer not supported"),
(S3(_), Local(_)) => (),
(S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"),
}

async fn do_download(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;

println!("warming up client...");
warmup(&config).await?;
println!("warming up complete");

let tm = Downloader::builder()
.sdk_config(config)
.concurrency(args.concurrency)
.target_part_size(args.part_size)
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.build();

let (bucket, key) = args.source.expect_s3().parts();
Expand All @@ -146,51 +145,83 @@ async fn main() -> Result<(), Box<dyn Error>> {
.await?;

let elapsed = start.elapsed();
let obj_size = handle.object_meta.total_size();
let obj_size_mebibytes = obj_size as f64 / ONE_MEBIBYTE as f64;
let obj_size_bytes = handle.object_meta.total_size();
let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64;
let obj_size_megabits = obj_size_megabytes * 8f64;

println!(
"downloaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}",
obj_size_megabits / elapsed.as_secs_f64(),
);

Ok(())
}

async fn do_upload(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;
warmup(&config).await?;

let tm = Uploader::builder()
.sdk_config(config)
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.build();

let path = args.source.expect_local();
let file_meta = fs::metadata(path).await.expect("file metadata");

let stream = InputStream::from_path(path)?;
let (bucket, key) = args.dest.expect_s3().parts();

let request = UploadRequest::builder()
.bucket(bucket)
.key(key)
.body(stream)
.build()?;

println!("starting upload");
let start = time::Instant::now();

let handle = tm.upload(request).await?;
let _resp = handle.join().await?;
let elapsed = start.elapsed();

let obj_size_bytes = file_meta.len();
let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64;
let obj_size_megabits = obj_size_megabytes * 8f64;

println!(
"downloaded {obj_size} bytes ({obj_size_mebibytes} MiB) in {elapsed:?}; MiB/s: {}",
obj_size_mebibytes / elapsed.as_secs_f64(),
"uploaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}",
obj_size_megabits / elapsed.as_secs_f64()
);

Ok(())
}

// async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
// let b1: &[u8] = &mut [];
// let b2: &[u8] = &mut [];
// let b3: &[u8] = &mut [];
// let b4: &[u8] = &mut [];
// let b5: &[u8] = &mut [];
// let b6: &[u8] = &mut [];
// let b7: &[u8] = &mut [];
// let b8: &[u8] = &mut [];
// while let Some(chunk) = body.next().await {
// let mut chunk = chunk.unwrap();
// while chunk.has_remaining() {
// let mut dst = [
// IoSlice::new(b1),
// IoSlice::new(b2),
// IoSlice::new(b3),
// IoSlice::new(b4),
// IoSlice::new(b5),
// IoSlice::new(b6),
// IoSlice::new(b7),
// IoSlice::new(b8),
// ];
// let filled = chunk.chunks_vectored(&mut dst[..]);
// tracing::trace!("filled: {filled} io slices");
//
// let wc = dest.write_vectored(&dst[0..filled]).await?;
// tracing::trace!("wrote: {wc} bytes");
// chunk.advance(wc);
// }
// }
// Ok(())
// }

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
/// Entry point: parses CLI arguments, installs either the tokio-console
/// subscriber or a `tracing` fmt subscriber, then dispatches to upload or
/// download based on the source/destination URI kinds.
#[tokio::main]
async fn main() -> Result<(), BoxError> {
    // dbg! echoes the parsed arguments to stderr — handy for an example binary.
    let args = dbg!(Args::parse());
    if args.tokio_console {
        // tokio-console instrumentation; only functional when the binary was
        // built with RUSTFLAGS="--cfg tokio_unstable" (see README).
        console_subscriber::init();
    } else {
        // Default logging: honor RUST_LOG via the env filter and include
        // thread IDs in each log line.
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_thread_ids(true)
            .init();
    }

    use TransferUri::*;
    // Only local<->S3 transfers are supported; same-scheme pairs are rejected
    // with a CLI usage error (invalid_arg exits the process).
    match (&args.source, &args.dest) {
        (Local(_), S3(_)) => do_upload(args).await?,
        (Local(_), Local(_)) => invalid_arg("local to local transfer not supported"),
        (S3(_), Local(_)) => do_download(args).await?,
        (S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"),
    }

    Ok(())
}

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
Expand All @@ -205,8 +236,10 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Er
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), Box<dyn Error>> {
let s3 = aws_sdk_s3::Client::new(&config);
/// Warms up the SDK client by issuing a single `ListBuckets` call, so that
/// one-time setup cost is paid before any timed transfer begins.
async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
    println!("warming up client...");
    let client = aws_sdk_s3::Client::new(config);
    client.list_buckets().send().await?;
    println!("warming up complete");
    Ok(())
}
Loading

0 comments on commit fc227cc

Please sign in to comment.