-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose Download Metadata for Every Chunk #73
Changes from 24 commits
32740b0
636588a
5de8eaa
0301f54
4ee9731
f606607
c2ab91a
868a45c
c9c8552
5636926
24074d1
e14e80b
e339417
0de1530
e824e23
abafe4e
ea2ce9c
f1cc638
db9bd42
ee59055
49a681a
b3bf149
d386e1d
67f9b53
e92aab6
e812a9d
07d7e9a
e8da53c
8b19402
6315920
43d1b43
aa22e50
0fe2fac
b04c4d4
bb837fc
6481cd9
947501a
60cbcdc
ba32f03
bb43ed2
dcd6013
6ec3a3f
bf99d44
eeaf1c3
49cd55d
566a50e
06fc9cb
0246b9c
6376db1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -20,19 +20,22 @@ mod handle; | |||||
pub use handle::DownloadHandle; | ||||||
use tracing::Instrument; | ||||||
|
||||||
mod chunk_meta; | ||||||
mod object_meta; | ||||||
mod service; | ||||||
|
||||||
use crate::error; | ||||||
use crate::runtime::scheduler::OwnedWorkPermit; | ||||||
use aws_smithy_types::byte_stream::ByteStream; | ||||||
use body::Body; | ||||||
use body::{AggregatedBytes, Body, ChunkResponse}; | ||||||
use chunk_meta::ChunkMetadata; | ||||||
use discovery::discover_obj; | ||||||
use service::{distribute_work, ChunkResponse}; | ||||||
use object_meta::ObjectMetadata; | ||||||
use service::distribute_work; | ||||||
use std::sync::atomic::{AtomicU64, Ordering}; | ||||||
use std::sync::Arc; | ||||||
use tokio::sync::mpsc; | ||||||
use tokio::task::JoinSet; | ||||||
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell}; | ||||||
use tokio::task::{self, JoinSet}; | ||||||
|
||||||
use super::TransferContext; | ||||||
|
||||||
|
@@ -52,7 +55,7 @@ impl Download { | |||||
/// "follow from" the current span, but be under their own root of the trace tree. | ||||||
/// Use this for `TransferManager.download().send()`, where the spawned tasks | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Just for consistency, even if we rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I did update the docs for download but missed it here. I have updated it. |
||||||
/// should NOT extend the life of the current `send()` span. | ||||||
pub(crate) async fn orchestrate( | ||||||
pub(crate) fn orchestrate( | ||||||
handle: Arc<crate::client::Handle>, | ||||||
input: crate::operation::download::DownloadInput, | ||||||
use_current_span_as_parent_for_tasks: bool, | ||||||
|
@@ -62,64 +65,85 @@ impl Download { | |||||
todo!("single part download not implemented") | ||||||
} | ||||||
|
||||||
let concurrency = handle.num_workers(); | ||||||
let ctx = DownloadContext::new(handle); | ||||||
|
||||||
// create span to serve as parent of spawned child tasks. | ||||||
let parent_span_for_tasks = tracing::debug_span!( | ||||||
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } , | ||||||
"download-tasks", | ||||||
bucket = input.bucket().unwrap_or_default(), | ||||||
key = input.key().unwrap_or_default(), | ||||||
); | ||||||
if !use_current_span_as_parent_for_tasks { | ||||||
// if not child of current span, then "follows from" current span | ||||||
parent_span_for_tasks.follows_from(tracing::Span::current()); | ||||||
} | ||||||
|
||||||
// acquire a permit for discovery | ||||||
let permit = ctx.handle.scheduler.acquire_permit().await?; | ||||||
|
||||||
// make initial discovery about the object size, metadata, possibly first chunk | ||||||
let mut discovery = discover_obj(&ctx, &input).await?; | ||||||
let concurrency = ctx.handle.num_workers(); | ||||||
let (comp_tx, comp_rx) = mpsc::channel(concurrency); | ||||||
|
||||||
let initial_chunk = discovery.initial_chunk.take(); | ||||||
|
||||||
let mut handle = DownloadHandle { | ||||||
// FIXME(aws-sdk-rust#1159) - initial object discovery for a range/first-part will not | ||||||
// have the correct metadata w.r.t. content-length and maybe others for the whole object. | ||||||
object_meta: discovery.meta, | ||||||
let (object_meta_tx, object_meta_rx) = oneshot::channel(); | ||||||
|
||||||
let tasks = Arc::new(Mutex::new(JoinSet::new())); | ||||||
let discovery = tokio::spawn(send_discovery( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pretty neat, delegating async-related preparatory steps (like acquiring permit) to a separate task so |
||||||
tasks.clone(), | ||||||
ctx.clone(), | ||||||
comp_tx, | ||||||
object_meta_tx, | ||||||
input, | ||||||
use_current_span_as_parent_for_tasks, | ||||||
)); | ||||||
|
||||||
Ok(DownloadHandle { | ||||||
body: Body::new(comp_rx), | ||||||
// spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled. | ||||||
tasks: JoinSet::new(), | ||||||
ctx, | ||||||
}; | ||||||
|
||||||
// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so | ||||||
// that we can begin concurrently downloading any reamining chunks/parts ASAP | ||||||
let start_seq = handle_discovery_chunk( | ||||||
&mut handle, | ||||||
initial_chunk, | ||||||
&comp_tx, | ||||||
permit, | ||||||
parent_span_for_tasks.clone(), | ||||||
); | ||||||
tasks, | ||||||
discovery, | ||||||
object_meta_receiver: Mutex::new(Some(object_meta_rx)), | ||||||
object_meta: OnceCell::new(), | ||||||
}) | ||||||
} | ||||||
} | ||||||
|
||||||
if !discovery.remaining.is_empty() { | ||||||
let remaining = discovery.remaining.clone(); | ||||||
distribute_work( | ||||||
&mut handle, | ||||||
remaining, | ||||||
input, | ||||||
start_seq, | ||||||
comp_tx, | ||||||
parent_span_for_tasks, | ||||||
) | ||||||
} | ||||||
async fn send_discovery( | ||||||
tasks: Arc<Mutex<task::JoinSet<()>>>, | ||||||
ctx: DownloadContext, | ||||||
comp_tx: mpsc::Sender<Result<ChunkResponse, crate::error::Error>>, | ||||||
object_meta_tx: oneshot::Sender<ObjectMetadata>, | ||||||
input: DownloadInput, | ||||||
use_current_span_as_parent_for_tasks: bool, | ||||||
) -> Result<(), crate::error::Error> { | ||||||
// create span to serve as parent of spawned child tasks. | ||||||
let parent_span_for_tasks = tracing::debug_span!( | ||||||
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } , | ||||||
"download-tasks", | ||||||
bucket = input.bucket().unwrap_or_default(), | ||||||
key = input.key().unwrap_or_default(), | ||||||
); | ||||||
if !use_current_span_as_parent_for_tasks { | ||||||
// if not child of current span, then "follows from" current span | ||||||
parent_span_for_tasks.follows_from(tracing::Span::current()); | ||||||
} | ||||||
|
||||||
Ok(handle) | ||||||
// acquire a permit for discovery | ||||||
let permit = ctx.handle.scheduler.acquire_permit().await?; | ||||||
|
||||||
// make initial discovery about the object size, metadata, possibly first chunk | ||||||
let mut discovery = discover_obj(&ctx, &input).await?; | ||||||
let _ = object_meta_tx.send(discovery.object_meta); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we ignoring the result here? If we never expect this to fail it should be used with an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will only fail if we have already dropped the handle at this point. The discovery task is a join handle and would be in detached state. While it’s possible for this to fail, we should not panic on it. We can handle the cancellation gracefully once we implement the cancellation logic for downloads. @ysaito1001, What are your thoughts since you are looking into cancellation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Glad you asked. I'd say don't worry about using |
||||||
|
||||||
let initial_chunk = discovery.initial_chunk.take(); | ||||||
|
||||||
let mut tasks = tasks.lock().await; | ||||||
// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so | ||||||
// that we can begin concurrently downloading any remaining chunks/parts ASAP | ||||||
let start_seq = handle_discovery_chunk( | ||||||
&mut tasks, | ||||||
ctx.clone(), | ||||||
initial_chunk, | ||||||
&comp_tx, | ||||||
permit, | ||||||
parent_span_for_tasks.clone(), | ||||||
discovery.chunk_meta, | ||||||
); | ||||||
|
||||||
if !discovery.remaining.is_empty() { | ||||||
distribute_work( | ||||||
&mut tasks, | ||||||
ctx.clone(), | ||||||
discovery.remaining, | ||||||
input, | ||||||
start_seq, | ||||||
comp_tx, | ||||||
parent_span_for_tasks, | ||||||
); | ||||||
} | ||||||
Ok(()) | ||||||
} | ||||||
|
||||||
/// Handle possibly sending the first chunk of data received through discovery. Returns | ||||||
|
@@ -130,24 +154,26 @@ impl Download { | |||||
/// This allows remaining work to start immediately (and concurrently) without | ||||||
/// waiting for the first chunk. | ||||||
fn handle_discovery_chunk( | ||||||
handle: &mut DownloadHandle, | ||||||
tasks: &mut task::JoinSet<()>, | ||||||
ctx: DownloadContext, | ||||||
initial_chunk: Option<ByteStream>, | ||||||
completed: &mpsc::Sender<Result<ChunkResponse, crate::error::Error>>, | ||||||
permit: OwnedWorkPermit, | ||||||
parent_span_for_tasks: tracing::Span, | ||||||
metadata: Option<ChunkMetadata>, | ||||||
ysaito1001 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
) -> u64 { | ||||||
if let Some(stream) = initial_chunk { | ||||||
let seq = handle.ctx.next_seq(); | ||||||
let seq = ctx.next_seq(); | ||||||
let completed = completed.clone(); | ||||||
// spawn a task to actually read the discovery chunk without waiting for it so we | ||||||
// can get started sooner on any remaining work (if any) | ||||||
handle.tasks.spawn(async move { | ||||||
let chunk = stream | ||||||
.collect() | ||||||
tasks.spawn(async move { | ||||||
let chunk = AggregatedBytes::from_byte_stream(stream) | ||||||
.await | ||||||
.map(|aggregated| ChunkResponse { | ||||||
seq, | ||||||
data: Some(aggregated), | ||||||
data: aggregated, | ||||||
metadata: metadata.expect("chunk metadata is available"), | ||||||
}) | ||||||
.map_err(error::discovery_failed); | ||||||
|
||||||
|
@@ -162,7 +188,7 @@ fn handle_discovery_chunk( | |||||
} | ||||||
}.instrument(tracing::debug_span!(parent: parent_span_for_tasks.clone(), "collect-body-from-discovery", seq))); | ||||||
} | ||||||
handle.ctx.current_seq() | ||||||
ctx.current_seq() | ||||||
} | ||||||
|
||||||
/// Download operation specific state | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discuss: If we are changing
send()
to not do any work or be async then perhaps we should change the name (e.g.initiate
). I only bring this up because we mirrored the Rust SDK API a bit here and wondering whether it will cause any confusion.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated. I have added a TODO to make it consistent across the board. I will create a follow-up PR to rename upload from send to initiate as well.