Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mechanism to cancel the operation of uploading directory #75

Merged
merged 11 commits into from
Nov 22, 2024
Merged
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Builder {
self
}

/// Consumes the builder and constructs a [`Config`](crate::config::Config)
/// Consumes the builder and constructs a [`Config`]
pub fn build(self) -> Config {
Config {
multipart_threshold: self.multipart_threshold_part_size,
Expand Down
5 changes: 5 additions & 0 deletions aws-s3-transfer-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub enum ErrorKind {

/// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket)
ChildOperationFailed,

/// The operation is being canceled because the user explicitly called `.abort` on the handle,
/// or a child operation failed with the abort policy.
OperationCancelled,
}

impl Error {
Expand Down Expand Up @@ -75,6 +79,7 @@ impl fmt::Display for Error {
ErrorKind::ChunkFailed => write!(f, "failed to process chunk"),
ErrorKind::NotFound => write!(f, "resource not found"),
ErrorKind::ChildOperationFailed => write!(f, "child operation failed"),
ErrorKind::OperationCancelled => write!(f, "operation cancelled"),
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions aws-s3-transfer-manager/src/operation/upload/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ pub(crate) enum UploadType {
}

/// Response type for a single upload object request.
///
/// # Cancellation
///
/// The operation can be cancelled either by dropping this handle or by calling
/// [`Self::abort`]. In both cases, any ongoing tasks will stop processing future work
/// and will not start processing anything new. However, there are subtle differences in
/// how each method cancels ongoing tasks.
///
/// When the handle is dropped, in-progress tasks are cancelled at their await points,
/// meaning read body tasks may be interrupted mid-processing, or upload parts may be
/// terminated without calling `AbortMultipartUpload` for multipart uploads.
///
/// In contrast, calling [`Self::abort`] attempts to cancel ongoing tasks more explicitly.
/// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload`
/// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload`
/// are logged, but do not affect the overall cancellation flow.
Comment on lines +43 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: Worth mentioning that the operation might already be complete, and we won't delete anything?

#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
Expand Down
35 changes: 26 additions & 9 deletions aws-s3-transfer-manager/src/operation/upload_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub use handle::UploadObjectsHandle;

mod output;
pub use output::{UploadObjectsOutput, UploadObjectsOutputBuilder};
use tokio::task::JoinSet;
use tokio::{
sync::watch::{self, Receiver, Sender},
task::JoinSet,
};
use tracing::Instrument;

mod worker;
Expand Down Expand Up @@ -52,7 +55,6 @@ impl UploadObjects {
return Err(e);
}
};

let concurrency = handle.num_workers();
let ctx = UploadObjectsContext::new(handle.clone(), input);

Expand All @@ -62,7 +64,7 @@ impl UploadObjects {

// spawn worker to discover/distribute work
tasks.spawn(worker::list_directory_contents(
ctx.state.input.clone(),
ctx.state.clone(),
list_directory_tx,
));

Expand All @@ -83,21 +85,36 @@ pub(crate) struct UploadObjectsState {
// TODO - Determine if `input` should be separated from this struct
// https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603
input: UploadObjectsInput,
cancel_tx: Sender<bool>,
cancel_rx: Receiver<bool>,
failed_uploads: Mutex<Vec<FailedUpload>>,
successful_uploads: AtomicU64,
total_bytes_transferred: AtomicU64,
}

type UploadObjectsContext = TransferContext<UploadObjectsState>;

impl UploadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self {
let state = Arc::new(UploadObjectsState {
impl UploadObjectsState {
pub(crate) fn new(
input: UploadObjectsInput,
cancel_tx: Sender<bool>,
cancel_rx: Receiver<bool>,
) -> Self {
Self {
input,
cancel_tx,
cancel_rx,
failed_uploads: Mutex::new(Vec::new()),
successful_uploads: AtomicU64::default(),
total_bytes_transferred: AtomicU64::default(),
});
}
}
}

type UploadObjectsContext = TransferContext<UploadObjectsState>;

impl UploadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: UploadObjectsInput) -> Self {
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(UploadObjectsState::new(input, cancel_tx, cancel_rx));
TransferContext { handle, state }
}
}
78 changes: 75 additions & 3 deletions aws-s3-transfer-manager/src/operation/upload_objects/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,34 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::{error::ErrorKind, types::FailedTransferPolicy};

use super::{UploadObjectsContext, UploadObjectsOutput};
use tokio::task;

/// Handle for `UploadObjects` operation
///
/// # Cancellation
///
/// The operation can be cancelled by either by dropping this handle or by calling
/// [`Self::abort`]. In both cases, any ongoing tasks will ignore future work and
/// will not start processing anything new. However, there are subtle differences
/// in how each cancels ongoing tasks.
///
/// When the handle is dropped, in-progress tasks will be cancelled at the await
/// points where their futures are waiting. This means a particular upload
/// operation may be terminated mid-process, without completing the upload or
/// calling `AbortMultipartUpload` for multipart uploads (if the upload is
/// multipart, as opposed to a simple `PutObject`). In the case of `Drop`,
/// tasks will be forcefully terminated, regardless of the `FailedTransferPolicy`
/// associated with the handle.
///
/// Calling [`Self::abort`], on the other hand, provides more deterministic cancellation
/// behavior. If the `FailedTransferPolicy` for the handle is set to `Abort`, the
/// individual upload task can either complete the current upload operation or call
/// `AbortMultipartUpload` in the case of multipart uploads. Errors encountered during
/// `AbortMultipartUpload` will be logged, but will not affect the program's cancellation
/// control flow.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadObjectsHandle {
Expand All @@ -18,13 +42,61 @@ pub struct UploadObjectsHandle {

impl UploadObjectsHandle {
/// Consume the handle and wait for the upload to complete
///
/// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method
/// will return an error if any of the spawned tasks encounter one. The other tasks will
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// will return an error if any of the spawned tasks encounter one. The other tasks will
/// will return the first error if any of the spawned tasks encounter one. The other tasks will

/// be canceled, but their cancellations will not be reported as errors by this method;
/// they will be logged as errors, instead.
///
/// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the
/// [`UploadObjectsOutput`] will include a detailed breakdown, such as the number of
/// successful uploads and the number of failed ones.
///
// TODO(aws-sdk-rust#1159) - Consider if we want to return failed `AbortMultipartUpload` during cancellation.
#[tracing::instrument(skip_all, level = "debug", name = "join-upload-objects-")]
pub async fn join(mut self) -> Result<UploadObjectsOutput, crate::error::Error> {
// TODO - Consider implementing more sophisticated error handling such as canceling in-progress transfers
let mut first_error_to_report = None;
while let Some(join_result) = self.tasks.join_next().await {
join_result??;
let result = join_result.expect("task completed");
if let Err(e) = result {
match self.ctx.state.input.failure_policy() {
FailedTransferPolicy::Abort
if first_error_to_report.is_none()
&& e.kind() != &ErrorKind::OperationCancelled =>
{
first_error_to_report = Some(e);
}
FailedTransferPolicy::Continue => {
tracing::warn!("encountered but dismissed error when the failure policy is `Continue`: {e}")
}
_ => {}
}
}
}

if let Some(e) = first_error_to_report {
Err(e)
} else {
Ok(UploadObjectsOutput::from(self.ctx.state.as_ref()))
}
}

/// Aborts all tasks owned by the handle.
///
/// Unlike `Drop`, calling `abort` gracefully shuts down any in-progress tasks.
/// Specifically, ongoing upload tasks will be allowed to complete their current work,
/// but any future uploads will be ignored. The task of listing directory contents will
/// stop yielding new directory contents.
pub async fn abort(&mut self) -> Result<(), crate::error::Error> {
if self.ctx.state.input.failure_policy() == &FailedTransferPolicy::Abort {
if self.ctx.state.cancel_tx.send(true).is_err() {
tracing::warn!(
"all receiver ends have been dropped, unable to send a cancellation signal"
);
}
while (self.tasks.join_next().await).is_some() {}
}

Ok(UploadObjectsOutput::from(self.ctx.state.as_ref()))
Ok(())
}
}
Loading