Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow up on PR#75 #78

Merged
merged 6 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::cmp;
pub(crate) mod loader;

/// Minimum upload part size in bytes
const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();
pub(crate) const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();

/// Configuration for a [`Client`](crate::client::Client)
#[derive(Debug, Clone)]
Expand Down
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/src/operation/upload/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) enum UploadType {
/// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload`
/// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload`
/// are logged, but do not affect the overall cancellation flow.
///
/// In either case, if the upload operation has already been completed before the handle is dropped
/// or aborted, the uploaded object will not be deleted from S3.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ impl UploadObjectsHandle {
/// Consume the handle and wait for the upload to complete
///
/// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method
/// will return an error if any of the spawned tasks encounter one. The other tasks will
/// be canceled, but their cancellations will not be reported as errors by this method;
/// will return the first error if any of the spawned tasks encounter one. The other tasks
/// will be canceled, but their cancellations will not be reported as errors by this method;
/// they will be logged as errors, instead.
///
/// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the
Expand Down
98 changes: 96 additions & 2 deletions aws-s3-transfer-manager/src/operation/upload_objects/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,27 @@ fn handle_failed_upload(

#[cfg(test)]
mod tests {
use aws_sdk_s3::operation::put_object::PutObjectOutput;
use std::sync::{Arc, Barrier};

use aws_sdk_s3::operation::{
abort_multipart_upload::AbortMultipartUploadOutput,
create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput,
upload_part::UploadPartOutput,
};
use aws_smithy_mocks_experimental::{mock, RuleMode};
use bytes::Bytes;
use test_common::mock_client_with_stubbed_http_client;

use crate::{
client::Handle,
config::MIN_MULTIPART_PART_SIZE_BYTES,
io::InputStream,
operation::upload_objects::{
worker::{upload_single_obj, UploadObjectJob},
UploadObjectsContext, UploadObjectsInputBuilder,
},
runtime::scheduler::Scheduler,
types::PartSize,
DEFAULT_CONCURRENCY,
};

Expand Down Expand Up @@ -700,7 +708,7 @@ mod tests {
}

#[tokio::test]
async fn test_cancel_single_upload() {
async fn test_cancel_single_upload_via_put_object() {
let bucket = "doesnotmatter";
let put_object = mock!(aws_sdk_s3::Client::put_object)
.match_requests(move |input| input.bucket() == Some(bucket))
Expand Down Expand Up @@ -730,4 +738,90 @@ mod tests {

assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_cancel_single_upload_via_multipart_upload() {
let bucket = "test-bucket";
let key = "test-key";
let upload_id: String = "test-upload-id".to_owned();

let wait_till_create_mpu = Arc::new(Barrier::new(2));
let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) =
tokio::sync::watch::channel(());
let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx);

let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({
let wait_till_create_mpu = wait_till_create_mpu.clone();
let upload_id = upload_id.clone();
move || {
// This ensures that a cancellation signal won't be sent until `create_multipart_upload`.
wait_till_create_mpu.wait();

// This increases the reliability of the test, ensuring that the cancellation signal has been sent
// and that `upload_single_obj` can now resume.
while !resume_upload_single_obj_rx.has_changed().unwrap() {
std::thread::sleep(std::time::Duration::from_millis(100));
}

CreateMultipartUploadOutput::builder()
.upload_id(upload_id.clone())
.build()
}
});
let upload_part = mock!(aws_sdk_s3::Client::upload_part)
.then_output(|| UploadPartOutput::builder().build());
let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
.match_requests({
let upload_id = upload_id.clone();
move |input| {
input.upload_id.as_ref() == Some(&upload_id)
&& input.bucket() == Some(bucket)
&& input.key() == Some(key)
}
})
.then_output(|| AbortMultipartUploadOutput::builder().build());
let s3_client = mock_client_with_stubbed_http_client!(
aws_sdk_s3,
RuleMode::MatchAny,
&[create_mpu, upload_part, abort_mpu]
);
let config = crate::Config::builder()
.set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES))
.client(s3_client)
.build();

let scheduler = Scheduler::new(DEFAULT_CONCURRENCY);

let handle = std::sync::Arc::new(Handle { config, scheduler });
let input = UploadObjectsInputBuilder::default()
.source("doesnotmatter")
.bucket(bucket)
.build()
.unwrap();

// specify the size of the contents so it triggers multipart upload
let contents = vec![0; MIN_MULTIPART_PART_SIZE_BYTES as usize];
let ctx = UploadObjectsContext::new(handle, input);
let job = UploadObjectJob {
object: InputStream::from(Bytes::copy_from_slice(contents.as_slice())),
key: key.to_owned(),
};

tokio::task::spawn({
let ctx = ctx.clone();
let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone();
async move {
wait_till_create_mpu.wait();
// The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared,
// which means that cancellation can now be triggered.
ctx.state.cancel_tx.send(true).unwrap();
// Tell `upload_single_obj` that it can now proceed.
resume_upload_single_obj_tx.send(()).unwrap();
}
});

let err = upload_single_obj(&ctx, job).await.unwrap_err();

assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
}
}
10 changes: 0 additions & 10 deletions aws-s3-transfer-manager/tests/upload_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOut
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
use aws_sdk_s3::operation::upload_part::UploadPartOutput;
use aws_smithy_mocks_experimental::{mock, RuleMode};
use aws_smithy_runtime::client::http::test_util::infallible_client_fn;
use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs;
use bytes::Bytes;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -116,15 +115,6 @@ fn mock_s3_client_for_multipart_upload() -> aws_sdk_s3::Client {
async fn test_many_uploads_no_deadlock() {
let (_guard, _rx) = capture_test_logs();
let client = mock_s3_client_for_multipart_upload();
let client = aws_sdk_s3::Client::from_conf(
client
.config()
.to_builder()
.http_client(infallible_client_fn(|_req| {
http_02x::Response::builder().status(200).body("").unwrap()
}))
.build(),
);
let config = aws_s3_transfer_manager::Config::builder()
.client(client)
.build();
Expand Down