Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort Download Object #80

Merged
merged 38 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2cd62a2
wip get rid of join and add abort function
waahm7 Nov 27, 2024
14a9130
wip watch channel
waahm7 Nov 27, 2024
daf50ec
Support cancelling multiple downloads
ysaito1001 Nov 27, 2024
7956233
Merge remote-tracking branch 'origin/ysaito/cancel-multi-downloads' i…
waahm7 Nov 27, 2024
0b9b305
wip handle discovery errors
waahm7 Nov 27, 2024
8226092
rename -> body to output
waahm7 Nov 27, 2024
2b36ded
fix cp
waahm7 Nov 27, 2024
21b7d2d
use common types
waahm7 Nov 29, 2024
fd470f8
update exisitng test
waahm7 Nov 29, 2024
bf879f1
update drain
waahm7 Nov 29, 2024
a44575b
more body->output renames
waahm7 Dec 2, 2024
31b6241
log error
waahm7 Dec 2, 2024
c570114
debug log
waahm7 Dec 2, 2024
001ff19
fmt
waahm7 Dec 2, 2024
51493c2
fix todos
waahm7 Dec 2, 2024
d20ec39
fmt
waahm7 Dec 2, 2024
0839743
get rid of todo
waahm7 Dec 2, 2024
c8ffadb
no need for box error
waahm7 Dec 2, 2024
c25ba12
Merge branch 'main' into cancel-single-download
waahm7 Dec 3, 2024
ebcc96d
debug
waahm7 Dec 3, 2024
d954fbc
add test
waahm7 Dec 4, 2024
80e587e
fmt
waahm7 Dec 4, 2024
0462715
fix todo
waahm7 Dec 4, 2024
05c2504
handle abort
waahm7 Dec 4, 2024
4e55abd
rename output to DownloadOutput and pub use
waahm7 Dec 5, 2024
227180a
change logs
waahm7 Dec 5, 2024
f0e7d2b
fix typo
waahm7 Dec 5, 2024
aba4be5
Update aws-s3-transfer-manager/src/operation/download/service.rs
waahm7 Dec 6, 2024
a0da2b4
fmt
waahm7 Dec 6, 2024
79c660b
ignore OperationCancelled errors
waahm7 Dec 6, 2024
ece39ae
revert name to body
waahm7 Dec 6, 2024
471d7bf
mising reverts
waahm7 Dec 6, 2024
26f3d81
more
waahm7 Dec 6, 2024
63a2158
more
waahm7 Dec 6, 2024
93d58e8
fmt
waahm7 Dec 6, 2024
34f994f
Update aws-s3-transfer-manager/src/operation/download/handle.rs
waahm7 Dec 9, 2024
358c72d
update doc
waahm7 Dec 9, 2024
1420ec7
more parts
waahm7 Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::metrics::unit::ByteUnit;
use aws_s3_transfer_manager::metrics::Throughput;
use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::operation::download::output::Output;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::error::DisplayErrorContext;
use bytes::Buf;
Expand Down Expand Up @@ -173,7 +173,7 @@ async fn do_download(args: Args) -> Result<(), BoxError> {
// TM will handle it's own thread pool for filesystem work
let mut handle = tm.download().bucket(bucket).key(key).initiate()?;

write_body(handle.body_mut(), dest)
write_body(handle.ouput_mut(), dest)
.instrument(tracing::debug_span!("write-output"))
.await?;

Expand Down Expand Up @@ -296,8 +296,8 @@ async fn main() -> Result<(), BoxError> {
Ok(())
}

async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
async fn write_body(output: &mut Output, mut dest: fs::File) -> Result<(), BoxError> {
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
while let Some(chunk) = output.next().await {
let chunk = chunk.unwrap().data;
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
let mut segment_cnt = 1;
Expand Down
56 changes: 42 additions & 14 deletions aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use aws_sdk_s3::error::DisplayErrorContext;
/// Request type for dowloading a single object from Amazon S3
pub use input::{DownloadInput, DownloadInputBuilder};

/// Abstractions for response bodies and consuming data streams.
pub mod body;
/// Operation builders
pub mod builders;
/// Abstractions for responses and consuming data streams.
pub mod output;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: Be consistent across operations. e.g. upload output module is not public but it re-exports the output type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.


mod discovery;

Expand All @@ -33,15 +33,15 @@ use crate::error;
use crate::io::AggregatedBytes;
use crate::runtime::scheduler::OwnedWorkPermit;
use aws_smithy_types::byte_stream::ByteStream;
use body::{Body, ChunkOutput};
use discovery::discover_obj;
use output::{ChunkOutput, Output};
use service::distribute_work;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell};
use tokio::sync::{mpsc, oneshot, watch, Mutex, OnceCell};
use tokio::task::{self, JoinSet};

use super::TransferContext;
use super::{CancelNotificationReceiver, CancelNotificationSender, TransferContext};

/// Operation struct for single object download
#[derive(Clone, Default, Debug)]
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Download {
));

Ok(DownloadHandle {
body: Body::new(comp_rx),
output: Output::new(comp_rx),
tasks,
discovery,
object_meta_receiver: Mutex::new(Some(object_meta_rx)),
Expand All @@ -101,7 +101,7 @@ async fn send_discovery(
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) -> Result<(), crate::error::Error> {
) {
// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
Expand All @@ -115,13 +115,37 @@ async fn send_discovery(
}

// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;
let permit = ctx.handle.scheduler.acquire_permit().await;
let permit = match permit {
Ok(permit) => permit,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({}) has been dropped, aborting during the discovery phase", input.key.unwrap_or("None".to_string()));
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
}
return;
}
};

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
// FIXME - This will fail if the handle is dropped at this point. We should handle
// the cancellation gracefully here.
let _ = object_meta_tx.send(discovery.object_meta);
let discovery = discover_obj(&ctx, &input).await;
let mut discovery = match discovery {
Ok(discovery) => discovery,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({}) has been dropped, aborting during the discovery phase", input.key.unwrap_or("None".to_string()));
}
return;
}
};

if object_meta_tx.send(discovery.object_meta).is_err() {
tracing::debug!(
"Download handle for key({}) has been dropped, aborting during the discovery phase",
input.key.unwrap_or("None".to_string())
);
return;
}

let initial_chunk = discovery.initial_chunk.take();

let mut tasks = tasks.lock().await;
Expand All @@ -148,14 +172,13 @@ async fn send_discovery(
parent_span_for_tasks,
);
}
Ok(())
}

/// Handle possibly sending the first chunk of data received through discovery. Returns
/// the starting sequence number to use for remaining chunks.
///
/// NOTE: This function does _not_ wait to read the initial chunk from discovery but
/// instead spawns a new task to read the stream and send it over the body channel.
/// instead spawns a new task to read the stream and send it over the output channel.
/// This allows remaining work to start immediately (and concurrently) without
/// waiting for the first chunk.
fn handle_discovery_chunk(
Expand Down Expand Up @@ -200,14 +223,19 @@ fn handle_discovery_chunk(
#[derive(Debug)]
pub(crate) struct DownloadState {
current_seq: AtomicU64,
cancel_tx: CancelNotificationSender,
cancel_rx: CancelNotificationReceiver,
}

type DownloadContext = TransferContext<DownloadState>;

impl DownloadContext {
fn new(handle: Arc<crate::client::Handle>) -> Self {
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(DownloadState {
current_seq: AtomicU64::new(0),
cancel_tx,
cancel_rx,
});
TransferContext { handle, state }
}
Expand Down
37 changes: 17 additions & 20 deletions aws-s3-transfer-manager/src/operation/download/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::{
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::operation::download::body::Body;
use crate::operation::download::output::Output;

use super::object_meta::ObjectMetadata;

Expand All @@ -23,11 +23,11 @@ pub struct DownloadHandle {
/// Object metadata.
pub(crate) object_meta: OnceCell<ObjectMetadata>,

/// The object content
pub(crate) body: Body,
/// The object content and metadata
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) output: Output,

/// Discovery task
pub(crate) discovery: task::JoinHandle<Result<(), error::Error>>,
pub(crate) discovery: task::JoinHandle<()>,

/// All child tasks (ranged GetObject) spawned for this download
pub(crate) tasks: Arc<Mutex<task::JoinSet<()>>>,
Expand All @@ -53,29 +53,26 @@ impl DownloadHandle {
Ok(meta)
}

/// Object content
pub fn body(&self) -> &Body {
&self.body
/// Object content and metadata
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
pub fn output(&self) -> &Output {
&self.output
}

/// Mutable reference to the body
pub fn body_mut(&mut self) -> &mut Body {
&mut self.body
/// Mutable reference to the output
pub fn ouput_mut(&mut self) -> &mut Output {
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
&mut self.output
}

/// Consume the handle and wait for download transfer to complete
#[tracing::instrument(skip_all, level = "debug", name = "join-download")]
pub async fn join(mut self) -> Result<(), crate::error::Error> {
self.body.close();

self.discovery.await??;
/// Abort the download and cancel any in-progress work.
pub async fn abort(mut self) {
self.output.close();
self.discovery.abort();
let _ = self.discovery.await;
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
// It's safe to grab the lock here because discovery is already complete, and we will never
// lock tasks again after discovery to spawn more tasks.
let mut tasks = self.tasks.lock().await;
while let Some(join_result) = tasks.join_next().await {
join_result?;
}
Ok(())
tasks.abort_all();
while (tasks.join_next().await).is_some() {}
}
}

Expand Down
Loading