diff --git a/aws-s3-transfer-manager/src/operation/upload.rs b/aws-s3-transfer-manager/src/operation/upload.rs index a8819d5..18fc296 100644 --- a/aws-s3-transfer-manager/src/operation/upload.rs +++ b/aws-s3-transfer-manager/src/operation/upload.rs @@ -228,6 +228,7 @@ mod test { use crate::io::InputStream; use crate::operation::upload::UploadInput; use crate::types::{ConcurrencySetting, PartSize}; + use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput; use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput; use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput; use aws_sdk_s3::operation::put_object::PutObjectOutput; @@ -236,12 +237,13 @@ mod test { use bytes::Bytes; use std::ops::Deref; use std::sync::Arc; + use std::sync::Barrier; use test_common::mock_client_with_stubbed_http_client; #[tokio::test] async fn test_basic_mpu() { let expected_upload_id = Arc::new("test-upload".to_owned()); - let body = Bytes::from_static(b"every adolescent dog goes bonkers early"); + let body: Bytes = Bytes::from_static(b"every adolescent dog goes bonkers early"); let stream = InputStream::from(body); let upload_id = expected_upload_id.clone(); @@ -310,7 +312,7 @@ mod test { #[tokio::test] async fn test_basic_upload_object() { let body = Bytes::from_static(b"every adolescent dog goes bonkers early"); - let stream = InputStream::from(body); + let stream: InputStream = InputStream::from(body); let expected_e_tag = Arc::new("test-etag".to_owned()); let e_tag = expected_e_tag.clone(); @@ -339,4 +341,67 @@ mod test { assert_eq!(resp.upload_id(), None); assert_eq!(expected_e_tag.deref(), resp.e_tag().unwrap()); } + + // #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + // async fn test_abort_multipart_upload() { + // let expected_upload_id = Arc::new("test-upload".to_owned()); + // let body: Bytes = Bytes::from_static(b"every adolescent dog goes bonkers early"); + // let stream = InputStream::from(body); + // let bucket = "test-bucket"; + // let key = "test-key"; + // let wait_till_create_mpu = Arc::new(Barrier::new(2)); + + // let upload_id: Arc = expected_upload_id.clone(); + // let create_mpu = + // mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || { + // CreateMultipartUploadOutput::builder() + // .upload_id(upload_id.as_ref().to_owned()) + // .build() + // }); + + // let upload_part = mock!(aws_sdk_s3::Client::upload_part).then_output({ + // let wait_till_create_mpu = wait_till_create_mpu.clone(); + // move || { + // wait_till_create_mpu.wait(); + // UploadPartOutput::builder().build() + // } + // }); + + // let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload) + // .match_requests({ + // let upload_id: Arc = expected_upload_id.clone(); + // move |input| { + // input.upload_id.as_ref() == Some(&upload_id) + // && input.bucket() == Some(bucket) + // && input.key() == Some(key) + // } + // }) + // .then_output(|| AbortMultipartUploadOutput::builder().build()); + + // let client = mock_client_with_stubbed_http_client!( + // aws_sdk_s3, + // RuleMode::MatchAny, + // &[create_mpu, upload_part, abort_mpu] + // ); + + // let tm_config = crate::Config::builder() + // .concurrency(ConcurrencySetting::Explicit(1)) + // .set_multipart_threshold(PartSize::Target(10)) + // .set_target_part_size(PartSize::Target(30)) + // .client(client) + // .build(); + + // let tm = crate::Client::new(tm_config); + + // let request = UploadInput::builder() + // .bucket("test-bucket") + // .key("test-key") + // .body(stream); + + // let handle = request.send_with(&tm).await.unwrap(); + // wait_till_create_mpu.wait(); + // let abort = handle.abort().await.unwrap(); + + // assert_eq!(abort.upload_id().unwrap(), expected_upload_id.deref()); + // } } diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index 24f0773..2e33b6d 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -322,27 +322,19 @@ fn handle_failed_upload( #[cfg(test)] mod tests { - use std::sync::{Arc, Barrier}; - - use aws_sdk_s3::operation::{ - abort_multipart_upload::AbortMultipartUploadOutput, - create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput, - upload_part::UploadPartOutput, - }; + use aws_sdk_s3::operation::put_object::PutObjectOutput; use aws_smithy_mocks_experimental::{mock, RuleMode}; use bytes::Bytes; use test_common::mock_client_with_stubbed_http_client; use crate::{ client::Handle, - config::MIN_MULTIPART_PART_SIZE_BYTES, io::InputStream, operation::upload_objects::{ worker::{upload_single_obj, UploadObjectJob}, UploadObjectsContext, UploadObjectsInputBuilder, }, runtime::scheduler::Scheduler, - types::PartSize, DEFAULT_CONCURRENCY, }; @@ -737,90 +729,4 @@ mod tests { assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_cancel_single_upload_via_multipart_upload() { - let bucket = "test-bucket"; - let key = "test-key"; - let upload_id: String = "test-upload-id".to_owned(); - - let wait_till_create_mpu = Arc::new(Barrier::new(2)); - let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) = - tokio::sync::watch::channel(()); - let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx); - - let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({ - let wait_till_create_mpu = wait_till_create_mpu.clone(); - let upload_id = upload_id.clone(); - move || { - // This ensures that a cancellation signal won't be sent until `create_multipart_upload`. - wait_till_create_mpu.wait(); - - // This increases the reliability of the test, ensuring that the cancellation signal has been sent - // and that `upload_single_obj` can now resume. - while !resume_upload_single_obj_rx.has_changed().unwrap() { - std::thread::sleep(std::time::Duration::from_millis(100)); - } - - CreateMultipartUploadOutput::builder() - .upload_id(upload_id.clone()) - .build() - } - }); - let upload_part = mock!(aws_sdk_s3::Client::upload_part) - .then_output(|| UploadPartOutput::builder().build()); - let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload) - .match_requests({ - let upload_id = upload_id.clone(); - move |input| { - input.upload_id.as_ref() == Some(&upload_id) - && input.bucket() == Some(bucket) - && input.key() == Some(key) - } - }) - .then_output(|| AbortMultipartUploadOutput::builder().build()); - let s3_client = mock_client_with_stubbed_http_client!( - aws_sdk_s3, - RuleMode::MatchAny, - &[create_mpu, upload_part, abort_mpu] - ); - let config = crate::Config::builder() - .set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES)) - .client(s3_client) - .build(); - - let scheduler = Scheduler::new(DEFAULT_CONCURRENCY); - - let handle = std::sync::Arc::new(Handle { config, scheduler }); - let input = UploadObjectsInputBuilder::default() - .source("doesnotmatter") - .bucket(bucket) - .build() - .unwrap(); - - // specify the size of the contents so it triggers multipart upload - let contents = vec![0; MIN_MULTIPART_PART_SIZE_BYTES as usize]; - let ctx = UploadObjectsContext::new(handle, input); - let job = UploadObjectJob { - object: InputStream::from(Bytes::copy_from_slice(contents.as_slice())), - key: key.to_owned(), - }; - - tokio::task::spawn({ - let ctx = ctx.clone(); - let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone(); - async move { - wait_till_create_mpu.wait(); - // The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared, - // which means that cancellation can now be triggered. - ctx.state.cancel_tx.send(true).unwrap(); - // Tell `upload_single_obj` that it can now proceed. - resume_upload_single_obj_tx.send(()).unwrap(); - } - }); - - let err = upload_single_obj(&ctx, job).await.unwrap_err(); - - assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); - } }