Skip to content

Commit

Permalink
Simulate the flow of CreateMPU -> upload cancellation -> AbortMPU
Browse files Browse the repository at this point in the history
This commit addresses #78 (comment)
  • Loading branch information
ysaito1001 committed Dec 5, 2024
1 parent 0cd2d44 commit bde83ea
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions aws-s3-transfer-manager/src/operation/upload_objects/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,12 @@ fn handle_failed_upload(

#[cfg(test)]
mod tests {
use std::sync::{Arc, Barrier};

use aws_sdk_s3::operation::{
abort_multipart_upload::AbortMultipartUploadOutput,
create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput,
upload_part::UploadPartOutput,
};
use aws_smithy_mocks_experimental::{mock, RuleMode};
use bytes::Bytes;
Expand Down Expand Up @@ -736,20 +739,37 @@ mod tests {
assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_cancel_single_upload_via_multipart_upload() {
let bucket = "test-bucket";
let key = "test-key";
let upload_id: String = "test-upload-id".to_owned();

let wait_till_create_mpu = Arc::new(Barrier::new(2));
let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) =
tokio::sync::watch::channel(());
let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx);

let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({
let c = wait_till_create_mpu.clone();
let upload_id = upload_id.clone();
move || {
// This ensures that a cancellation signal won't be sent until `create_multipart_upload`.
c.wait();

// This increases the reliability of the test, ensuring that the cancellation signal has been sent
// and that `upload_single_obj` can now resume.
while !resume_upload_single_obj_rx.has_changed().unwrap() {
std::thread::sleep(std::time::Duration::from_millis(100));
}

CreateMultipartUploadOutput::builder()
.upload_id(upload_id.clone())
.build()
}
});
let upload_part = mock!(aws_sdk_s3::Client::upload_part)
.then_output(|| UploadPartOutput::builder().build());
let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
.match_requests({
let upload_id = upload_id.clone();
Expand All @@ -762,8 +782,8 @@ mod tests {
.then_output(|| AbortMultipartUploadOutput::builder().build());
let s3_client = mock_client_with_stubbed_http_client!(
aws_sdk_s3,
RuleMode::Sequential,
&[create_mpu, abort_mpu]
RuleMode::MatchAny,
&[create_mpu, upload_part, abort_mpu]
);
let config = crate::Config::builder()
.set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES))
Expand All @@ -787,7 +807,18 @@ mod tests {
key: key.to_owned(),
};

ctx.state.cancel_tx.send(true).unwrap();
tokio::task::spawn({
let ctx = ctx.clone();
let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone();
async move {
wait_till_create_mpu.wait();
// The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared,
// which means that cancellation can now be triggered.
ctx.state.cancel_tx.send(true).unwrap();
// Tell `upload_single_obj` that it can now proceed.
resume_upload_single_obj_tx.send(()).unwrap();
}
});

let err = upload_single_obj(&ctx, job).await.unwrap_err();

Expand Down

0 comments on commit bde83ea

Please sign in to comment.