Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort Download Object #80

Merged
merged 38 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2cd62a2
wip get rid of join and add abort function
waahm7 Nov 27, 2024
14a9130
wip watch channel
waahm7 Nov 27, 2024
daf50ec
Support cancelling multiple downloads
ysaito1001 Nov 27, 2024
7956233
Merge remote-tracking branch 'origin/ysaito/cancel-multi-downloads' i…
waahm7 Nov 27, 2024
0b9b305
wip handle discovery errors
waahm7 Nov 27, 2024
8226092
rename -> body to output
waahm7 Nov 27, 2024
2b36ded
fix cp
waahm7 Nov 27, 2024
21b7d2d
use common types
waahm7 Nov 29, 2024
fd470f8
update exisitng test
waahm7 Nov 29, 2024
bf879f1
update drain
waahm7 Nov 29, 2024
a44575b
more body->output renames
waahm7 Dec 2, 2024
31b6241
log error
waahm7 Dec 2, 2024
c570114
debug log
waahm7 Dec 2, 2024
001ff19
fmt
waahm7 Dec 2, 2024
51493c2
fix todos
waahm7 Dec 2, 2024
d20ec39
fmt
waahm7 Dec 2, 2024
0839743
get rid of todo
waahm7 Dec 2, 2024
c8ffadb
no need for box error
waahm7 Dec 2, 2024
c25ba12
Merge branch 'main' into cancel-single-download
waahm7 Dec 3, 2024
ebcc96d
debug
waahm7 Dec 3, 2024
d954fbc
add test
waahm7 Dec 4, 2024
80e587e
fmt
waahm7 Dec 4, 2024
0462715
fix todo
waahm7 Dec 4, 2024
05c2504
handle abort
waahm7 Dec 4, 2024
4e55abd
rename output to DownloadOutput and pub use
waahm7 Dec 5, 2024
227180a
change logs
waahm7 Dec 5, 2024
f0e7d2b
fix typo
waahm7 Dec 5, 2024
aba4be5
Update aws-s3-transfer-manager/src/operation/download/service.rs
waahm7 Dec 6, 2024
a0da2b4
fmt
waahm7 Dec 6, 2024
79c660b
ignore OperationCancelled errors
waahm7 Dec 6, 2024
ece39ae
revert name to body
waahm7 Dec 6, 2024
471d7bf
mising reverts
waahm7 Dec 6, 2024
26f3d81
more
waahm7 Dec 6, 2024
63a2158
more
waahm7 Dec 6, 2024
93d58e8
fmt
waahm7 Dec 6, 2024
34f994f
Update aws-s3-transfer-manager/src/operation/download/handle.rs
waahm7 Dec 9, 2024
358c72d
update doc
waahm7 Dec 9, 2024
1420ec7
more parts
waahm7 Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::metrics::unit::ByteUnit;
use aws_s3_transfer_manager::metrics::Throughput;
use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::operation::download::Body;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::error::DisplayErrorContext;
use bytes::Buf;
Expand Down
53 changes: 41 additions & 12 deletions aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use aws_sdk_s3::error::DisplayErrorContext;
/// Request type for downloading a single object from Amazon S3
pub use input::{DownloadInput, DownloadInputBuilder};

/// Abstractions for response bodies and consuming data streams.
pub mod body;
/// Operation builders
pub mod builders;

/// Abstractions for responses and consuming data streams.
mod body;
pub use body::{Body, ChunkOutput};

mod discovery;

mod handle;
Expand All @@ -33,15 +35,14 @@ use crate::error;
use crate::io::AggregatedBytes;
use crate::runtime::scheduler::OwnedWorkPermit;
use aws_smithy_types::byte_stream::ByteStream;
use body::{Body, ChunkOutput};
use discovery::discover_obj;
use service::distribute_work;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell};
use tokio::sync::{mpsc, oneshot, watch, Mutex, OnceCell};
use tokio::task::{self, JoinSet};

use super::TransferContext;
use super::{CancelNotificationReceiver, CancelNotificationSender, TransferContext};

/// Operation struct for single object download
#[derive(Clone, Default, Debug)]
Expand Down Expand Up @@ -101,7 +102,7 @@ async fn send_discovery(
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) -> Result<(), crate::error::Error> {
) {
// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
Expand All @@ -115,13 +116,37 @@ async fn send_discovery(
}

// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;
let permit = ctx.handle.scheduler.acquire_permit().await;
let permit = match permit {
Ok(permit) => permit,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({:?}) has been dropped, aborting during the discovery phase", input.key);
}
return;
}
};

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
// FIXME - This will fail if the handle is dropped at this point. We should handle
// the cancellation gracefully here.
let _ = object_meta_tx.send(discovery.object_meta);
let discovery = discover_obj(&ctx, &input).await;
let mut discovery = match discovery {
Ok(discovery) => discovery,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({:?}) has been dropped, aborting during the discovery phase", input.key);
}
return;
}
};

if object_meta_tx.send(discovery.object_meta).is_err() {
tracing::debug!(
"Download handle for key({:?}) has been dropped, aborting during the discovery phase",
input.key
);
return;
}

let initial_chunk = discovery.initial_chunk.take();

let mut tasks = tasks.lock().await;
Expand All @@ -148,7 +173,6 @@ async fn send_discovery(
parent_span_for_tasks,
);
}
Ok(())
}

/// Handle possibly sending the first chunk of data received through discovery. Returns
Expand Down Expand Up @@ -200,14 +224,19 @@ fn handle_discovery_chunk(
/// Per-download state shared through the download's `TransferContext`.
///
/// Besides a sequence counter, it holds both halves of a boolean `watch`
/// channel that is used to signal cancellation to in-flight chunk downloads.
#[derive(Debug)]
pub(crate) struct DownloadState {
/// Atomic sequence counter; initialised to 0 when the context is created.
/// NOTE(review): presumably the next chunk sequence number to hand out — confirm against its users.
current_seq: AtomicU64,
/// Sender half of the cancellation channel (initial value `false`).
/// Chunk tasks send `true` on it when a chunk fails with a non-cancellation
/// error or when a finished chunk can no longer be delivered.
cancel_tx: CancelNotificationSender,
/// Receiver half of the cancellation channel. Chunk downloads clone it and
/// race `changed()` against the request so they can stop early.
cancel_rx: CancelNotificationReceiver,
}

type DownloadContext = TransferContext<DownloadState>;

impl DownloadContext {
/// Build a download context around the given client handle.
///
/// The context starts with a fresh [`DownloadState`]: the sequence counter is 0
/// and the cancellation channel holds `false`.
fn new(handle: Arc<crate::client::Handle>) -> Self {
// Both halves of the watch channel are kept in the shared state, so any
// holder of the context can either signal or observe cancellation.
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(DownloadState {
current_seq: AtomicU64::new(0),
cancel_tx,
cancel_rx,
});
TransferContext { handle, state }
}
Expand Down
7 changes: 5 additions & 2 deletions aws-s3-transfer-manager/src/operation/download/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::io::AggregatedBytes;

use super::chunk_meta::ChunkMetadata;

/// Stream of binary data representing an Amazon S3 Object's contents.
/// Stream of [ChunkOutput] representing an Amazon S3 Object's contents and metadata.
///
/// Wraps potentially multiple streams of binary data into a single coherent stream.
/// The data on this stream is sequenced into the correct order.
Expand Down Expand Up @@ -81,7 +81,10 @@ impl Body {
match self.inner.next().await {
None => break,
Some(Ok(chunk)) => self.sequencer.push(chunk),
Some(Err(err)) => return Some(Err(err)),
Some(Err(err)) => {
self.close();
return Some(Err(err));
}
}
}

Expand Down
21 changes: 9 additions & 12 deletions aws-s3-transfer-manager/src/operation/download/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ pub struct DownloadHandle {
/// Object metadata.
pub(crate) object_meta: OnceCell<ObjectMetadata>,

/// The object content
/// The object content, in chunks, and the metadata for each chunk
pub(crate) body: Body,

/// Discovery task
pub(crate) discovery: task::JoinHandle<Result<(), error::Error>>,
pub(crate) discovery: task::JoinHandle<()>,

/// All child tasks (ranged GetObject) spawned for this download
pub(crate) tasks: Arc<Mutex<task::JoinSet<()>>>,
Expand All @@ -53,7 +53,7 @@ impl DownloadHandle {
Ok(meta)
}

/// Object content
/// The object content, in chunks, and the metadata for each chunk
pub fn body(&self) -> &Body {
&self.body
}
Expand All @@ -63,19 +63,16 @@ impl DownloadHandle {
&mut self.body
}

/// Consume the handle and wait for download transfer to complete
#[tracing::instrument(skip_all, level = "debug", name = "join-download")]
pub async fn join(mut self) -> Result<(), crate::error::Error> {
/// Abort the download and cancel any in-progress work.
pub async fn abort(mut self) {
self.body.close();

self.discovery.await??;
self.discovery.abort();
let _ = self.discovery.await;
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
// It's safe to grab the lock here because discovery is already complete, and we will never
// lock tasks again after discovery to spawn more tasks.
let mut tasks = self.tasks.lock().await;
while let Some(join_result) = tasks.join_next().await {
join_result?;
}
Ok(())
tasks.abort_all();
while (tasks.join_next().await).is_some() {}
}
}

Expand Down
67 changes: 47 additions & 20 deletions aws-s3-transfer-manager/src/operation/download/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
use crate::error;
use crate::error::ErrorKind;
use crate::http::header;
use crate::io::AggregatedBytes;
use crate::middleware::limit::concurrency::ConcurrencyLimitLayer;
Expand Down Expand Up @@ -69,25 +70,33 @@ async fn download_specific_chunk(
);

let op = input.into_sdk_operation(ctx.client());
let mut resp = op
.send()
// no instrument() here because parent span shows duration of send + collect
.await
.map_err(error::from_kind(error::ErrorKind::ChunkFailed))?;

let body = mem::replace(&mut resp.body, ByteStream::new(SdkBody::taken()));
let body = AggregatedBytes::from_byte_stream(body)
.instrument(tracing::debug_span!(
"collect-body-from-download-chunk",
seq
))
.await?;

Ok(ChunkOutput {
seq,
data: body,
metadata: resp.into(),
})
let mut cancel_rx = ctx.state.cancel_rx.clone();
tokio::select! {
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
_ = cancel_rx.changed() => {
tracing::debug!("Received cancellating signal, exiting and not downloading chunk#{seq}");
Err(error::operation_cancelled())
},
resp = op.send() => {
match resp {
Err(err) => Err(error::from_kind(error::ErrorKind::ChunkFailed)(err)),
Ok(mut resp) => {
let body = mem::replace(&mut resp.body, ByteStream::new(SdkBody::taken()));
let body = AggregatedBytes::from_byte_stream(body)
.instrument(tracing::debug_span!(
"collect-body-from-download-chunk",
seq
))
.await?;

Ok(ChunkOutput {
seq,
data: body,
metadata: resp.into(),
})
},
}
}
}
}

/// Create a new tower::Service for downloading individual chunks of an object from S3
Expand Down Expand Up @@ -139,12 +148,30 @@ pub(super) fn distribute_work(

let svc = svc.clone();
let comp_tx = comp_tx.clone();
let cancel_tx = ctx.state.cancel_tx.clone();

let task = async move {
// TODO: If downloading a chunk fails, do we want to abort the download?
let resp = svc.oneshot(req).await;
// If any chunk fails, send cancel notification, to kill any other in-flight chunks
if let Err(err) = &resp {
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
if *err.kind() == ErrorKind::OperationCancelled {
// Ignore any OperationCancelled errors.
return;
}
if cancel_tx.send(true).is_err() {
tracing::debug!(
"all receiver ends have dropped, unable to send a cancellation signal"
);
}
}

if let Err(err) = comp_tx.send(resp).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK that `ErrorKind::OperationCancelled` gets sent on `comp_tx`? That could result in MANY chunks sending their cancellation error on the chunk transmitter

Do all of these end up reported to the user via the Body/Output iterator? Or does it filter errors so the user only sees the 1st? Can we guarantee that the ACTUAL error gets reported to the user before any cancellation errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as we encounter an error, we close the channel so user will only see the first error.

Can we guarantee that the ACTUAL error gets reported to the user before any cancellation errors?

Tokio does guarantee that "All data sent on Sender will become available on Receiver in the same order as it was sent." However, if the channel is full, all of them will be waiting at an await point. In that case (not 100% sure), the order won't be guaranteed. We will need a fair bit of complexity to work around that, i.e., maybe collect all the errors and then only send those that are not OperationCancelled errors. Not sure if there is a simpler way to do this. @aajtodd any ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to send operation cancelled across the channel at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking there can be a case where only the operation cancelled error is there, but yeah, we can just not send that error at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A user wouldn't act on that error right? I wouldn't think so anyway, it doesn't tell them anything they don't already know if they cancelled the operation, thus it probably doesn't make sense to yield it to them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have updated it to ignore OperationCancelled errors.

tracing::debug!(error = ?err, "chunk send failed, channel closed");
if cancel_tx.send(true).is_err() {
tracing::debug!(
"all receiver ends have dropped, unable to send a cancellation signal"
);
}
}
};
tasks.spawn(task.instrument(parent_span_for_tasks.clone()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::operation::download::Body;
use async_channel::{Receiver, Sender};
use path_clean::PathClean;
use std::borrow::Cow;
Expand All @@ -12,7 +13,6 @@ use tokio::fs;
use tokio::io::AsyncWriteExt;

use crate::error::{self, ErrorKind};
use crate::operation::download::body::Body;
use crate::operation::download::{DownloadInput, DownloadInputBuilder};
use crate::operation::DEFAULT_DELIMITER;
use crate::types::{DownloadFilter, FailedDownload, FailedTransferPolicy};
Expand Down Expand Up @@ -191,13 +191,8 @@ async fn download_single_obj(
.has_changed()
.expect("the channel should be open as it is owned by `DownloadObjectsState`")
{
/*
* TODO(single download cleanup): Comment in the following lines of code once single download has been cleaned up.
* Note that it may not be called `.abort()` depending on the outcome of the cleanup.
*
* handle.abort().await;
* return Err(error::operation_cancelled());
*/
handle.abort().await;
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
return Err(error::operation_cancelled());
}

let _ = handle.object_meta().await?;
Expand All @@ -214,8 +209,6 @@ async fn download_single_obj(
}
}

handle.join().await?;

Ok(())
}

Expand Down
Loading