Commit

wip test sometimes work
waahm7 committed Dec 19, 2024
1 parent 6225e54 commit 7f2a4a4
Showing 2 changed files with 68 additions and 97 deletions.
69 changes: 67 additions & 2 deletions aws-s3-transfer-manager/src/operation/upload.rs
@@ -228,6 +228,7 @@ mod test {
     use crate::io::InputStream;
     use crate::operation::upload::UploadInput;
     use crate::types::{ConcurrencySetting, PartSize};
+    use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput;
     use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
     use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
     use aws_sdk_s3::operation::put_object::PutObjectOutput;
@@ -236,12 +237,13 @@ mod test {
     use bytes::Bytes;
     use std::ops::Deref;
     use std::sync::Arc;
+    use std::sync::Barrier;
     use test_common::mock_client_with_stubbed_http_client;
 
     #[tokio::test]
     async fn test_basic_mpu() {
         let expected_upload_id = Arc::new("test-upload".to_owned());
-        let body = Bytes::from_static(b"every adolescent dog goes bonkers early");
+        let body: Bytes = Bytes::from_static(b"every adolescent dog goes bonkers early");
         let stream = InputStream::from(body);
 
         let upload_id = expected_upload_id.clone();
@@ -310,7 +312,7 @@ mod test {
     #[tokio::test]
     async fn test_basic_upload_object() {
         let body = Bytes::from_static(b"every adolescent dog goes bonkers early");
-        let stream = InputStream::from(body);
+        let stream: InputStream = InputStream::from(body);
         let expected_e_tag = Arc::new("test-etag".to_owned());
 
         let e_tag = expected_e_tag.clone();
@@ -339,4 +341,67 @@ mod test {
         assert_eq!(resp.upload_id(), None);
         assert_eq!(expected_e_tag.deref(), resp.e_tag().unwrap());
     }
+
+    // #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    // async fn test_abort_multipart_upload() {
+    //     let expected_upload_id = Arc::new("test-upload".to_owned());
+    //     let body: Bytes = Bytes::from_static(b"every adolescent dog goes bonkers early");
+    //     let stream = InputStream::from(body);
+    //     let bucket = "test-bucket";
+    //     let key = "test-key";
+    //     let wait_till_create_mpu = Arc::new(Barrier::new(2));
+
+    //     let upload_id: Arc<String> = expected_upload_id.clone();
+    //     let create_mpu =
+    //         mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || {
+    //             CreateMultipartUploadOutput::builder()
+    //                 .upload_id(upload_id.as_ref().to_owned())
+    //                 .build()
+    //         });
+
+    //     let upload_part = mock!(aws_sdk_s3::Client::upload_part).then_output({
+    //         let wait_till_create_mpu = wait_till_create_mpu.clone();
+    //         move || {
+    //             wait_till_create_mpu.wait();
+    //             UploadPartOutput::builder().build()
+    //         }
+    //     });
+
+    //     let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
+    //         .match_requests({
+    //             let upload_id: Arc<String> = expected_upload_id.clone();
+    //             move |input| {
+    //                 input.upload_id.as_ref() == Some(&upload_id)
+    //                     && input.bucket() == Some(bucket)
+    //                     && input.key() == Some(key)
+    //             }
+    //         })
+    //         .then_output(|| AbortMultipartUploadOutput::builder().build());
+
+    //     let client = mock_client_with_stubbed_http_client!(
+    //         aws_sdk_s3,
+    //         RuleMode::MatchAny,
+    //         &[create_mpu, upload_part, abort_mpu]
+    //     );
+
+    //     let tm_config = crate::Config::builder()
+    //         .concurrency(ConcurrencySetting::Explicit(1))
+    //         .set_multipart_threshold(PartSize::Target(10))
+    //         .set_target_part_size(PartSize::Target(30))
+    //         .client(client)
+    //         .build();
+
+    //     let tm = crate::Client::new(tm_config);
+
+    //     let request = UploadInput::builder()
+    //         .bucket("test-bucket")
+    //         .key("test-key")
+    //         .body(stream);
+
+    //     let handle = request.send_with(&tm).await.unwrap();
+    //     wait_till_create_mpu.wait();
+    //     let abort = handle.abort().await.unwrap();
+
+    //     assert_eq!(abort.upload_id().unwrap(), expected_upload_id.deref());
+    // }
 }
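
Note on the commented-out test above: releasing `wait_till_create_mpu` inside the `upload_part` stub only guarantees that `create_multipart_upload` has returned by the time `handle.abort()` is called; the part uploads can still race ahead and complete the whole upload before the abort lands, which appears to be why the test only sometimes passes. The test deleted from worker.rs below avoids this by pairing the barrier with a `tokio::sync::watch` channel that holds the stubbed operation open until the cancellation signal has been delivered. A minimal, self-contained sketch of that handshake follows; the names (`started`, `resume_tx`, `operation`) are illustrative, not from the crate.

// Sketch of the barrier + watch-channel handshake, assuming the mocked
// operation runs on its own blocking thread (as the stubs here do).
// `started` announces that the operation is in flight; `resume_tx` keeps
// it open until the cancellation signal has been sent.
use std::sync::{Arc, Barrier};

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let started = Arc::new(Barrier::new(2));
    let (resume_tx, resume_rx) = tokio::sync::watch::channel(());

    // Stand-in for the stubbed `create_multipart_upload`: it blocks until
    // the test has both observed the start and delivered its cancellation.
    let operation = tokio::task::spawn_blocking({
        let started = started.clone();
        move || {
            started.wait(); // announce: operation started
            while !resume_rx.has_changed().unwrap() {
                std::thread::sleep(std::time::Duration::from_millis(100));
            }
            "returned only after cancellation was delivered"
        }
    });

    // Test side: rendezvous with the operation, send the cancellation
    // signal (elided here), then let the stubbed operation return.
    let rendezvous = started.clone();
    tokio::task::spawn_blocking(move || rendezvous.wait())
        .await
        .unwrap();
    // ...ctx.state.cancel_tx.send(true) would go here in the real test...
    resume_tx.send(()).unwrap();

    println!("{}", operation.await.unwrap());
}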
96 changes: 1 addition & 95 deletions aws-s3-transfer-manager/src/operation/upload_objects/worker.rs
@@ -322,27 +322,19 @@ fn handle_failed_upload(
 
 #[cfg(test)]
 mod tests {
-    use std::sync::{Arc, Barrier};
-
-    use aws_sdk_s3::operation::{
-        abort_multipart_upload::AbortMultipartUploadOutput,
-        create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput,
-        upload_part::UploadPartOutput,
-    };
+    use aws_sdk_s3::operation::put_object::PutObjectOutput;
     use aws_smithy_mocks_experimental::{mock, RuleMode};
     use bytes::Bytes;
     use test_common::mock_client_with_stubbed_http_client;
 
     use crate::{
         client::Handle,
-        config::MIN_MULTIPART_PART_SIZE_BYTES,
         io::InputStream,
         operation::upload_objects::{
             worker::{upload_single_obj, UploadObjectJob},
             UploadObjectsContext, UploadObjectsInputBuilder,
         },
         runtime::scheduler::Scheduler,
-        types::PartSize,
         DEFAULT_CONCURRENCY,
     };
@@ -737,90 +729,4 @@ mod tests {
 
         assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
     }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-    async fn test_cancel_single_upload_via_multipart_upload() {
-        let bucket = "test-bucket";
-        let key = "test-key";
-        let upload_id: String = "test-upload-id".to_owned();
-
-        let wait_till_create_mpu = Arc::new(Barrier::new(2));
-        let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) =
-            tokio::sync::watch::channel(());
-        let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx);
-
-        let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({
-            let wait_till_create_mpu = wait_till_create_mpu.clone();
-            let upload_id = upload_id.clone();
-            move || {
-                // This ensures that a cancellation signal won't be sent until `create_multipart_upload`.
-                wait_till_create_mpu.wait();
-
-                // This increases the reliability of the test, ensuring that the cancellation signal has been sent
-                // and that `upload_single_obj` can now resume.
-                while !resume_upload_single_obj_rx.has_changed().unwrap() {
-                    std::thread::sleep(std::time::Duration::from_millis(100));
-                }
-
-                CreateMultipartUploadOutput::builder()
-                    .upload_id(upload_id.clone())
-                    .build()
-            }
-        });
-        let upload_part = mock!(aws_sdk_s3::Client::upload_part)
-            .then_output(|| UploadPartOutput::builder().build());
-        let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
-            .match_requests({
-                let upload_id = upload_id.clone();
-                move |input| {
-                    input.upload_id.as_ref() == Some(&upload_id)
-                        && input.bucket() == Some(bucket)
-                        && input.key() == Some(key)
-                }
-            })
-            .then_output(|| AbortMultipartUploadOutput::builder().build());
-        let s3_client = mock_client_with_stubbed_http_client!(
-            aws_sdk_s3,
-            RuleMode::MatchAny,
-            &[create_mpu, upload_part, abort_mpu]
-        );
-        let config = crate::Config::builder()
-            .set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES))
-            .client(s3_client)
-            .build();
-
-        let scheduler = Scheduler::new(DEFAULT_CONCURRENCY);
-
-        let handle = std::sync::Arc::new(Handle { config, scheduler });
-        let input = UploadObjectsInputBuilder::default()
-            .source("doesnotmatter")
-            .bucket(bucket)
-            .build()
-            .unwrap();
-
-        // specify the size of the contents so it triggers multipart upload
-        let contents = vec![0; MIN_MULTIPART_PART_SIZE_BYTES as usize];
-        let ctx = UploadObjectsContext::new(handle, input);
-        let job = UploadObjectJob {
-            object: InputStream::from(Bytes::copy_from_slice(contents.as_slice())),
-            key: key.to_owned(),
-        };
-
-        tokio::task::spawn({
-            let ctx = ctx.clone();
-            let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone();
-            async move {
-                wait_till_create_mpu.wait();
-                // The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared,
-                // which means that cancellation can now be triggered.
-                ctx.state.cancel_tx.send(true).unwrap();
-                // Tell `upload_single_obj` that it can now proceed.
-                resume_upload_single_obj_tx.send(()).unwrap();
-            }
-        });
-
-        let err = upload_single_obj(&ctx, job).await.unwrap_err();
-
-        assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
-    }
 }
