naive mpu #2

Merged 11 commits on Jul 18, 2024

Changes from 4 commits
4 changes: 4 additions & 0 deletions .cargo/config.toml
@@ -1,3 +1,7 @@
[build]
# Share one `target` directory at the project root for all Cargo projects and workspaces in aws-s3-transfer-manager-rs
target-dir = "target"

[profile.profiling]
inherits = "release"
debug = true
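
The `profiling` profile inherits release optimizations but keeps debug symbols, which flamegraph tooling needs to produce readable stacks. A minimal usage sketch (standard Cargo behavior, nothing repo-specific):

```sh
# Optimized build that still carries debug info for profilers
cargo build --profile profiling --example cp
```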
20 changes: 19 additions & 1 deletion README.md
@@ -33,10 +33,19 @@ cargo run --example cp -- -h
```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<my-bucket>/<my-key> /local/path/<filename>
```

**Upload a file to S3**

```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp /local/path/<filename> s3://<my-bucket>/<my-key>
```

NOTE: To run in release mode, add `--release`/`-r` to the command; see `cargo run -h`.
NOTE: `trace` may be too verbose; you can see just this library's logs with `RUST_LOG=aws_s3_transfer_manager=trace`.


#### Flamegraphs

See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information.
@@ -49,9 +58,18 @@ sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flam

#### Using tokio-console

By default, examples use the `tracing` crate for logs. You can pass the `--tokio-console` flag to an example to
use [`console-subscriber`](https://crates.io/crates/console-subscriber) instead, which allows you to run it with
[tokio-console](https://github.com/tokio-rs/console) to help debug task execution.

NOTE: This requires building the examples with `RUSTFLAGS="--cfg tokio_unstable"` or setting the equivalent in
your cargo config.


```sh
RUSTFLAGS="--cfg tokio_unstable" AWS_PROFILE=<profile-name> RUST_LOG=debug cargo run --example cp -- --tokio-console ...
```
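
If you'd rather not set `RUSTFLAGS` on every invocation, the equivalent cargo config looks like this (a sketch to merge into the repo's `.cargo/config.toml`):

```toml
[build]
rustflags = ["--cfg", "tokio_unstable"]
```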


Follow the installation instructions for [tokio-console](https://github.com/tokio-rs/console), then run the
example while `tokio-console` is running.
6 changes: 0 additions & 6 deletions aws-s3-transfer-manager/.cargo/config.toml

This file was deleted.

3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/Cargo.toml
@@ -30,3 +30,6 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
jemallocator = "0.5.4"
151 changes: 92 additions & 59 deletions aws-s3-transfer-manager/examples/cp.rs
@@ -10,6 +10,9 @@ use std::{mem, time};
use aws_s3_transfer_manager::download::Downloader;

use aws_s3_transfer_manager::download::body::Body;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::types::{ConcurrencySetting, TargetPartSize};
use aws_s3_transfer_manager::upload::{UploadRequest, Uploader};
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_types::SdkConfig;
use bytes::Buf;
@@ -20,7 +23,14 @@ use tracing::{debug_span, Instrument};

type BoxError = Box<dyn Error + Send + Sync>;

const ONE_MEGABYTE: u64 = 1000 * 1000;

#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;

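// Use jemalloc as the global allocator for the example binary (MSVC targets fall back to the default system allocator).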
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Debug, Clone, clap::Parser)]
#[command(name = "cp")]
@@ -41,6 +51,10 @@ pub struct Args {
/// Part size to use, in bytes (default: 8 MiB)
#[arg(long, default_value_t = 8388608)]
part_size: u64,

/// Enable tokio console (requires RUSTFLAGS="--cfg tokio_unstable")
#[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)]
tokio_console: bool,
}

#[derive(Clone, Debug)]
@@ -102,29 +116,14 @@ fn invalid_arg(message: &str) -> ! {
.exit()
}

async fn do_download(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;

println!("warming up client...");
warmup(&config).await?;
println!("warming up complete");

let tm = Downloader::builder()
.sdk_config(config)
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.target_part_size(TargetPartSize::Explicit(args.part_size))
.build();

let (bucket, key) = args.source.expect_s3().parts();
@@ -146,51 +145,83 @@ async fn main() -> Result<(), Box<dyn Error>> {
.await?;

let elapsed = start.elapsed();
let obj_size_bytes = handle.object_meta.total_size();
let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64;
let obj_size_megabits = obj_size_megabytes * 8f64;

println!(
"downloaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}",
obj_size_megabits / elapsed.as_secs_f64(),
);

Ok(())
}

async fn do_upload(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;
warmup(&config).await?;

let tm = Uploader::builder()
.sdk_config(config)
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.multipart_threshold_part_size(TargetPartSize::Explicit(args.part_size))
.build();

let path = args.source.expect_local();
let file_meta = fs::metadata(path).await.expect("file metadata");

let stream = InputStream::from_path(path)?;
let (bucket, key) = args.dest.expect_s3().parts();

let request = UploadRequest::builder()
.bucket(bucket)
.key(key)
.body(stream)
.build()?;

println!("starting upload");
let start = time::Instant::now();

let handle = tm.upload(request).await?;
let _resp = handle.join().await?;
let elapsed = start.elapsed();

let obj_size_bytes = file_meta.len();
let obj_size_megabytes = obj_size_bytes as f64 / ONE_MEGABYTE as f64;
let obj_size_megabits = obj_size_megabytes * 8f64;

println!(
"downloaded {obj_size} bytes ({obj_size_mebibytes} MiB) in {elapsed:?}; MiB/s: {}",
obj_size_mebibytes / elapsed.as_secs_f64(),
"uploaded {obj_size_bytes} bytes ({obj_size_megabytes} MB) in {elapsed:?}; Mb/s: {}",
obj_size_megabits / elapsed.as_secs_f64()
);

Ok(())
}

// async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
// let b1: &[u8] = &mut [];
// let b2: &[u8] = &mut [];
// let b3: &[u8] = &mut [];
// let b4: &[u8] = &mut [];
// let b5: &[u8] = &mut [];
// let b6: &[u8] = &mut [];
// let b7: &[u8] = &mut [];
// let b8: &[u8] = &mut [];
// while let Some(chunk) = body.next().await {
// let mut chunk = chunk.unwrap();
// while chunk.has_remaining() {
// let mut dst = [
// IoSlice::new(b1),
// IoSlice::new(b2),
// IoSlice::new(b3),
// IoSlice::new(b4),
// IoSlice::new(b5),
// IoSlice::new(b6),
// IoSlice::new(b7),
// IoSlice::new(b8),
// ];
// let filled = chunk.chunks_vectored(&mut dst[..]);
// tracing::trace!("filled: {filled} io slices");
//
// let wc = dest.write_vectored(&dst[0..filled]).await?;
// tracing::trace!("wrote: {wc} bytes");
// chunk.advance(wc);
// }
// }
// Ok(())
// }

#[tokio::main]
async fn main() -> Result<(), BoxError> {
let args = dbg!(Args::parse());
if args.tokio_console {
console_subscriber::init();
} else {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_thread_ids(true)
.init();
}

use TransferUri::*;
match (&args.source, &args.dest) {
(Local(_), S3(_)) => do_upload(args).await?,
(Local(_), Local(_)) => invalid_arg("local to local transfer not supported"),
(S3(_), Local(_)) => do_download(args).await?,
(S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"),
}

Ok(())
}

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
@@ -205,8 +236,10 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Er
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
println!("warming up client...");
let s3 = aws_sdk_s3::Client::new(config);
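// A cheap request to prime credential resolution and connection setup so they don't skew the timed transfer (assumed intent of this warmup).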
s3.list_buckets().send().await?;
println!("warming up complete");
Ok(())
}