Merge aws-s3-transfer-manager commits from smithy-rs
aajtodd committed Jul 16, 2024
2 parents 8cfa03a + 5b76c51 commit af28166
Showing 26 changed files with 4,631 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
target/
Cargo.lock
6 changes: 6 additions & 0 deletions aws-s3-transfer-manager/.cargo/config.toml
@@ -0,0 +1,6 @@
[build]
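# console-subscriber/tokio-console require the tokio_unstable cfg to expose task instrumentation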
rustflags = ["--cfg", "tokio_unstable"]

[profile.profiling]
inherits = "release"
debug = true
2 changes: 2 additions & 0 deletions aws-s3-transfer-manager/.gitignore
@@ -0,0 +1,2 @@
flamegraph.svg
profile.json
32 changes: 32 additions & 0 deletions aws-s3-transfer-manager/Cargo.toml
@@ -0,0 +1,32 @@
[package]
name = "aws-s3-transfer-manager"
version = "0.1.0"
edition = "2021"
authors = ["AWS Rust SDK Team <[email protected]>", "Aaron Todd <[email protected]>"]
description = "S3 Transfer Manager"
license = "Apache-2.0"
repository = "https://github.com/smithy-lang/smithy-rs"
publish = false

[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.81"
aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-http = "0.60.9"
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.0"
aws-types = "1.3.3"
bytes = "1"
# FIXME - upgrade to hyper 1.x
hyper = { version = "0.14.29", features = ["client"] }
thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tracing = "0.1"

[dev-dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-smithy-mocks-experimental = "0.2.1"
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"
57 changes: 57 additions & 0 deletions aws-s3-transfer-manager/README.md
@@ -0,0 +1,57 @@
# AWS S3 Transfer Manager

A high-performance Amazon S3 client.
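
A minimal download sketch, adapted from `examples/cp.rs` in this crate (the bucket and key are placeholders, and the API is experimental, so signatures may change):

```rust
use aws_s3_transfer_manager::download::Downloader;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = aws_config::from_env().load().await;

    // Build the download client: 8 concurrent part downloads, 8 MiB parts.
    let tm = Downloader::builder()
        .sdk_config(config)
        .concurrency(8)
        .target_part_size(8 * 1024 * 1024)
        .build();

    // Describe the object to fetch, then stream it chunk by chunk.
    let input = GetObjectInputBuilder::default()
        .bucket("<my-bucket>")
        .key("<my-key>");
    let mut handle = tm.download(input.into()).await?;
    while let Some(chunk) = handle.body.next().await {
        for segment in chunk.unwrap().into_segments() {
            // Process the segment here (e.g. write it to a file).
            let _bytes: &[u8] = segment.as_ref();
        }
    }
    Ok(())
}
```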


## Development

**Run all tests**

```sh
cargo test --all-features
```

**Run individual test**

```sh
cargo test --lib download::worker::tests::test_distribute_work
```

### Examples

NOTE: You can use the `profiling` profile from `.cargo/config.toml` to build any example in release mode with debug info.
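
For example:

```sh
cargo build --profile profiling --example cp
```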

**Copy**

See all options:
```sh
cargo run --example cp -- -h
```

**Download a file from S3**

```sh
AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<my-bucket>/<my-key> /local/path/<filename>
```

NOTE: To run in release mode, add `--release`/`-r` to the command; see `cargo run -h`.
NOTE: `trace` may be too verbose; to see only this library's logs, set `RUST_LOG=aws_s3_transfer_manager=trace`.
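
For example, a release-mode download that logs only this library's traces:

```sh
RUST_LOG=aws_s3_transfer_manager=trace cargo run --release --example cp s3://<my-bucket>/<my-key> /local/path/<filename>
```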

#### Flamegraphs

See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for more prerequisites and installation information.

Generate a flamegraph (output goes to `flamegraph.svg` by default):

```sh
sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flamegraph --profile profiling --example cp -- s3://test-sdk-rust-aaron/mb-128.dat /tmp/mb-128.dat
```

#### Using tokio-console

Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber), which allows you to run them with
[tokio-console](https://github.com/tokio-rs/console) to help debug task execution.


Follow the installation instructions for [tokio-console](https://github.com/tokio-rs/console), then launch
`tokio-console` while an example is running.
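
A typical session uses two terminals; `tokio-console` connects to the default `console-subscriber` endpoint (`127.0.0.1:6669`):

```sh
# terminal 1: run an example (the tokio_unstable cfg is already set in .cargo/config.toml)
AWS_PROFILE=<profile-name> cargo run --example cp s3://<my-bucket>/<my-key> /local/path/<filename>

# terminal 2: attach the console UI
tokio-console
```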
212 changes: 212 additions & 0 deletions aws-s3-transfer-manager/examples/cp.rs
@@ -0,0 +1,212 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use std::error::Error;
use std::path::PathBuf;
use std::str::FromStr;
use std::{mem, time};

use aws_s3_transfer_manager::download::Downloader;

use aws_s3_transfer_manager::download::body::Body;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_types::SdkConfig;
use bytes::Buf;
use clap::{CommandFactory, Parser};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::{debug_span, Instrument};

type BoxError = Box<dyn Error + Send + Sync>;

const ONE_MEBIBYTE: u64 = 1024 * 1024;

#[derive(Debug, Clone, clap::Parser)]
#[command(name = "cp")]
#[command(about = "Copies a local file or S3 object to another location locally or in S3.")]
pub struct Args {
/// Source to copy from <S3Uri | Local>
#[arg(required = true)]
source: TransferUri,

/// Destination to copy to <S3Uri | Local>
#[arg(required = true)]
dest: TransferUri,

/// Number of concurrent uploads/downloads to perform.
#[arg(long, default_value_t = 8)]
concurrency: usize,

/// Part size to use
#[arg(long, default_value_t = 8388608)]
part_size: u64,
}

#[derive(Clone, Debug)]
enum TransferUri {
/// Local filesystem source/destination
Local(PathBuf),

/// S3 source/destination
S3(S3Uri),
}

impl TransferUri {
fn expect_s3(&self) -> &S3Uri {
match self {
TransferUri::S3(s3_uri) => s3_uri,
_ => panic!("expected S3Uri"),
}
}

fn expect_local(&self) -> &PathBuf {
match self {
TransferUri::Local(path) => path,
_ => panic!("expected Local"),
}
}
}

impl FromStr for TransferUri {
type Err = BoxError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let uri = if s.starts_with("s3://") {
TransferUri::S3(S3Uri(s.to_owned()))
} else {
let path = PathBuf::from_str(s).unwrap();
TransferUri::Local(path)
};
Ok(uri)
}
}

#[derive(Clone, Debug)]
struct S3Uri(String);

impl S3Uri {
/// Split the URI into its component parts `(bucket, key)`
fn parts(&self) -> (&str, &str) {
self.0
.strip_prefix("s3://")
.expect("valid s3 uri prefix")
.split_once('/')
.expect("invalid s3 uri, missing '/' between bucket and key")
}
}

fn invalid_arg(message: &str) -> ! {
Args::command()
.error(clap::error::ErrorKind::InvalidValue, message)
.exit()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
console_subscriber::init();
let args = dbg!(Args::parse());

use TransferUri::*;
match (&args.source, &args.dest) {
(Local(_), S3(_)) => todo!("upload not implemented yet"),
(Local(_), Local(_)) => invalid_arg("local to local transfer not supported"),
(S3(_), Local(_)) => (),
(S3(_), S3(_)) => invalid_arg("s3 to s3 transfer not supported"),
}

let config = aws_config::from_env().load().await;

println!("warming up client...");
warmup(&config).await?;
println!("warming up complete");

let tm = Downloader::builder()
.sdk_config(config)
.concurrency(args.concurrency)
.target_part_size(args.part_size)
.build();

let (bucket, key) = args.source.expect_s3().parts();
let input = GetObjectInputBuilder::default().bucket(bucket).key(key);

let dest = fs::File::create(args.dest.expect_local()).await?;
println!("dest file opened, starting download");

let start = time::Instant::now();

// TODO(aws-sdk-rust#1159) - rewrite this less naively;
// likely abstract this into performant utils for single-file download. The higher-level
// TM will manage its own thread pool for filesystem work.
let mut handle = tm.download(input.into()).await?;
let body = mem::replace(&mut handle.body, Body::empty());

write_body(body, dest)
.instrument(debug_span!("write-output"))
.await?;

let elapsed = start.elapsed();
let obj_size = handle.object_meta.total_size();
let obj_size_mebibytes = obj_size as f64 / ONE_MEBIBYTE as f64;

println!(
"downloaded {obj_size} bytes ({obj_size_mebibytes} MiB) in {elapsed:?}; MiB/s: {}",
obj_size_mebibytes / elapsed.as_secs_f64(),
);

Ok(())
}

// async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
// let b1: &[u8] = &mut [];
// let b2: &[u8] = &mut [];
// let b3: &[u8] = &mut [];
// let b4: &[u8] = &mut [];
// let b5: &[u8] = &mut [];
// let b6: &[u8] = &mut [];
// let b7: &[u8] = &mut [];
// let b8: &[u8] = &mut [];
// while let Some(chunk) = body.next().await {
// let mut chunk = chunk.unwrap();
// while chunk.has_remaining() {
// let mut dst = [
// IoSlice::new(b1),
// IoSlice::new(b2),
// IoSlice::new(b3),
// IoSlice::new(b4),
// IoSlice::new(b5),
// IoSlice::new(b6),
// IoSlice::new(b7),
// IoSlice::new(b8),
// ];
// let filled = chunk.chunks_vectored(&mut dst[..]);
// tracing::trace!("filled: {filled} io slices");
//
// let wc = dest.write_vectored(&dst[0..filled]).await?;
// tracing::trace!("wrote: {wc} bytes");
// chunk.advance(wc);
// }
// }
// Ok(())
// }

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
let mut segment_cnt = 0;
for segment in chunk.into_segments() {
dest.write_all(segment.as_ref()).await?;
tracing::trace!("wrote segment size: {}", segment.remaining());
segment_cnt += 1;
}
tracing::trace!("chunk had {segment_cnt} segments");
}
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), Box<dyn Error>> {
let s3 = aws_sdk_s3::Client::new(config);
s3.list_buckets().send().await?;
Ok(())
}
8 changes: 8 additions & 0 deletions aws-s3-transfer-manager/external-types.toml
@@ -0,0 +1,8 @@
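# External types allowed in this crate's public API (checked by cargo-check-external-types)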
allowed_external_types = [
"aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder",
"aws_sdk_s3::operation::get_object::_get_object_input::GetObjectInputBuilder",
"aws_sdk_s3::operation::get_object::GetObjectError",
"aws_sdk_s3::operation::head_object::HeadObjectError",
"aws_smithy_runtime_api::*",
"aws_smithy_types::*",
]