Skip to content

Commit

Permalink
Add mechanism to cancel the operation of uploading directory (#75)
Browse files Browse the repository at this point in the history
* Add ability to cancel upload directory operation

* Add unit test to verify cancellation of a single upload

* Add integ tests to verify cancellation of multiple uploads

* Run `cargo fmt` to fix formatting

* Make integration test behavior more deterministic

* Store `Receiver` in the state instead of passing it around

This commit addresses #75 (comment)

* Add integration test to verify dropping `UploadObjectsHandle`

This commit addresses #75 (comment)

* Remove unnecessary doc sentence from `OperationCancelled`

This commit addresses #75 (comment)

* Add comment on why we check if cancel receiver has been updated

This commit responds to #75 (comment)

* Create cancellation channel internlly in `UploadObjectsContext`

This commit addresses #75 (comment)

* Make one of uploading multiple objects integ tests more reliable
  • Loading branch information
ysaito1001 authored Nov 22, 2024
1 parent ee9f51b commit c427a5e
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 111 deletions.
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Builder {
self
}

/// Consumes the builder and constructs a [`Config`](crate::config::Config)
/// Consumes the builder and constructs a [`Config`]
pub fn build(self) -> Config {
Config {
multipart_threshold: self.multipart_threshold_part_size,
Expand Down
5 changes: 5 additions & 0 deletions aws-s3-transfer-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub enum ErrorKind {

/// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket)
ChildOperationFailed,

/// The operation is being canceled because the user explicitly called `.abort` on the handle,
/// or a child operation failed with the abort policy.
OperationCancelled,
}

impl Error {
Expand Down Expand Up @@ -75,6 +79,7 @@ impl fmt::Display for Error {
ErrorKind::ChunkFailed => write!(f, "failed to process chunk"),
ErrorKind::NotFound => write!(f, "resource not found"),
ErrorKind::ChildOperationFailed => write!(f, "child operation failed"),
ErrorKind::OperationCancelled => write!(f, "operation cancelled"),
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions aws-s3-transfer-manager/src/operation/upload/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ pub(crate) enum UploadType {
}

/// Response type for a single upload object request.
///
/// # Cancellation
///
/// The operation can be cancelled either by dropping this handle or by calling
/// [`Self::abort`]. In both cases, any ongoing tasks will stop processing future work
/// and will not start processing anything new. However, there are subtle differences in
/// how each method cancels ongoing tasks.
///
/// When the handle is dropped, in-progress tasks are cancelled at their await points,
/// meaning read body tasks may be interrupted mid-processing, or upload parts may be
/// terminated without calling `AbortMultipartUpload` for multipart uploads.
///
/// In contrast, calling [`Self::abort`] attempts to cancel ongoing tasks more explicitly.
/// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload`
/// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload`
/// are logged, but do not affect the overall cancellation flow.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
Expand Down
35 changes: 26 additions & 9 deletions aws-s3-transfer-manager/src/operation/upload_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub use handle::UploadObjectsHandle;

mod output;
pub use output::{UploadObjectsOutput, UploadObjectsOutputBuilder};
use tokio::task::JoinSet;
use tokio::{
sync::watch::{self, Receiver, Sender},
task::JoinSet,
};
use tracing::Instrument;

mod worker;
Expand Down Expand Up @@ -52,7 +55,6 @@ impl UploadObjects {
return Err(e);
}
};

let concurrency = handle.num_workers();
let ctx = UploadObjectsContext::new(handle.clone(), input);

Expand All @@ -62,7 +64,7 @@ impl UploadObjects {

// spawn worker to discover/distribute work
tasks.spawn(worker::list_directory_contents(
ctx.state.input.clone(),
ctx.state.clone(),
list_directory_tx,
));

Expand All @@ -83,21 +85,36 @@ pub(crate) struct UploadObjectsState {
// TODO - Determine if `input` should be separated from this struct
// https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603
input: UploadObjectsInput,
cancel_tx: Sender<bool>,
cancel_rx: Receiver<bool>,
failed_uploads: Mutex<Vec<FailedUpload>>,
successful_uploads: AtomicU64,
total_bytes_transferred: AtomicU64,
}

type UploadObjectsContext = TransferContext<UploadObjectsState>;

impl UploadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self {
let state = Arc::new(UploadObjectsState {
impl UploadObjectsState {
pub(crate) fn new(
input: UploadObjectsInput,
cancel_tx: Sender<bool>,
cancel_rx: Receiver<bool>,
) -> Self {
Self {
input,
cancel_tx,
cancel_rx,
failed_uploads: Mutex::new(Vec::new()),
successful_uploads: AtomicU64::default(),
total_bytes_transferred: AtomicU64::default(),
});
}
}
}

type UploadObjectsContext = TransferContext<UploadObjectsState>;

impl UploadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self {
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(UploadObjectsState::new(input, cancel_tx, cancel_rx));
TransferContext { handle, state }
}
}
78 changes: 75 additions & 3 deletions aws-s3-transfer-manager/src/operation/upload_objects/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,34 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::{error::ErrorKind, types::FailedTransferPolicy};

use super::{UploadObjectsContext, UploadObjectsOutput};
use tokio::task;

/// Handle for `UploadObjects` operation
///
/// # Cancellation
///
/// The operation can be cancelled by either by dropping this handle or by calling
/// [`Self::abort`]. In both cases, any ongoing tasks will ignore future work and
/// will not start processing anything new. However, there are subtle differences
/// in how each cancels ongoing tasks.
///
/// When the handle is dropped, in-progress tasks will be cancelled at the await
/// points where their futures are waiting. This means a particular upload
/// operation may be terminated mid-process, without completing the upload or
/// calling `AbortMultipartUpload` for multipart uploads (if the upload is
/// multipart, as opposed to a simple `PutObject`). In the case of `Drop`,
/// tasks will be forcefully terminated, regardless of the `FailedTransferPolicy`
/// associated with the handle.
///
/// Calling [`Self::abort`], on the other hand, provides more deterministic cancellation
/// behavior. If the `FailedTransferPolicy` for the handle is set to `Abort`, the
/// individual upload task can either complete the current upload operation or call
/// `AbortMultipartUpload` in the case of multipart uploads. Errors encountered during
/// `AbortMultipartUpload` will be logged, but will not affect the program's cancellation
/// control flow.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadObjectsHandle {
Expand All @@ -18,13 +42,61 @@ pub struct UploadObjectsHandle {

impl UploadObjectsHandle {
/// Consume the handle and wait for the upload to complete
///
/// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method
/// will return an error if any of the spawned tasks encounter one. The other tasks will
/// be canceled, but their cancellations will not be reported as errors by this method;
/// they will be logged as errors, instead.
///
/// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the
/// [`UploadObjectsOutput`] will include a detailed breakdown, such as the number of
/// successful uploads and the number of failed ones.
///
// TODO(aws-sdk-rust#1159) - Consider if we want to return failed `AbortMultipartUpload` during cancellation.
#[tracing::instrument(skip_all, level = "debug", name = "join-upload-objects-")]
pub async fn join(mut self) -> Result<UploadObjectsOutput, crate::error::Error> {
// TODO - Consider implementing more sophisticated error handling such as canceling in-progress transfers
let mut first_error_to_report = None;
while let Some(join_result) = self.tasks.join_next().await {
join_result??;
let result = join_result.expect("task completed");
if let Err(e) = result {
match self.ctx.state.input.failure_policy() {
FailedTransferPolicy::Abort
if first_error_to_report.is_none()
&& e.kind() != &ErrorKind::OperationCancelled =>
{
first_error_to_report = Some(e);
}
FailedTransferPolicy::Continue => {
tracing::warn!("encountered but dismissed error when the failure policy is `Continue`: {e}")
}
_ => {}
}
}
}

if let Some(e) = first_error_to_report {
Err(e)
} else {
Ok(UploadObjectsOutput::from(self.ctx.state.as_ref()))
}
}

/// Aborts all tasks owned by the handle.
///
/// Unlike `Drop`, calling `abort` gracefully shuts down any in-progress tasks.
/// Specifically, ongoing upload tasks will be allowed to complete their current work,
/// but any future uploads will be ignored. The task of listing directory contents will
/// stop yielding new directory contents.
pub async fn abort(&mut self) -> Result<(), crate::error::Error> {
if self.ctx.state.input.failure_policy() == &FailedTransferPolicy::Abort {
if self.ctx.state.cancel_tx.send(true).is_err() {
tracing::warn!(
"all receiver ends have been dropped, unable to send a cancellation signal"
);
}
while (self.tasks.join_next().await).is_some() {}
}

Ok(UploadObjectsOutput::from(self.ctx.state.as_ref()))
Ok(())
}
}
Loading

0 comments on commit c427a5e

Please sign in to comment.