From 061f9745b4e2432489e9de1481f786b128dd2648 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 5 Dec 2024 13:18:49 -0600 Subject: [PATCH] Follow up on PR#75 (#78) * Incorporate post-merge review feedback from PR#75 This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1853090148 https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1856992994 * Add test for canceling upload object via MPU This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1857001515 * Fix memory leak in test detected by `LeakSanitizer` * Simulate the flow of CreateMPU -> upload cancellation -> AbortMPU This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/78#discussion_r1863896824 * Avoid a single letter variable name --- aws-s3-transfer-manager/src/config.rs | 2 +- .../src/operation/upload/handle.rs | 3 + .../src/operation/upload_objects/handle.rs | 4 +- .../src/operation/upload_objects/worker.rs | 98 ++++++++++++++++++- aws-s3-transfer-manager/tests/upload_test.rs | 10 -- 5 files changed, 102 insertions(+), 15 deletions(-) diff --git a/aws-s3-transfer-manager/src/config.rs b/aws-s3-transfer-manager/src/config.rs index a728959a..66b9cd8c 100644 --- a/aws-s3-transfer-manager/src/config.rs +++ b/aws-s3-transfer-manager/src/config.rs @@ -10,7 +10,7 @@ use std::cmp; pub(crate) mod loader; /// Minimum upload part size in bytes -const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64(); +pub(crate) const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64(); /// Configuration for a [`Client`](crate::client::Client) #[derive(Debug, Clone)] diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 74d2d02f..1bda28ed 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -44,6 +44,9 @@ pub(crate) enum UploadType { /// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload` /// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload` /// are logged, but do not affect the overall cancellation flow. +/// +/// In either case, if the upload operation has already been completed before the handle is dropped +/// or aborted, the uploaded object will not be deleted from S3. #[derive(Debug)] #[non_exhaustive] pub struct UploadHandle { diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs b/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs index 05a0f6db..319f9f5e 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs @@ -44,8 +44,8 @@ impl UploadObjectsHandle { /// Consume the handle and wait for the upload to complete /// /// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method - /// will return an error if any of the spawned tasks encounter one. The other tasks will - /// be canceled, but their cancellations will not be reported as errors by this method; + /// will return the first error if any of the spawned tasks encounter one. The other tasks + /// will be canceled, but their cancellations will not be reported as errors by this method; /// they will be logged as errors, instead. /// /// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index ee2a4488..10d75bc7 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -323,19 +323,27 @@ fn handle_failed_upload( #[cfg(test)] mod tests { - use aws_sdk_s3::operation::put_object::PutObjectOutput; + use std::sync::{Arc, Barrier}; + + use aws_sdk_s3::operation::{ + abort_multipart_upload::AbortMultipartUploadOutput, + create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput, + upload_part::UploadPartOutput, + }; use aws_smithy_mocks_experimental::{mock, RuleMode}; use bytes::Bytes; use test_common::mock_client_with_stubbed_http_client; use crate::{ client::Handle, + config::MIN_MULTIPART_PART_SIZE_BYTES, io::InputStream, operation::upload_objects::{ worker::{upload_single_obj, UploadObjectJob}, UploadObjectsContext, UploadObjectsInputBuilder, }, runtime::scheduler::Scheduler, + types::PartSize, DEFAULT_CONCURRENCY, }; @@ -700,7 +708,7 @@ mod tests { } #[tokio::test] - async fn test_cancel_single_upload() { + async fn test_cancel_single_upload_via_put_object() { let bucket = "doesnotmatter"; let put_object = mock!(aws_sdk_s3::Client::put_object) .match_requests(move |input| input.bucket() == Some(bucket)) @@ -730,4 +738,90 @@ mod tests { assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_cancel_single_upload_via_multipart_upload() { + let bucket = "test-bucket"; + let key = "test-key"; + let upload_id: String = "test-upload-id".to_owned(); + + let wait_till_create_mpu = Arc::new(Barrier::new(2)); + let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) = + tokio::sync::watch::channel(()); + let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx); + + let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({ + let wait_till_create_mpu = wait_till_create_mpu.clone(); + let upload_id = upload_id.clone(); + move || { + // This ensures that a cancellation signal won't be sent until `create_multipart_upload`. + wait_till_create_mpu.wait(); + + // This increases the reliability of the test, ensuring that the cancellation signal has been sent + // and that `upload_single_obj` can now resume. + while !resume_upload_single_obj_rx.has_changed().unwrap() { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + CreateMultipartUploadOutput::builder() + .upload_id(upload_id.clone()) + .build() + } + }); + let upload_part = mock!(aws_sdk_s3::Client::upload_part) + .then_output(|| UploadPartOutput::builder().build()); + let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload) + .match_requests({ + let upload_id = upload_id.clone(); + move |input| { + input.upload_id.as_ref() == Some(&upload_id) + && input.bucket() == Some(bucket) + && input.key() == Some(key) + } + }) + .then_output(|| AbortMultipartUploadOutput::builder().build()); + let s3_client = mock_client_with_stubbed_http_client!( + aws_sdk_s3, + RuleMode::MatchAny, + &[create_mpu, upload_part, abort_mpu] + ); + let config = crate::Config::builder() + .set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES)) + .client(s3_client) + .build(); + + let scheduler = Scheduler::new(DEFAULT_CONCURRENCY); + + let handle = std::sync::Arc::new(Handle { config, scheduler }); + let input = UploadObjectsInputBuilder::default() + .source("doesnotmatter") + .bucket(bucket) + .build() + .unwrap(); + + // specify the size of the contents so it triggers multipart upload + let contents = vec![0; MIN_MULTIPART_PART_SIZE_BYTES as usize]; + let ctx = UploadObjectsContext::new(handle, input); + let job = UploadObjectJob { + object: InputStream::from(Bytes::copy_from_slice(contents.as_slice())), + key: key.to_owned(), + }; + + tokio::task::spawn({ + let ctx = ctx.clone(); + let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone(); + async move { + wait_till_create_mpu.wait(); + // The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared, + // which means that cancellation can now be triggered. + ctx.state.cancel_tx.send(true).unwrap(); + // Tell `upload_single_obj` that it can now proceed. + resume_upload_single_obj_tx.send(()).unwrap(); + } + }); + + let err = upload_single_obj(&ctx, job).await.unwrap_err(); + + assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); + } } diff --git a/aws-s3-transfer-manager/tests/upload_test.rs b/aws-s3-transfer-manager/tests/upload_test.rs index 2450ed8e..ae65797c 100644 --- a/aws-s3-transfer-manager/tests/upload_test.rs +++ b/aws-s3-transfer-manager/tests/upload_test.rs @@ -12,7 +12,6 @@ use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOut use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput; use aws_sdk_s3::operation::upload_part::UploadPartOutput; use aws_smithy_mocks_experimental::{mock, RuleMode}; -use aws_smithy_runtime::client::http::test_util::infallible_client_fn; use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; use bytes::Bytes; use pin_project_lite::pin_project; @@ -116,15 +115,6 @@ fn mock_s3_client_for_multipart_upload() -> aws_sdk_s3::Client { async fn test_many_uploads_no_deadlock() { let (_guard, _rx) = capture_test_logs(); let client = mock_s3_client_for_multipart_upload(); - let client = aws_sdk_s3::Client::from_conf( - client - .config() - .to_builder() - .http_client(infallible_client_fn(|_req| { - http_02x::Response::builder().status(200).body("").unwrap() - })) - .build(), - ); let config = aws_s3_transfer_manager::Config::builder() .client(client) .build();