Add support for uploading files from a directory to S3 (#67)
* Add basic functionality for uploading all files

* Address low-hanging fruit in error handling

* Handle errors properly in the `worker` module

* Move directory traversal I/O to a special thread pool

* Add unit tests for functions in the `worker` module

* Add integration tests for uploading multiple objects

* Run `cargo fmt`

* Fix clippy warnings

* Add license to `test-common`

* Specify crate version for `test-common`

* Separate unit tests for `worker` module between Unix and Windows

* Add missing `use` statement for `derive_object_key`

* Update aws-s3-transfer-manager/test-common/src/lib.rs

Co-authored-by: Waqar Ahmed Khan <[email protected]>

* Fix uploading multiple objects in the example

This commit addresses
#67 (comment)
#67 (comment)
#67 (comment)

* Avoid `unwrap` in the example

This commit addresses
#67 (comment)
#67 (comment)

* Compare `delim` against `MAIN_SEPARATOR_STR`

This commit addresses #67 (comment)

* Rename channel ends `work_*` to `list_directory_*`

This commit addresses #67 (comment)

* Add TODO to replace `&Option<String>` with `Option<&str>`

This commit addresses #67 (comment)

* Add integration test for checking `UploadInput` in failed transfer

* Also verify the object key in `failed_transfers`

* Use `.into()` from an error in constructing `InputStream`

This commit addresses #67 (comment)

* Log when `upload_objects` receives an error from `list_directory`

This commit addresses #67 (comment)

* Add comment to `DEFAULT_DELIMITER`

This commit addresses #67 (comment)

* Add TODO for the use of the `blocking` crate

* Remove `Option` from list of failed transfers

This commit addresses #67 (comment)

* Add TODO for implementing more sophisticated error handling

This commit addresses #67 (comment)

* Add TODO to consider moving `input` out of the `State` struct

This commit responds to #67 (comment)

* Add comments on the upper `size_hint` of `InputStream`

This commit addresses #67 (comment)

* Add `impl From` to convert state to output

This commit addresses #67 (comment)

* Map `walkdir::Error` to `error::Error` accordingly

This commit addresses #67 (comment)

---------

Co-authored-by: Waqar Ahmed Khan <[email protected]>
ysaito1001 and waahm7 authored Nov 6, 2024
1 parent 790ead4 commit c88e07f
Showing 22 changed files with 1,164 additions and 134 deletions.
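Taken together, the changes introduce an `upload_objects` operation on the transfer manager client. A minimal usage sketch, assembled from the builder calls visible in the `cp.rs` diff below (the bucket, prefix, and source path are placeholders, and `failed_transfers()` on the upload output is assumed to mirror the download output):

```rust
use aws_s3_transfer_manager::error::Error;

// Upload every file under ./data to s3://my-bucket/backup/... (placeholders).
async fn upload_dir(client: &aws_s3_transfer_manager::Client) -> Result<(), Error> {
    let handle = client
        .upload_objects()
        .source("./data")
        .bucket("my-bucket")
        .key_prefix("backup")
        .recursive(true)
        .send()
        .await?;

    // Wait for the per-object uploads to finish and inspect the aggregate result.
    let output = handle.join().await?;
    println!(
        "uploaded {} objects ({} bytes); {} failed",
        output.objects_uploaded(),
        output.total_bytes_transferred(),
        output.failed_transfers().len()
    );
    Ok(())
}
```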
10 changes: 6 additions & 4 deletions aws-s3-transfer-manager/Cargo.toml
@@ -18,13 +18,15 @@ aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
blocking = "1.6.0"
bytes = "1"
futures-util = "0.3.30"
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.1", features = ["limit", "retry", "util", "hedge", "buffer"] }
tracing = "0.1"
walkdir = "2"

[dev-dependencies]
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
@@ -34,13 +36,13 @@ clap = { version = "4.5.7", default-features = false, features = ["derive", "std
console-subscriber = "0.4.0"
http-02x = { package = "http", version = "0.2.9" }
http-body-1x = { package = "http-body", version = "1" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.12.0"
fastrand = "2.1.1"
futures-test = "0.3.30"
walkdir = "2"
tower-test = "0.4.0"
tempfile = "3.12.0"
test-common = { path = "./test-common", version = "0.1.0" }
tokio-test = "0.4.4"
tower-test = "0.4.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
jemallocator = "0.5.4"
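The new `blocking` and `walkdir` dependencies back the "Move directory traversal I/O to a special thread pool" change listed above. A minimal sketch of that pattern, assuming the synchronous walk is offloaded with `blocking::unblock` (the actual worker module is not shown in this excerpt):

```rust
use blocking::unblock;
use std::path::PathBuf;
use walkdir::WalkDir;

// Run the synchronous directory walk on the `blocking` crate's thread pool so it
// does not stall the async runtime, then return the discovered files.
async fn list_files(root: PathBuf) -> Vec<walkdir::DirEntry> {
    unblock(move || {
        WalkDir::new(root)
            .into_iter()
            .filter_map(Result::ok)
            .filter(|entry| entry.file_type().is_file())
            .collect()
    })
    .await
}
```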
41 changes: 37 additions & 4 deletions aws-s3-transfer-manager/examples/cp.rs
@@ -189,11 +189,40 @@ async fn do_download(args: Args) -> Result<(), BoxError> {
Ok(())
}

async fn do_upload(args: Args) -> Result<(), BoxError> {
if args.recursive {
unimplemented!("recursive upload not supported yet")
}
async fn do_recursive_upload(
args: Args,
tm: aws_s3_transfer_manager::Client,
) -> Result<(), BoxError> {
let Args { source, dest, .. } = args;
let source_dir = source.expect_local();
let (bucket, key_prefix) = dest.expect_s3().parts();

let start = time::Instant::now();
let handle = tm
.upload_objects()
.source(source_dir)
.bucket(bucket)
.key_prefix(key_prefix)
.recursive(true)
.send()
.await?;

let output = handle.join().await?;
tracing::info!("recursive upload output: {output:?}");

let elapsed = start.elapsed();
let transfer_size_bytes = output.total_bytes_transferred();
let throughput = Throughput::new(transfer_size_bytes, elapsed);

println!(
"uploaded {} objects totalling {transfer_size_bytes} bytes ({}) in {elapsed:?}; {throughput}",
output.objects_uploaded(),
ByteUnit::display(transfer_size_bytes)
);
Ok(())
}

async fn do_upload(args: Args) -> Result<(), BoxError> {
let (bucket, key) = args.dest.expect_s3().parts();

let tm_config = aws_s3_transfer_manager::from_env()
@@ -206,6 +235,10 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {

let tm = aws_s3_transfer_manager::Client::new(tm_config);

if args.recursive {
return do_recursive_upload(args, tm).await;
}

let path = args.source.expect_local();
let file_meta = fs::metadata(path).await.expect("file metadata");

4 changes: 2 additions & 2 deletions aws-s3-transfer-manager/src/client.rs
@@ -193,12 +193,12 @@ impl Client {
/// Examples
/// ```no_run
/// use std::path::Path;
/// use aws_s3_transfer_manager::operation::upload_objects::UploadObjectsError;
/// use aws_s3_transfer_manager::error::Error;
///
/// async fn upload_directory(
/// client: &aws_s3_transfer_manager::Client,
/// source: &Path,
/// ) -> Result<(), UploadObjectsError> {
/// ) -> Result<(), Error> {
///
/// let handle = client
/// .upload_objects()
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/error.rs
@@ -21,7 +21,7 @@ pub struct Error {
}

/// General categories of transfer errors.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ErrorKind {
/// Operation input validation issues
9 changes: 8 additions & 1 deletion aws-s3-transfer-manager/src/io/path_body.rs
@@ -41,6 +41,7 @@ pub struct PathBodyBuilder {
path: Option<PathBuf>,
length: Option<u64>,
offset: Option<u64>,
metadata: Option<fs::Metadata>,
}

impl PathBodyBuilder {
@@ -81,6 +82,12 @@ impl PathBodyBuilder {
self
}

/// Provide `metadata` for `self.path`, potentially reducing system calls during `build` if pre-set.
pub(crate) fn metadata(mut self, metadata: fs::Metadata) -> Self {
self.metadata = Some(metadata);
self
}

/// Returns a [`InputStream`] from this builder.
pub fn build(self) -> Result<InputStream, Error> {
let path = self.path.expect("path set");
@@ -89,7 +96,7 @@
let length = match self.length {
None => {
// TODO(aws-sdk-rust#1159, design) - evaluate if we want build() to be async and to use tokio for stat() call (bytestream FsBuilder::build() is async)
let metadata = fs::metadata(path.clone())?;
let metadata = self.metadata.unwrap_or(fs::metadata(path.clone())?);
let file_size = metadata.len();

if offset >= file_size {
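The new crate-internal `metadata` setter lets the directory walker hand its already-fetched `std::fs::Metadata` to the body builder. A rough sketch of the intent, assuming the builder is obtained via `InputStream::read_from()` in the style of the SDK's `ByteStream`; since the setter is `pub(crate)`, this only applies inside the crate:

```rust
use std::fs::Metadata;
use std::path::Path;

use crate::error;
use crate::io::InputStream;

// Crate-internal sketch: pass the metadata captured during the directory walk so
// that build() can skip its own fs::metadata call for the common case.
fn stream_for_file(path: &Path, metadata: Metadata) -> Result<InputStream, error::Error> {
    InputStream::read_from()
        .path(path)
        .metadata(metadata)
        .build()
}
```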
21 changes: 20 additions & 1 deletion aws-s3-transfer-manager/src/operation.rs
@@ -3,7 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::Arc;
use std::{path::Path, sync::Arc};

use tokio::fs;

use crate::error;

/// Types for single object upload operation
pub mod upload;
@@ -17,6 +21,9 @@ pub mod download_objects;
/// Types for multiple object upload operation
pub mod upload_objects;

// The default delimiter of the S3 object key
pub(crate) const DEFAULT_DELIMITER: &str = "/";

/// Container for maintaining context required to carry out a single operation/transfer.
///
/// `State` is whatever additional operation specific state is required for the operation.
@@ -41,3 +48,15 @@ impl<State> Clone for TransferContext<State> {
}
}
}

pub(crate) async fn validate_target_is_dir(path: &Path) -> Result<(), error::Error> {
let meta = fs::metadata(path).await?;

if !meta.is_dir() {
return Err(error::invalid_input(format!(
"target is not a directory: {path:?}"
)));
}

Ok(())
}
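For context on `DEFAULT_DELIMITER`: the commit messages above mention a `derive_object_key` helper that compares the platform separator against the delimiter. The following is a hypothetical reconstruction for illustration only; the real helper lives in the upload worker and, per one of the commit messages, currently takes `&Option<String>` rather than `Option<&str>`:

```rust
use std::path::MAIN_SEPARATOR_STR;

// Hypothetical reconstruction: turn a path relative to the source directory into
// an S3 object key, normalizing the platform separator to the S3 delimiter and
// prepending the optional key prefix.
fn derive_object_key(relative_path: &str, key_prefix: Option<&str>) -> String {
    let key = if MAIN_SEPARATOR_STR == DEFAULT_DELIMITER {
        relative_path.to_owned()
    } else {
        relative_path.replace(MAIN_SEPARATOR_STR, DEFAULT_DELIMITER)
    };
    match key_prefix {
        Some(prefix) => format!(
            "{}{}{}",
            prefix.trim_end_matches(DEFAULT_DELIMITER),
            DEFAULT_DELIMITER,
            key
        ),
        None => key,
    }
}
```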
27 changes: 8 additions & 19 deletions aws-s3-transfer-manager/src/operation/download_objects.rs
@@ -19,15 +19,14 @@ pub use handle::DownloadObjectsHandle;
mod list_objects;
mod worker;

use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use tokio::{fs, task::JoinSet};
use tokio::task::JoinSet;
use tracing::Instrument;

use crate::{error, types::FailedDownloadTransfer};
use crate::types::FailedDownloadTransfer;

use super::TransferContext;
use super::{validate_target_is_dir, TransferContext};

/// Operation struct for downloading multiple objects from Amazon S3
#[derive(Clone, Default, Debug)]
@@ -41,7 +40,7 @@ impl DownloadObjects {
) -> Result<DownloadObjectsHandle, crate::error::Error> {
// validate existence of destination and return error if it's not a directory
let destination = input.destination().expect("destination set");
validate_destination(destination).await?;
validate_target_is_dir(destination).await?;

// create span to serve as parent of spawned child tasks
let parent_span_for_tasks = tracing::debug_span!(
@@ -76,23 +75,13 @@
}
}

async fn validate_destination(path: &Path) -> Result<(), error::Error> {
let meta = fs::metadata(path).await?;

if !meta.is_dir() {
return Err(error::invalid_input(format!(
"destination is not a directory: {path:?}"
)));
}

Ok(())
}

/// DownloadObjects operation specific state
#[derive(Debug)]
pub(crate) struct DownloadObjectsState {
// TODO - Determine if `input` should be separated from this struct
// https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603
input: DownloadObjectsInput,
failed_downloads: Mutex<Option<Vec<FailedDownloadTransfer>>>,
failed_downloads: Mutex<Vec<FailedDownloadTransfer>>,
successful_downloads: AtomicU64,
total_bytes_transferred: AtomicU64,
}
@@ -103,7 +92,7 @@ impl DownloadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: DownloadObjectsInput) -> Self {
let state = Arc::new(DownloadObjectsState {
input,
failed_downloads: Mutex::new(None),
failed_downloads: Mutex::new(Vec::new()),
successful_downloads: AtomicU64::default(),
total_bytes_transferred: AtomicU64::default(),
});
19 changes: 2 additions & 17 deletions aws-s3-transfer-manager/src/operation/download_objects/handle.rs
@@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::atomic::Ordering;

use tokio::task;

use super::{DownloadObjectsContext, DownloadObjectsOutput};
@@ -23,25 +21,12 @@ impl DownloadObjectsHandle {
/// Consume the handle and wait for download transfer to complete
#[tracing::instrument(skip_all, level = "debug", name = "join-download-objects")]
pub async fn join(mut self) -> Result<DownloadObjectsOutput, crate::error::Error> {
// TODO - Consider implementing more sophisticated error handling such as canceling in-progress transfers
// join all tasks
while let Some(join_result) = self.tasks.join_next().await {
join_result??;
}

let failed_downloads = self.ctx.state.failed_downloads.lock().unwrap().take();
let successful_downloads = self.ctx.state.successful_downloads.load(Ordering::SeqCst);
let total_bytes_transferred = self
.ctx
.state
.total_bytes_transferred
.load(Ordering::SeqCst);

let output = DownloadObjectsOutput::builder()
.objects_downloaded(successful_downloads)
.set_failed_transfers(failed_downloads)
.total_bytes_transferred(total_bytes_transferred)
.build();

Ok(output)
Ok(DownloadObjectsOutput::from(self.ctx.state.as_ref()))
}
}
39 changes: 25 additions & 14 deletions aws-s3-transfer-manager/src/operation/download_objects/output.rs
@@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

use super::DownloadObjectsState;
use crate::types::FailedDownloadTransfer;
use std::sync::atomic::Ordering;

/// Output type for downloading multiple objects
#[non_exhaustive]
@@ -13,7 +15,7 @@ pub struct DownloadObjectsOutput {
pub objects_downloaded: u64,

/// A list of failed object transfers
pub failed_transfers: Option<Vec<FailedDownloadTransfer>>,
pub failed_transfers: Vec<FailedDownloadTransfer>,

// FIXME - likely remove when progress is implemented?
/// Total number of bytes transferred
@@ -32,11 +34,8 @@ impl DownloadObjectsOutput {
}

/// A slice of failed object transfers
///
/// If no value was sent for this field, a default will be set. If you want to determine if no value was
/// set, use `.failed_transfers.is_none()`
pub fn failed_transfers(&self) -> &[FailedDownloadTransfer] {
self.failed_transfers.as_deref().unwrap_or_default()
self.failed_transfers.as_slice()
}

/// The number of bytes successfully transferred (downloaded)
@@ -45,12 +44,26 @@
}
}

impl From<&DownloadObjectsState> for DownloadObjectsOutput {
fn from(state: &DownloadObjectsState) -> Self {
let failed_downloads = std::mem::take(&mut *state.failed_downloads.lock().unwrap());
let successful_downloads = state.successful_downloads.load(Ordering::SeqCst);
let total_bytes_transferred = state.total_bytes_transferred.load(Ordering::SeqCst);

DownloadObjectsOutput::builder()
.objects_downloaded(successful_downloads)
.set_failed_transfers(failed_downloads)
.total_bytes_transferred(total_bytes_transferred)
.build()
}
}

/// A builder for [`DownloadObjectsOutput`](crate::operation::download_objects::DownloadObjectsOutput).
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct DownloadObjectsOutputBuilder {
pub(crate) objects_downloaded: u64,
pub(crate) failed_transfers: Option<Vec<FailedDownloadTransfer>>,
pub(crate) failed_transfers: Vec<FailedDownloadTransfer>,
pub(crate) total_bytes_transferred: u64,
}

@@ -71,21 +84,19 @@ impl DownloadObjectsOutputBuilder {
/// To override the contents of this collection use
/// [`set_failed_transfers`](Self::set_failed_transfers)
pub fn failed_transfers(mut self, input: FailedDownloadTransfer) -> Self {
let mut v = self.failed_transfers.unwrap_or_default();
v.push(input);
self.failed_transfers = Some(v);
self.failed_transfers.push(input);
self
}

/// A list of failed object transfers
pub fn set_failed_transfers(mut self, input: Option<Vec<FailedDownloadTransfer>>) -> Self {
/// Set a list of failed object transfers
pub fn set_failed_transfers(mut self, input: Vec<FailedDownloadTransfer>) -> Self {
self.failed_transfers = input;
self
}

/// A list of failed object transfers
pub fn get_failed_transfers(&self) -> &Option<Vec<FailedDownloadTransfer>> {
&self.failed_transfers
/// Get a list of failed object transfers
pub fn get_failed_transfers(&self) -> &[FailedDownloadTransfer] {
self.failed_transfers.as_slice()
}

/// The number of bytes successfully transferred (downloaded)
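For callers, dropping the `Option` around `failed_transfers` simplifies result handling: an empty slice now means every object downloaded. A short sketch using the publicly re-exported handle (module paths assumed from the `pub use` statements in this diff):

```rust
use aws_s3_transfer_manager::error::Error;
use aws_s3_transfer_manager::operation::download_objects::DownloadObjectsHandle;

// Join the transfer and report the aggregate result; no Option to unwrap anymore.
async fn report(handle: DownloadObjectsHandle) -> Result<(), Error> {
    let output = handle.join().await?;
    if output.failed_transfers().is_empty() {
        println!("all {} objects downloaded", output.objects_downloaded());
    } else {
        eprintln!("{} of the transfers failed", output.failed_transfers().len());
    }
    Ok(())
}
```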