From fd603929193bd6c555e56199eb45a2fd54217d76 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 18 Nov 2024 15:18:01 -0600 Subject: [PATCH 01/11] Add ability to cancel upload directory operation --- aws-s3-transfer-manager/src/config.rs | 2 +- aws-s3-transfer-manager/src/error.rs | 6 + .../src/operation/upload/handle.rs | 16 ++ .../src/operation/upload_objects.rs | 41 ++- .../src/operation/upload_objects/handle.rs | 78 +++++- .../src/operation/upload_objects/worker.rs | 251 +++++++++++------- 6 files changed, 281 insertions(+), 113 deletions(-) diff --git a/aws-s3-transfer-manager/src/config.rs b/aws-s3-transfer-manager/src/config.rs index c6b137e..a728959 100644 --- a/aws-s3-transfer-manager/src/config.rs +++ b/aws-s3-transfer-manager/src/config.rs @@ -128,7 +128,7 @@ impl Builder { self } - /// Consumes the builder and constructs a [`Config`](crate::config::Config) + /// Consumes the builder and constructs a [`Config`] pub fn build(self) -> Config { Config { multipart_threshold: self.multipart_threshold_part_size, diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index f4e3d0a..ab38b23 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -44,6 +44,11 @@ pub enum ErrorKind { /// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket) ChildOperationFailed, + + /// The operation is being canceled because the user explicitly called `.abort` on the handle, + /// or a child operation failed with the abort policy. The operation is now transitioning into + /// a graceful shutdown mode. + OperationCancelled, } impl Error { @@ -75,6 +80,7 @@ impl fmt::Display for Error { ErrorKind::ChunkFailed => write!(f, "failed to process chunk"), ErrorKind::NotFound => write!(f, "resource not found"), ErrorKind::ChildOperationFailed => write!(f, "child operation failed"), + ErrorKind::OperationCancelled => write!(f, "operation cancelled"), } } } diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 6f56606..3c8d23f 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -28,6 +28,22 @@ pub(crate) enum UploadType { } /// Response type for a single upload object request. +/// +/// # Cancellation +/// +/// The operation can be cancelled either by dropping this handle or by calling +/// [`Self::abort`]. In both cases, any ongoing tasks will stop processing future work +/// and will not start processing anything new. However, there are subtle differences in +/// how each method cancels ongoing tasks. +/// +/// When the handle is dropped, in-progress tasks are cancelled at their await points, +/// meaning read body tasks may be interrupted mid-processing, or upload parts may be +/// terminated without calling `AbortMultipartUpload` for multipart uploads. +/// +/// In contrast, calling [`Self::abort`] attempts to cancel ongoing tasks more explicitly. +/// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload` +/// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload` +/// are logged, but do not affect the overall cancellation flow. 
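+///
+/// Below is a minimal, hypothetical sketch of explicit cancellation. It assumes `handle` is an
+/// in-flight `UploadHandle` obtained from a single object upload; how the handle is created is
+/// elided here.
+///
+/// ```no_run
+/// # async fn example(mut handle: aws_s3_transfer_manager::operation::upload::UploadHandle) -> Result<(), aws_s3_transfer_manager::error::Error> {
+/// // Explicitly cancel: owned tasks are aborted and any in-progress multipart
+/// // upload is cleaned up via `AbortMultipartUpload`.
+/// handle.abort().await?;
+/// # Ok(())
+/// # }
+/// ```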
#[derive(Debug)] #[non_exhaustive] pub struct UploadHandle {
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects.rs b/aws-s3-transfer-manager/src/operation/upload_objects.rs index ee29433..4bd4324 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects.rs
@@ -16,7 +16,10 @@ pub use handle::UploadObjectsHandle; mod output; pub use output::{UploadObjectsOutput, UploadObjectsOutputBuilder}; -use tokio::task::JoinSet; +use tokio::{ + sync::watch::{self, Sender}, + task::JoinSet, +}; use tracing::Instrument; mod worker;
@@ -52,9 +55,9 @@ impl UploadObjects { return Err(e); } }; - + let (cancel_tx, cancel_rx) = watch::channel(false); let concurrency = handle.num_workers(); - let ctx = UploadObjectsContext::new(handle.clone(), input); + let ctx = UploadObjectsContext::new(handle.clone(), input, cancel_tx); // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled. let mut tasks = JoinSet::new();
@@ -62,13 +65,15 @@ impl UploadObjects { // spawn worker to discover/distribute work tasks.spawn(worker::list_directory_contents( - ctx.state.input.clone(), + ctx.state.clone(), list_directory_tx, + cancel_rx.clone(), )); for i in 0..concurrency { - let worker = worker::upload_objects(ctx.clone(), list_directory_rx.clone()) - .instrument(tracing::debug_span!("object-uploader", worker = i)); + let worker = + worker::upload_objects(ctx.clone(), list_directory_rx.clone(), cancel_rx.clone()) + .instrument(tracing::debug_span!("object-uploader", worker = i)); tasks.spawn(worker); }
@@ -83,21 +88,33 @@ pub(crate) struct UploadObjectsState { // TODO - Determine if `input` should be separated from this struct // https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603 input: UploadObjectsInput, + cancel_tx: Sender<bool>, failed_uploads: Mutex<Vec<FailedUpload>>, successful_uploads: AtomicU64, total_bytes_transferred: AtomicU64, }
-type UploadObjectsContext = TransferContext<UploadObjectsState>; - -impl UploadObjectsContext { - fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self { - let state = Arc::new(UploadObjectsState { +impl UploadObjectsState { + pub(crate) fn new(input: UploadObjectsInput, cancel_tx: Sender<bool>) -> Self { + Self { input, + cancel_tx, failed_uploads: Mutex::new(Vec::new()), successful_uploads: AtomicU64::default(), total_bytes_transferred: AtomicU64::default(), - }); + } + } +} + +type UploadObjectsContext = TransferContext<UploadObjectsState>; + +impl UploadObjectsContext { + fn new( + handle: Arc<crate::client::Handle>, + input: UploadObjectsInput, + cancel_tx: Sender<bool>, + ) -> Self { + let state = Arc::new(UploadObjectsState::new(input, cancel_tx)); TransferContext { handle, state } } }
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs b/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs index 658a861..b07b4d8 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/handle.rs
@@ -3,10 +3,34 @@ * SPDX-License-Identifier: Apache-2.0 */ +use crate::{error::ErrorKind, types::FailedTransferPolicy}; + use super::{UploadObjectsContext, UploadObjectsOutput}; use tokio::task; /// Handle for `UploadObjects` operation +/// +/// # Cancellation +/// +/// The operation can be cancelled either by dropping this handle or by calling +/// [`Self::abort`]. In both cases, any ongoing tasks will ignore future work and +/// will not start processing anything new.
However, there are subtle differences +/// in how each cancels ongoing tasks. +/// +/// When the handle is dropped, in-progress tasks will be cancelled at the await +/// points where their futures are waiting. This means a particular upload +/// operation may be terminated mid-process, without completing the upload or +/// calling `AbortMultipartUpload` for multipart uploads (if the upload is +/// multipart, as opposed to a simple `PutObject`). In the case of `Drop`, +/// tasks will be forcefully terminated, regardless of the `FailedTransferPolicy` +/// associated with the handle. +/// +/// Calling [`Self::abort`], on the other hand, provides more deterministic cancellation +/// behavior. If the `FailedTransferPolicy` for the handle is set to `Abort`, the +/// individual upload task can either complete the current upload operation or call +/// `AbortMultipartUpload` in the case of multipart uploads. Errors encountered during +/// `AbortMultipartUpload` will be logged, but will not affect the program's cancellation +/// control flow. #[derive(Debug)] #[non_exhaustive] pub struct UploadObjectsHandle { @@ -18,13 +42,61 @@ pub struct UploadObjectsHandle { impl UploadObjectsHandle { /// Consume the handle and wait for the upload to complete + /// + /// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method + /// will return an error if any of the spawned tasks encounter one. The other tasks will + /// be canceled, but their cancellations will not be reported as errors by this method; + /// they will be logged as errors, instead. + /// + /// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the + /// [`UploadObjectsOutput`] will include a detailed breakdown, such as the number of + /// successful uploads and the number of failed ones. + /// + // TODO(aws-sdk-rust#1159) - Consider if we want to return failed `AbortMultipartUpload` during cancellation. #[tracing::instrument(skip_all, level = "debug", name = "join-upload-objects-")] pub async fn join(mut self) -> Result { - // TODO - Consider implementing more sophisticated error handling such as canceling in-progress transfers + let mut first_error_to_report = None; while let Some(join_result) = self.tasks.join_next().await { - join_result??; + let result = join_result.expect("task completed"); + if let Err(e) = result { + match self.ctx.state.input.failure_policy() { + FailedTransferPolicy::Abort + if first_error_to_report.is_none() + && e.kind() != &ErrorKind::OperationCancelled => + { + first_error_to_report = Some(e); + } + FailedTransferPolicy::Continue => { + tracing::warn!("encountered but dismissed error when the failure policy is `Continue`: {e}") + } + _ => {} + } + } + } + + if let Some(e) = first_error_to_report { + Err(e) + } else { + Ok(UploadObjectsOutput::from(self.ctx.state.as_ref())) + } + } + + /// Aborts all tasks owned by the handle. + /// + /// Unlike `Drop`, calling `abort` gracefully shuts down any in-progress tasks. + /// Specifically, ongoing upload tasks will be allowed to complete their current work, + /// but any future uploads will be ignored. The task of listing directory contents will + /// stop yielding new directory contents. 
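+    ///
+    /// Below is a minimal sketch of cancelling a directory upload, mirroring this crate's
+    /// integration tests; the bucket name and source path are placeholders and error handling
+    /// is elided.
+    ///
+    /// ```no_run
+    /// # async fn example(client: &aws_s3_transfer_manager::Client) -> Result<(), aws_s3_transfer_manager::error::Error> {
+    /// let mut handle = client
+    ///     .upload_objects()
+    ///     .bucket("my-bucket")
+    ///     .source("path/to/local/dir")
+    ///     .recursive(true)
+    ///     .send()
+    ///     .await?;
+    /// // ...later, decide the transfer is no longer needed...
+    /// handle.abort().await?;
+    /// # Ok(())
+    /// # }
+    /// ```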
+ pub async fn abort(&mut self) -> Result<(), crate::error::Error> { + if self.ctx.state.input.failure_policy() == &FailedTransferPolicy::Abort { + if self.ctx.state.cancel_tx.send(true).is_err() { + tracing::warn!( + "all receiver ends have been dropped, unable to send a cancellation signal" + ); + } + while (self.tasks.join_next().await).is_some() {} } - Ok(UploadObjectsOutput::from(self.ctx.state.as_ref())) + Ok(()) } } diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index 33033ac..ee34310 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -6,11 +6,13 @@ use std::borrow::Cow; use std::path::{MAIN_SEPARATOR, MAIN_SEPARATOR_STR}; use std::sync::atomic::Ordering; +use std::sync::Arc; -use super::{UploadObjectsContext, UploadObjectsInput}; +use super::{UploadObjectsContext, UploadObjectsInput, UploadObjectsState}; use async_channel::{Receiver, Sender}; use blocking::Unblock; use futures_util::StreamExt; +use tokio::sync::watch; use walkdir::WalkDir; use crate::error::ErrorKind; @@ -20,6 +22,9 @@ use crate::operation::DEFAULT_DELIMITER; use crate::types::{FailedTransferPolicy, FailedUpload, UploadFilter}; use crate::{error, types::UploadFilterItem}; +const CANCELLATION_ERROR: &str = + "at least one operation has been aborted, cancelling all ongoing requests"; + #[derive(Debug)] pub(super) struct UploadObjectJob { key: String, @@ -33,9 +38,12 @@ impl UploadObjectJob { } pub(super) async fn list_directory_contents( - input: UploadObjectsInput, + state: Arc, list_directory_tx: Sender>, + mut cancel_rx: watch::Receiver, ) -> Result<(), error::Error> { + let input = &state.input; + // TODO - Reevaluate the need for the `blocking` crate once we implement stricter task cancellation for download and upload. // If we switch to using `tokio::task::spawn_blocking` instead of the `blocking` crate, the entire `list_directory_contents` function // would need to be passed to `spawn_blocking`, which implies the following: @@ -43,72 +51,88 @@ pub(super) async fn list_directory_contents( // - The `AbortHandle` returned by `spawn_blocking` would not have any effect when calling `abort`, which may impact our task cancellation behavior. // Move a blocking I/O to a dedicated thread pool - let mut walker = Unblock::new(walker(&input).into_iter()); + let mut walker = Unblock::new(walker(input).into_iter()); let default_filter = &UploadFilter::default(); let filter = input.filter().unwrap_or(default_filter); - while let Some(entry) = walker.next().await { - let job = match entry { - Ok(entry) => { - let symlink_metadata = tokio::fs::symlink_metadata(entry.path()).await?; - let metadata = if symlink_metadata.is_symlink() { - if input.follow_symlinks { - tokio::fs::metadata(entry.path()).await? - } else { - continue; - } - } else { - // In this branch, we know `symlink_metadata` does not represent a symlink link, - // so it can return true for either `is_dir()` or `is_file()`. 
- symlink_metadata - }; - let filter_item = UploadFilterItem::builder() - .path(entry.path()) - .metadata(metadata.clone()) - .build(); - if !(filter.predicate)(&filter_item) { - tracing::debug!("skipping object due to filter: {:?}", entry.path()); - continue; - } - - let recursion_root_dir_path = input.source().expect("source set"); - let entry_path = entry.path(); - let relative_filename = entry_path - .strip_prefix(recursion_root_dir_path) - .expect("{entry_path:?} should be a path entry directly or indirectly under {recursion_root_dir_path:?}") - .to_str() - .expect("valid utf-8 path"); - let object_key = - derive_object_key(relative_filename, input.key_prefix(), input.delimiter())?; - let object = InputStream::read_from() - .path(entry.path()) - .metadata(metadata) - .build(); - - match object { - Ok(object) => { - tracing::info!( - "uploading {relative_filename} with object key {object_key}..." - ); - Ok(UploadObjectJob::new(object_key.into_owned(), object)) + loop { + tokio::select! { + _ = cancel_rx.changed() => { + tracing::error!("received cancellation signal, exiting and not yielding new directory contents"); + return Err(crate::error::Error::new(ErrorKind::OperationCancelled, CANCELLATION_ERROR.to_owned())); + } + entry = walker.next() => { + match entry { + None => break, + Some(entry) => { + let job = match entry { + Ok(entry) => { + let symlink_metadata = tokio::fs::symlink_metadata(entry.path()).await?; + let metadata = if symlink_metadata.is_symlink() { + if input.follow_symlinks { + tokio::fs::metadata(entry.path()).await? + } else { + continue; + } + } else { + // In this branch, we know `symlink_metadata` does not represent a symlink link, + // so it can return true for either `is_dir()` or `is_file()`. + symlink_metadata + }; + let filter_item = UploadFilterItem::builder() + .path(entry.path()) + .metadata(metadata.clone()) + .build(); + if !(filter.predicate)(&filter_item) { + tracing::debug!("skipping object due to filter: {:?}", entry.path()); + continue; + } + + let recursion_root_dir_path = input.source().expect("source set"); + let entry_path = entry.path(); + let relative_filename = entry_path + .strip_prefix(recursion_root_dir_path) + .expect("{entry_path:?} should be a path entry directly or indirectly under {recursion_root_dir_path:?}") + .to_str() + .expect("valid utf-8 path"); + let object_key = + derive_object_key(relative_filename, input.key_prefix(), input.delimiter())?; + let object = InputStream::read_from() + .path(entry.path()) + .metadata(metadata) + .build(); + + match object { + Ok(object) => { + tracing::debug!( + "preparing to upload {relative_filename} with object key {object_key}..." + ); + Ok(UploadObjectJob::new(object_key.into_owned(), object)) + } + Err(e) => Err(e.into()), + } + } + Err(walkdir_error) => { + let error_kind = if walkdir_error.io_error().is_some() { + ErrorKind::IOError + } else { + ErrorKind::InputInvalid + }; + + // We avoid converting `walkdir::Error` into `std::io::Error` to preserve important information, + // such as which path entry triggered a `PermissionDenied` error. 
+ Err(crate::error::Error::new(error_kind, walkdir_error)) + } + }; + if list_directory_tx.send(job).await.is_err() { + tracing::error!("all receiver ends have been dropped, unable to send a job!"); + break; + } } - Err(e) => Err(e.into()), } } - Err(walkdir_error) => { - let error_kind = if walkdir_error.io_error().is_some() { - ErrorKind::IOError - } else { - ErrorKind::InputInvalid - }; - - // We avoid converting `walkdir::Error` into `std::io::Error` to preserve important information, - // such as which path entry triggered a `PermissionDenied` error. - Err(crate::error::Error::new(error_kind, walkdir_error)) - } - }; - list_directory_tx.send(job).await.expect("channel valid"); + } } Ok(()) @@ -163,42 +187,56 @@ fn derive_object_key<'a>( pub(super) async fn upload_objects( ctx: UploadObjectsContext, list_directory_rx: Receiver>, + mut cancel_rx: watch::Receiver, ) -> Result<(), error::Error> { - while let Ok(job) = list_directory_rx.recv().await { - match job { - Ok(job) => { - let key = job.key.clone(); - let result = upload_single_obj(&ctx, job).await; - match result { - Ok(bytes_transferred) => { - ctx.state.successful_uploads.fetch_add(1, Ordering::SeqCst); - - ctx.state - .total_bytes_transferred - .fetch_add(bytes_transferred, Ordering::SeqCst); - - tracing::debug!("worker finished uploading object {key:?}"); - } - Err(err) => { - tracing::debug!("worker failed to upload object {key:?}: {err}"); - handle_failed_upload(err, &ctx, Some(key))?; + loop { + tokio::select! { + _ = cancel_rx.changed() => { + tracing::error!("received cancellation signal, exiting and ignoring any future work"); + return Err(crate::error::Error::new(ErrorKind::OperationCancelled, CANCELLATION_ERROR.to_owned())); + } + job = list_directory_rx.recv() => { + match job { + Err(_) => break, + Ok(job) => { + match job { + Ok(job) => { + let key = job.key.clone(); + let result = upload_single_obj(&ctx, job, cancel_rx.clone()).await; + match result { + Ok(bytes_transferred) => { + ctx.state.successful_uploads.fetch_add(1, Ordering::SeqCst); + + ctx.state + .total_bytes_transferred + .fetch_add(bytes_transferred, Ordering::SeqCst); + + tracing::debug!("worker finished uploading object {key:?}"); + } + Err(err) => { + tracing::debug!("worker failed to upload object {key:?}: {err}"); + handle_failed_upload(err, &ctx, Some(key))?; + } + } + } + Err(err) => { + tracing::debug!("worker received an error from the `list_directory` task: {err}"); + handle_failed_upload(err, &ctx, None)?; + } + } } } } - Err(err) => { - tracing::debug!("worker received an error from the `list_directory` task: {err}"); - handle_failed_upload(err, &ctx, None)?; - } } } - tracing::trace!("req channel closed, worker finished"); Ok(()) } async fn upload_single_obj( ctx: &UploadObjectsContext, job: UploadObjectJob, + cancel_rx: watch::Receiver, ) -> Result { let UploadObjectJob { object, key } = job; @@ -217,11 +255,19 @@ async fn upload_single_obj( .build() .expect("valid input"); - let handle = crate::operation::upload::Upload::orchestrate(ctx.handle.clone(), input).await?; + let mut handle = + crate::operation::upload::Upload::orchestrate(ctx.handle.clone(), input).await?; - handle.join().await?; - - Ok(bytes_transferred) + if cancel_rx.has_changed().unwrap() { + let _ = handle.abort().await; + Err(crate::error::Error::new( + ErrorKind::OperationCancelled, + CANCELLATION_ERROR.to_owned(), + )) + } else { + handle.join().await?; + Ok(bytes_transferred) + } } fn handle_failed_upload( @@ -230,11 +276,17 @@ fn handle_failed_upload( object_key: 
Option, ) -> Result<(), error::Error> { match ctx.state.input.failure_policy() { - // TODO - this will abort this worker, the rest of the workers will be aborted - // when the handle is joined and the error is propagated and the task set is - // dropped. This _may_ be later/too passive and we might consider aborting all - // the tasks on error rather than relying on join and then drop. - FailedTransferPolicy::Abort => Err(err), + FailedTransferPolicy::Abort => { + // Sending a cancellation signal during graceful shutdown would be redundant. + if err.kind() != &ErrorKind::OperationCancelled + && ctx.state.cancel_tx.send(true).is_err() + { + tracing::warn!( + "all receiver ends have been dropped, unable to send a cancellation signal" + ); + } + Err(err) + } FailedTransferPolicy::Continue => { let mut failures = ctx.state.failed_uploads.lock().unwrap(); @@ -262,9 +314,9 @@ fn handle_failed_upload( } #[cfg(test)] -mod unit { +mod tests { #[cfg(target_family = "unix")] - mod unix_tests { + mod unix { use crate::operation::upload_objects::worker::*; use crate::operation::upload_objects::UploadObjectsInputBuilder; use std::collections::BTreeMap; @@ -333,8 +385,13 @@ mod unit { input: UploadObjectsInput, ) -> (BTreeMap, Vec) { let (list_directory_tx, list_directory_rx) = async_channel::unbounded(); + let (cancel_tx, cancel_rx) = watch::channel(false); - let join_handle = tokio::spawn(list_directory_contents(input, list_directory_tx)); + let join_handle = tokio::spawn(list_directory_contents( + Arc::new(UploadObjectsState::new(input, cancel_tx)), + list_directory_tx, + cancel_rx, + )); let mut successes = BTreeMap::new(); let mut errors = Vec::new(); @@ -606,7 +663,7 @@ mod unit { } #[cfg(target_family = "windows")] - mod window_tests { + mod windows { use crate::operation::upload_objects::worker::*; #[test] From c82303e582ec6f5d7d9a52355ccb45f0f2503823 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 18 Nov 2024 15:18:15 -0600 Subject: [PATCH 02/11] Add unit test to verify cancellation of a single upload --- .../src/operation/upload_objects/worker.rs | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index ee34310..444a3a9 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -315,6 +315,22 @@ fn handle_failed_upload( #[cfg(test)] mod tests { + use aws_sdk_s3::operation::put_object::PutObjectOutput; + use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; + use bytes::Bytes; + use tokio::sync::watch; + + use crate::{ + client::Handle, + io::InputStream, + operation::upload_objects::{ + worker::{upload_single_obj, UploadObjectJob}, + UploadObjectsContext, UploadObjectsInputBuilder, + }, + runtime::scheduler::Scheduler, + DEFAULT_CONCURRENCY, + }; + #[cfg(target_family = "unix")] mod unix { use crate::operation::upload_objects::worker::*; @@ -674,4 +690,36 @@ mod tests { ); } } + + #[tokio::test] + async fn test_cancel_single_upload() { + let bucket = "doesnotmatter"; + let put_object = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.bucket() == Some(bucket)) + .then_output(|| PutObjectOutput::builder().build()); + + let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); + let config = crate::Config::builder().client(s3_client).build(); + + let (cancel_tx, cancel_rx) = watch::channel(false); + let 
scheduler = Scheduler::new(DEFAULT_CONCURRENCY); + + let handle = std::sync::Arc::new(Handle { config, scheduler }); + let input = UploadObjectsInputBuilder::default() + .source("doesnotmatter") + .bucket(bucket) + .build() + .unwrap(); + let ctx = UploadObjectsContext::new(handle, input, cancel_tx); + let job = UploadObjectJob { + object: InputStream::from(Bytes::from_static(b"doesnotmatter")), + key: "doesnotmatter".to_owned(), + }; + + ctx.state.cancel_tx.send(true).unwrap(); + + let err = upload_single_obj(&ctx, job, cancel_rx).await.unwrap_err(); + + assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); + } } From 53c1bb269158a64991b6e0f9d5c22eb6d1b007cb Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 18 Nov 2024 15:18:32 -0600 Subject: [PATCH 03/11] Add integ tests to verify cancellation of multiple uploads --- .../tests/upload_objects_test.rs | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/aws-s3-transfer-manager/tests/upload_objects_test.rs b/aws-s3-transfer-manager/tests/upload_objects_test.rs index 287f2d4..08c7352 100644 --- a/aws-s3-transfer-manager/tests/upload_objects_test.rs +++ b/aws-s3-transfer-manager/tests/upload_objects_test.rs @@ -21,6 +21,7 @@ use aws_sdk_s3::{ Client, }; use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; +use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; use aws_smithy_runtime_api::http::StatusCode; use aws_smithy_types::body::SdkBody; use test_common::create_test_dir; @@ -401,3 +402,104 @@ async fn test_error_when_custom_delimiter_appears_in_filename() { assert!(format!("{}", DisplayErrorContext(err)) .contains("a custom delimiter `-` should not appear")); } + +#[tokio::test] +async fn test_abort_on_handle_should_terminate_tasks_gracefully() { + let (_guard, rx) = capture_test_logs(); + + let recursion_root = "test"; + let files = vec![ + ("sample.jpg", 1), + ("photos/2022-January/sample.jpg", 1), + ("photos/2022-February/sample1.jpg", 1), + ("photos/2022-February/sample2.jpg", 1), + ("photos/2022-February/sample3.jpg", 1), + ]; + let test_dir = create_test_dir(Some(recursion_root), files.clone(), &[]); + + let bucket_name = "test-bucket"; + let put_object = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.bucket() == Some(bucket_name)) + .then_output(|| { + std::thread::sleep(std::time::Duration::from_millis(500)); + PutObjectOutput::builder().build() + }); + + let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); + let config = aws_s3_transfer_manager::Config::builder() + .client(s3_client) + .build(); + let sut = aws_s3_transfer_manager::Client::new(config); + + let mut handle = sut + .upload_objects() + .bucket(bucket_name) + .source(test_dir.path()) + .recursive(true) + .send() + .await + .unwrap(); + + // give some time to uploaded tasks so they can receive jobs from the task of listing directory contents. 
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + handle.abort().await.unwrap(); + + assert!(rx.contents().contains("received cancellation signal")); +} + +#[tokio::test] +async fn test_failed_child_operation_should_cause_ongoing_requests_to_be_cancelled() { + let (_guard, rx) = capture_test_logs(); + + let recursion_root = "test"; + let files = vec![ + ("sample.jpg", 1), + ("photos/2022-January/sample.jpg", 1), + ("photos/2022-February/sample1.jpg", 1), + ("photos/2022-February/sample2.jpg", 1), + ("photos/2022-February/sample3.jpg", 1), + ]; + let test_dir = create_test_dir(Some(recursion_root), files.clone(), &[]); + + let bucket_name = "test-bucket"; + let put_object_success = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.key() != Some("photos/2022-February/sample3.jpg")) + .then_output(|| { + std::thread::sleep(std::time::Duration::from_secs(1)); + PutObjectOutput::builder().build() + }); + + let put_object_err = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.key() == Some("photos/2022-February/sample3.jpg")) + .then_http_response(|| { + HttpResponse::new(StatusCode::try_from(500).unwrap(), SdkBody::empty()) + }); + + let s3_client = mock_client!( + aws_sdk_s3, + RuleMode::MatchAny, + &[put_object_success, put_object_err] + ); + let config = aws_s3_transfer_manager::Config::builder() + .client(s3_client) + .build(); + let sut = aws_s3_transfer_manager::Client::new(config); + + let handle = sut + .upload_objects() + .bucket(bucket_name) + .source(test_dir.path()) + .recursive(true) + .send() + .await + .unwrap(); + + // `join` will report the actual error that triggered cancellation. + assert_eq!( + &ErrorKind::ChildOperationFailed, + handle.join().await.unwrap_err().kind() + ); + // the execution should see at least one cancellation signal being delivered. + assert!(rx.contents().contains("received cancellation signal")); +} From 5ff9335d16756ab3f6ba92f64d7b85e8d01f954e Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 18 Nov 2024 15:27:08 -0600 Subject: [PATCH 04/11] Run `cargo fmt` to fix formatting --- aws-s3-transfer-manager/src/operation/upload/handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 3c8d23f..74d2d02 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -28,7 +28,7 @@ pub(crate) enum UploadType { } /// Response type for a single upload object request. 
-/// +/// /// # Cancellation /// /// The operation can be cancelled either by dropping this handle or by calling From 55f8ce77f50467066dfacad22cee629952a83c18 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 18 Nov 2024 21:23:26 -0600 Subject: [PATCH 05/11] Make integration test behavior more deterministic --- .../tests/upload_objects_test.rs | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/aws-s3-transfer-manager/tests/upload_objects_test.rs b/aws-s3-transfer-manager/tests/upload_objects_test.rs index 08c7352..6de4328 100644 --- a/aws-s3-transfer-manager/tests/upload_objects_test.rs +++ b/aws-s3-transfer-manager/tests/upload_objects_test.rs @@ -5,6 +5,11 @@ #![cfg(target_family = "unix")] +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + use aws_s3_transfer_manager::{ error::ErrorKind, metrics::unit::ByteUnit, @@ -25,7 +30,7 @@ use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; use aws_smithy_runtime_api::http::StatusCode; use aws_smithy_types::body::SdkBody; use test_common::create_test_dir; -use tokio::fs::symlink; +use tokio::{fs::symlink, sync::watch}; // Create an S3 client with mock behavior configured for `PutObject` fn mock_s3_client_for_put_object(bucket_name: String) -> Client { @@ -417,12 +422,17 @@ async fn test_abort_on_handle_should_terminate_tasks_gracefully() { ]; let test_dir = create_test_dir(Some(recursion_root), files.clone(), &[]); + let (watch_tx, watch_rx) = watch::channel(()); + let bucket_name = "test-bucket"; let put_object = mock!(aws_sdk_s3::Client::put_object) .match_requests(move |input| input.bucket() == Some(bucket_name)) - .then_output(|| { - std::thread::sleep(std::time::Duration::from_millis(500)); - PutObjectOutput::builder().build() + .then_output({ + let rx = watch_rx.clone(); + move || { + while !rx.has_changed().unwrap() {} + PutObjectOutput::builder().build() + } }); let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); @@ -440,8 +450,7 @@ async fn test_abort_on_handle_should_terminate_tasks_gracefully() { .await .unwrap(); - // give some time to uploaded tasks so they can receive jobs from the task of listing directory contents. 
- tokio::time::sleep(std::time::Duration::from_millis(100)).await; + watch_tx.send(()).unwrap(); handle.abort().await.unwrap();
@@ -463,24 +472,21 @@ async fn test_failed_child_operation_should_cause_ongoing_requests_to_be_cancell let test_dir = create_test_dir(Some(recursion_root), files.clone(), &[]); let bucket_name = "test-bucket"; - let put_object_success = mock!(aws_sdk_s3::Client::put_object) - .match_requests(move |input| input.key() != Some("photos/2022-February/sample3.jpg")) - .then_output(|| { - std::thread::sleep(std::time::Duration::from_secs(1)); - PutObjectOutput::builder().build() - }); - let put_object_err = mock!(aws_sdk_s3::Client::put_object) - .match_requests(move |input| input.key() == Some("photos/2022-February/sample3.jpg")) - .then_http_response(|| { - HttpResponse::new(StatusCode::try_from(500).unwrap(), SdkBody::empty()) + let accessed = Arc::new(AtomicBool::new(false)); + let put_object = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.bucket() == Some(bucket_name)) + .then_http_response(move || { + let already_accessed = accessed.swap(true, Ordering::SeqCst); + if already_accessed { + HttpResponse::new(StatusCode::try_from(200).unwrap(), SdkBody::empty()) + } else { + // Force the first call to PutObject to fail, triggering operation cancellation for all subsequent PutObject calls. + HttpResponse::new(StatusCode::try_from(500).unwrap(), SdkBody::empty()) + } }); - let s3_client = mock_client!( - aws_sdk_s3, - RuleMode::MatchAny, - &[put_object_success, put_object_err] - ); + let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); let config = aws_s3_transfer_manager::Config::builder() .client(s3_client) .build();
From 3a4c8b1381dbb6aaefcccda50dccf6ea61d101b8 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 21 Nov 2024 12:16:15 -0600 Subject: [PATCH 06/11] Store `Receiver` in the state instead of passing it around This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1850930805 --- .../src/operation/upload_objects.rs | 21 +++++++----- .../src/operation/upload_objects/worker.rs | 32 ++++++++++++------- 2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects.rs b/aws-s3-transfer-manager/src/operation/upload_objects.rs index 4bd4324..39d6ca9 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects.rs
@@ -17,7 +17,7 @@ pub use handle::UploadObjectsHandle; mod output; pub use output::{UploadObjectsOutput, UploadObjectsOutputBuilder}; use tokio::{ - sync::watch::{self, Sender}, + sync::watch::{self, Receiver, Sender}, task::JoinSet, }; use tracing::Instrument; mod worker;
@@ -57,7 +57,7 @@ impl UploadObjects { }; let (cancel_tx, cancel_rx) = watch::channel(false); let concurrency = handle.num_workers(); - let ctx = UploadObjectsContext::new(handle.clone(), input, cancel_tx); + let ctx = UploadObjectsContext::new(handle.clone(), input, cancel_tx, cancel_rx); // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled.
let mut tasks = JoinSet::new(); @@ -67,13 +67,11 @@ impl UploadObjects { tasks.spawn(worker::list_directory_contents( ctx.state.clone(), list_directory_tx, - cancel_rx.clone(), )); for i in 0..concurrency { - let worker = - worker::upload_objects(ctx.clone(), list_directory_rx.clone(), cancel_rx.clone()) - .instrument(tracing::debug_span!("object-uploader", worker = i)); + let worker = worker::upload_objects(ctx.clone(), list_directory_rx.clone()) + .instrument(tracing::debug_span!("object-uploader", worker = i)); tasks.spawn(worker); } @@ -89,16 +87,22 @@ pub(crate) struct UploadObjectsState { // https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603 input: UploadObjectsInput, cancel_tx: Sender, + cancel_rx: Receiver, failed_uploads: Mutex>, successful_uploads: AtomicU64, total_bytes_transferred: AtomicU64, } impl UploadObjectsState { - pub(crate) fn new(input: UploadObjectsInput, cancel_tx: Sender) -> Self { + pub(crate) fn new( + input: UploadObjectsInput, + cancel_tx: Sender, + cancel_rx: Receiver, + ) -> Self { Self { input, cancel_tx, + cancel_rx, failed_uploads: Mutex::new(Vec::new()), successful_uploads: AtomicU64::default(), total_bytes_transferred: AtomicU64::default(), @@ -113,8 +117,9 @@ impl UploadObjectsContext { handle: Arc, input: UploadObjectsInput, cancel_tx: Sender, + cancel_rx: Receiver, ) -> Self { - let state = Arc::new(UploadObjectsState::new(input, cancel_tx)); + let state = Arc::new(UploadObjectsState::new(input, cancel_tx, cancel_rx)); TransferContext { handle, state } } } diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index 444a3a9..4a9033f 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -10,9 +10,9 @@ use std::sync::Arc; use super::{UploadObjectsContext, UploadObjectsInput, UploadObjectsState}; use async_channel::{Receiver, Sender}; +use aws_sdk_s3::error::DisplayErrorContext; use blocking::Unblock; use futures_util::StreamExt; -use tokio::sync::watch; use walkdir::WalkDir; use crate::error::ErrorKind; @@ -40,7 +40,6 @@ impl UploadObjectJob { pub(super) async fn list_directory_contents( state: Arc, list_directory_tx: Sender>, - mut cancel_rx: watch::Receiver, ) -> Result<(), error::Error> { let input = &state.input; @@ -56,6 +55,8 @@ pub(super) async fn list_directory_contents( let default_filter = &UploadFilter::default(); let filter = input.filter().unwrap_or(default_filter); + let mut cancel_rx = state.cancel_rx.clone(); + loop { tokio::select! { _ = cancel_rx.changed() => { @@ -187,8 +188,8 @@ fn derive_object_key<'a>( pub(super) async fn upload_objects( ctx: UploadObjectsContext, list_directory_rx: Receiver>, - mut cancel_rx: watch::Receiver, ) -> Result<(), error::Error> { + let mut cancel_rx = ctx.state.cancel_rx.clone(); loop { tokio::select! 
{ _ = cancel_rx.changed() => { @@ -202,7 +203,7 @@ pub(super) async fn upload_objects( match job { Ok(job) => { let key = job.key.clone(); - let result = upload_single_obj(&ctx, job, cancel_rx.clone()).await; + let result = upload_single_obj(&ctx, job).await; match result { Ok(bytes_transferred) => { ctx.state.successful_uploads.fetch_add(1, Ordering::SeqCst); @@ -236,7 +237,6 @@ pub(super) async fn upload_objects( async fn upload_single_obj( ctx: &UploadObjectsContext, job: UploadObjectJob, - cancel_rx: watch::Receiver, ) -> Result { let UploadObjectJob { object, key } = job; @@ -258,8 +258,18 @@ async fn upload_single_obj( let mut handle = crate::operation::upload::Upload::orchestrate(ctx.handle.clone(), input).await?; - if cancel_rx.has_changed().unwrap() { - let _ = handle.abort().await; + if ctx + .state + .cancel_rx + .has_changed() + .expect("the channel should be open as it is owned by `UploadObjectsState`") + { + if let Err(e) = handle.abort().await { + tracing::error!( + "encountered an error while cancelling a single object upload: {}", + DisplayErrorContext(&e) + ); + } Err(crate::error::Error::new( ErrorKind::OperationCancelled, CANCELLATION_ERROR.to_owned(), @@ -339,6 +349,7 @@ mod tests { use std::error::Error as _; use test_common::create_test_dir; use tokio::fs::symlink; + use tokio::sync::watch; #[test] fn test_derive_object_key() { @@ -404,9 +415,8 @@ mod tests { let (cancel_tx, cancel_rx) = watch::channel(false); let join_handle = tokio::spawn(list_directory_contents( - Arc::new(UploadObjectsState::new(input, cancel_tx)), + Arc::new(UploadObjectsState::new(input, cancel_tx, cancel_rx)), list_directory_tx, - cancel_rx, )); let mut successes = BTreeMap::new(); @@ -710,7 +720,7 @@ mod tests { .bucket(bucket) .build() .unwrap(); - let ctx = UploadObjectsContext::new(handle, input, cancel_tx); + let ctx = UploadObjectsContext::new(handle, input, cancel_tx, cancel_rx); let job = UploadObjectJob { object: InputStream::from(Bytes::from_static(b"doesnotmatter")), key: "doesnotmatter".to_owned(), @@ -718,7 +728,7 @@ mod tests { ctx.state.cancel_tx.send(true).unwrap(); - let err = upload_single_obj(&ctx, job, cancel_rx).await.unwrap_err(); + let err = upload_single_obj(&ctx, job).await.unwrap_err(); assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind()); } From 6c85ac548625fdf3fc3c19fc443a0f745de3d181 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 21 Nov 2024 14:51:20 -0600 Subject: [PATCH 07/11] Add integration test to verify dropping `UploadObjectsHandle` This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1850926620 --- .../tests/upload_objects_test.rs | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/aws-s3-transfer-manager/tests/upload_objects_test.rs b/aws-s3-transfer-manager/tests/upload_objects_test.rs index 6de4328..afdc8f7 100644 --- a/aws-s3-transfer-manager/tests/upload_objects_test.rs +++ b/aws-s3-transfer-manager/tests/upload_objects_test.rs @@ -509,3 +509,55 @@ async fn test_failed_child_operation_should_cause_ongoing_requests_to_be_cancell // the execution should see at least one cancellation signal being delivered. 
assert!(rx.contents().contains("received cancellation signal")); } + +#[tokio::test] +async fn test_drop_upload_objects_handle() { + let test_dir = create_test_dir( + Some("test"), + vec![ + ("sample.jpg", 1), + ("photos/2022-January/sample.jpg", 1), + ("photos/2022-February/sample1.jpg", 1), + ("photos/2022-February/sample2.jpg", 1), + ("photos/2022-February/sample3.jpg", 1), + ], + &[], + ); + + let (watch_tx, watch_rx) = watch::channel(()); + + let bucket_name = "test-bucket"; + let put_object = mock!(aws_sdk_s3::Client::put_object) + .match_requests(move |input| input.bucket() == Some(bucket_name)) + .then_output({ + move || { + watch_tx.send(()).unwrap(); + // sleep for some time so that the main thread proceeds with `drop(handle)` + std::thread::sleep(std::time::Duration::from_millis(100)); + PutObjectOutput::builder().build() + } + }); + let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); + let config = aws_s3_transfer_manager::Config::builder() + .client(s3_client) + .build(); + let sut = aws_s3_transfer_manager::Client::new(config); + + let handle = sut + .upload_objects() + .bucket(bucket_name) + .source(test_dir.path()) + .recursive(true) + .send() + .await + .unwrap(); + + // Wait until execution reaches the point just before returning `PutObjectOutput`, + // as dropping `handle` immediately after creation may not be interesting for testing. + while !watch_rx.has_changed().unwrap() { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // should not panic + drop(handle) +} From a6fbdcf0e7b9acf02405dcd1f56885f4745c6cb3 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 21 Nov 2024 14:55:01 -0600 Subject: [PATCH 08/11] Remove unnecessary doc sentence from `OperationCancelled` This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1850936487 --- aws-s3-transfer-manager/src/error.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index ab38b23..7931655 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -46,8 +46,7 @@ pub enum ErrorKind { ChildOperationFailed, /// The operation is being canceled because the user explicitly called `.abort` on the handle, - /// or a child operation failed with the abort policy. The operation is now transitioning into - /// a graceful shutdown mode. + /// or a child operation failed with the abort policy. OperationCancelled, } From e655aeb95ee251f017b0f38d1cc00c88dad1d47a Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 22 Nov 2024 15:05:56 -0600 Subject: [PATCH 09/11] Add comment on why we check if cancel receiver has been updated This commit responds to https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1850934055 --- aws-s3-transfer-manager/src/operation/upload_objects/worker.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index 4a9033f..56b05e2 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -258,6 +258,9 @@ async fn upload_single_obj( let mut handle = crate::operation::upload::Upload::orchestrate(ctx.handle.clone(), input).await?; + // The cancellation process would work fine without this if statement. 
+ // It's here so we can save a single upload operation that would otherwise + // be wasted if the system is already in graceful shutdown mode. if ctx .state .cancel_rx
From 6244d9e9f95b1d50604e4d1a765f3fb3d60e0313 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 22 Nov 2024 15:17:11 -0600 Subject: [PATCH 10/11] Create cancellation channel internally in `UploadObjectsContext` This commit addresses https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/75#discussion_r1853953053 --- .../src/operation/upload_objects.rs | 11 +++-------- .../src/operation/upload_objects/worker.rs | 4 +--- 2 files changed, 4 insertions(+), 11 deletions(-)
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects.rs b/aws-s3-transfer-manager/src/operation/upload_objects.rs index 39d6ca9..60fb3ff 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects.rs
@@ -55,9 +55,8 @@ impl UploadObjects { return Err(e); } }; - let (cancel_tx, cancel_rx) = watch::channel(false); let concurrency = handle.num_workers(); - let ctx = UploadObjectsContext::new(handle.clone(), input, cancel_tx, cancel_rx); + let ctx = UploadObjectsContext::new(handle.clone(), input); // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled. let mut tasks = JoinSet::new();
@@ -113,12 +112,8 @@ impl UploadObjectsState { type UploadObjectsContext = TransferContext<UploadObjectsState>; impl UploadObjectsContext { - fn new( - handle: Arc<crate::client::Handle>, - input: UploadObjectsInput, - cancel_tx: Sender<bool>, - cancel_rx: Receiver<bool>, - ) -> Self { + fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self { + let (cancel_tx, cancel_rx) = watch::channel(false); let state = Arc::new(UploadObjectsState::new(input, cancel_tx, cancel_rx)); TransferContext { handle, state } } }
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index 56b05e2..fad3ae8 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs
@@ -331,7 +331,6 @@ mod tests { use aws_sdk_s3::operation::put_object::PutObjectOutput; use aws_smithy_mocks_experimental::{mock, mock_client, RuleMode}; use bytes::Bytes; - use tokio::sync::watch; use crate::{ client::Handle, io::InputStream, operation::upload_objects::{ worker::{upload_single_obj, UploadObjectJob}, UploadObjectsContext, UploadObjectsInputBuilder, }, runtime::scheduler::Scheduler, DEFAULT_CONCURRENCY, };
@@ -714,7 +713,7 @@ mod tests { let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]); let config = crate::Config::builder().client(s3_client).build(); - let (cancel_tx, cancel_rx) = watch::channel(false); let scheduler = Scheduler::new(DEFAULT_CONCURRENCY); let handle = std::sync::Arc::new(Handle { config, scheduler });
@@ -723,7 +721,7 @@ mod tests { .bucket(bucket) .build() .unwrap(); - let ctx = UploadObjectsContext::new(handle, input, cancel_tx, cancel_rx); + let ctx = UploadObjectsContext::new(handle, input); let job = UploadObjectJob { object: InputStream::from(Bytes::from_static(b"doesnotmatter")), key: "doesnotmatter".to_owned(),
From bee91195fa886675120e1e73a0f5a13d5abd003d Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 22 Nov 2024 16:14:21 -0600 Subject: [PATCH 11/11] Make one of uploading multiple objects integ tests more reliable --- .../src/operation/upload_objects/worker.rs | 1 + aws-s3-transfer-manager/tests/upload_objects_test.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs index
fad3ae8..dc86ba3 100644 --- a/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/upload_objects/worker.rs @@ -231,6 +231,7 @@ pub(super) async fn upload_objects( } } + tracing::trace!("ls channel closed, worker finished"); Ok(()) } diff --git a/aws-s3-transfer-manager/tests/upload_objects_test.rs b/aws-s3-transfer-manager/tests/upload_objects_test.rs index afdc8f7..dea17d5 100644 --- a/aws-s3-transfer-manager/tests/upload_objects_test.rs +++ b/aws-s3-transfer-manager/tests/upload_objects_test.rs @@ -506,8 +506,12 @@ async fn test_failed_child_operation_should_cause_ongoing_requests_to_be_cancell &ErrorKind::ChildOperationFailed, handle.join().await.unwrap_err().kind() ); - // the execution should see at least one cancellation signal being delivered. - assert!(rx.contents().contains("received cancellation signal")); + // The execution should either receive at least one cancellation signal or successfully complete the current task. + let logs = rx.contents(); + assert!( + logs.contains("received cancellation signal") + || logs.contains("ls channel closed, worker finished") + ); } #[tokio::test]