Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Download Metadata for Every Chunk #73

Merged
merged 49 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
32740b0
wip expose ChunkResponse
waahm7 Oct 30, 2024
636588a
expose metadata
waahm7 Oct 30, 2024
5de8eaa
wip ChunkResponse
waahm7 Nov 1, 2024
0301f54
renames
waahm7 Nov 5, 2024
4ee9731
wip
waahm7 Nov 6, 2024
f606607
wip refactor
waahm7 Nov 6, 2024
c2ab91a
sucks but works meta_data
waahm7 Nov 7, 2024
868a45c
refactor chunk vs object meta_data
waahm7 Nov 8, 2024
c9c8552
no async send
waahm7 Nov 8, 2024
5636926
No need for oncecell, I have mut reference
waahm7 Nov 8, 2024
24074d1
interior mutability ftw
waahm7 Nov 8, 2024
e14e80b
rename output back to body
waahm7 Nov 8, 2024
e339417
chunk meta can't be constructed from HeadObject
waahm7 Nov 9, 2024
0de1530
fix todos
waahm7 Nov 10, 2024
e824e23
unordered body is useful
waahm7 Nov 10, 2024
abafe4e
fix docs
waahm7 Nov 10, 2024
ea2ce9c
nit
waahm7 Nov 10, 2024
f1cc638
fix test
waahm7 Nov 10, 2024
db9bd42
fix tests
waahm7 Nov 10, 2024
ee59055
cleanup
waahm7 Nov 10, 2024
49a681a
fmt
waahm7 Nov 10, 2024
b3bf149
cleanup
waahm7 Nov 11, 2024
d386e1d
fix meta_receiver bug possibility
waahm7 Nov 11, 2024
67f9b53
fix naming
waahm7 Nov 11, 2024
e92aab6
fix comment
waahm7 Nov 11, 2024
e812a9d
allow buf
waahm7 Nov 11, 2024
07d7e9a
fix type
waahm7 Nov 11, 2024
e8da53c
rename to initiate
waahm7 Nov 19, 2024
8b19402
Move aggregated bytes to io
waahm7 Nov 19, 2024
6315920
traits for request ids
waahm7 Nov 19, 2024
43d1b43
rename total size to content length
waahm7 Nov 19, 2024
aa22e50
Add docs for ObjectMetadata
waahm7 Nov 19, 2024
0fe2fac
Source docs from generated code
waahm7 Nov 19, 2024
b04c4d4
add comments
waahm7 Nov 19, 2024
bb837fc
add todo
waahm7 Nov 19, 2024
6481cd9
fix comments
waahm7 Nov 19, 2024
947501a
fmt
waahm7 Nov 19, 2024
60cbcdc
update comment
waahm7 Nov 19, 2024
ba32f03
add manual debug implementation
waahm7 Nov 20, 2024
bb43ed2
rename ChunkResponse -> ChunkOutput
waahm7 Nov 20, 2024
dcd6013
better docs
waahm7 Nov 20, 2024
6ec3a3f
use pub use for aggregated bytes
waahm7 Nov 25, 2024
bf99d44
update docs
waahm7 Nov 25, 2024
eeaf1c3
Merge branch 'main' into download-metadata
waahm7 Nov 25, 2024
49cd55d
use pub use instead of pub mod
waahm7 Nov 25, 2024
566a50e
fmt
waahm7 Nov 25, 2024
06fc9cb
external types
waahm7 Nov 25, 2024
0246b9c
Add fixme
waahm7 Nov 25, 2024
6376db1
fmt
waahm7 Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
blocking = "1.6.0"
bytes = "1"
bytes-utils = "0.1.4"
futures-util = "0.3.30"
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
Expand Down
6 changes: 3 additions & 3 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ async fn do_download(args: Args) -> Result<(), BoxError> {
// TODO(aws-sdk-rust#1159) - rewrite this less naively,
// likely abstract this into performant utils for single file download. Higher level
// TM will handle it's own thread pool for filesystem work
let mut handle = tm.download().bucket(bucket).key(key).send().await?;
let mut handle = tm.download().bucket(bucket).key(key).initiate()?;

write_body(handle.body_mut(), dest)
.instrument(tracing::debug_span!("write-output"))
.await?;

let elapsed = start.elapsed();
let obj_size_bytes = handle.object_meta.total_size();
let obj_size_bytes = handle.object_meta().await?.content_length();
let throughput = Throughput::new(obj_size_bytes, elapsed);

println!(
Expand Down Expand Up @@ -298,7 +298,7 @@ async fn main() -> Result<(), BoxError> {

async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
let chunk = chunk.unwrap().data;
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
let mut segment_cnt = 1;
for segment in chunk.into_segments() {
Expand Down
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ allowed_external_types = [
"tokio::runtime::task::error::JoinError",
"tokio::io::async_read::AsyncRead",
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",
aajtodd marked this conversation as resolved.
Show resolved Hide resolved
"aws_types::request_id::RequestId",
"aws_types::request_id::RequestIdExt"
]
3 changes: 1 addition & 2 deletions aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ impl Client {
/// .download()
/// .bucket("my-bucket")
/// .key("my-key")
/// .send()
/// .await?;
/// .initiate()?;
///
/// // process data off handle...
///
Expand Down
82 changes: 82 additions & 0 deletions aws-s3-transfer-manager/src/io/aggregated_bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::io::IoSlice;

use aws_sdk_s3::primitives::ByteStream;
use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;

use crate::error::ErrorKind;

/// Non-contiguous Binary Data Storage
///
/// When data is read from the network, it is read in a sequence of chunks that are not in
/// contiguous memory. [`AggregatedBytes`] provides a view of
/// this data via [`impl Buf`](bytes::Buf) or it can be copied into contiguous storage with
/// [`.into_bytes()`](crate::io::aggregated_bytes::AggregatedBytes::into_bytes).
#[derive(Debug, Clone)]
pub struct AggregatedBytes(pub(crate) SegmentedBuf<Bytes>);

impl AggregatedBytes {
/// Convert this buffer into [`Bytes`].
///
/// # Why does this consume `self`?
/// Technically, [`copy_to_bytes`](bytes::Buf::copy_to_bytes) can be called without ownership of self. However, since this
/// mutates the underlying buffer such that no data is remaining, it is more misuse resistant to
/// prevent the caller from attempting to reread the buffer.
///
/// If the caller only holds a mutable reference, they may use [`copy_to_bytes`](bytes::Buf::copy_to_bytes)
/// directly on `AggregatedBytes`.
pub fn into_bytes(mut self) -> Bytes {
self.0.copy_to_bytes(self.0.remaining())
}

/// Convert this buffer into an [`Iterator`] of underlying non-contiguous segments of [`Bytes`]
pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
self.0.into_inner().into_iter()
}

/// Convert this buffer into a `Vec<u8>`
pub fn to_vec(self) -> Vec<u8> {
self.0.into_inner().into_iter().flatten().collect()
}

/// Make this buffer from a ByteStream
pub(crate) async fn from_byte_stream(value: ByteStream) -> Result<Self, crate::error::Error> {
let mut value = value;
let mut output = SegmentedBuf::new();
while let Some(buf) = value.next().await {
match buf {
Ok(buf) => output.push(buf),
Err(err) => return Err(crate::error::from_kind(ErrorKind::ChunkFailed)(err)),
};
}
Ok(AggregatedBytes(output))
}
}

impl Buf for AggregatedBytes {
// Forward all methods that SegmentedBuf has custom implementations of.
fn remaining(&self) -> usize {
self.0.remaining()
}

fn chunk(&self) -> &[u8] {
self.0.chunk()
}

fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
self.0.chunks_vectored(dst)
}

fn advance(&mut self, cnt: usize) {
self.0.advance(cnt)
}

fn copy_to_bytes(&mut self, len: usize) -> Bytes {
self.0.copy_to_bytes(len)
}
}
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

/// Adapters for other IO library traits to map to `InputStream`
pub mod adapters;
/// Download Body Type
mod aggregated_bytes;
mod buffer;
pub(crate) mod part_reader;
mod path_body;
Expand All @@ -15,6 +17,7 @@ pub mod error;
mod size_hint;

// re-exports
pub use self::aggregated_bytes::AggregatedBytes;
pub(crate) use self::buffer::Buffer;
pub use self::path_body::PathBodyBuilder;
pub use self::size_hint::SizeHint;
Expand Down
167 changes: 99 additions & 68 deletions aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,26 @@ mod handle;
pub use handle::DownloadHandle;
use tracing::Instrument;

/// Provides metadata for each chunk during an object download.
mod chunk_meta;
pub use chunk_meta::ChunkMetadata;
/// Provides metadata for a single S3 object during download.
mod object_meta;
pub use object_meta::ObjectMetadata;

mod service;

use crate::error;
use crate::io::AggregatedBytes;
use crate::runtime::scheduler::OwnedWorkPermit;
use aws_smithy_types::byte_stream::ByteStream;
use body::Body;
use body::{Body, ChunkOutput};
use discovery::discover_obj;
use service::{distribute_work, ChunkResponse};
use service::distribute_work;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell};
use tokio::task::{self, JoinSet};

use super::TransferContext;

Expand All @@ -50,9 +57,9 @@ impl Download {
///
/// If `use_current_span_as_parent_for_tasks` is false, spawned tasks will
/// "follow from" the current span, but be under their own root of the trace tree.
/// Use this for `TransferManager.download().send()`, where the spawned tasks
/// should NOT extend the life of the current `send()` span.
pub(crate) async fn orchestrate(
/// Use this for `TransferManager.download().initiate()`, where the spawned tasks
/// should NOT extend the life of the current `initiate()` span.
pub(crate) fn orchestrate(
handle: Arc<crate::client::Handle>,
input: crate::operation::download::DownloadInput,
use_current_span_as_parent_for_tasks: bool,
Expand All @@ -62,64 +69,86 @@ impl Download {
todo!("single part download not implemented")
}

let concurrency = handle.num_workers();
let ctx = DownloadContext::new(handle);

// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
"download-tasks",
bucket = input.bucket().unwrap_or_default(),
key = input.key().unwrap_or_default(),
);
if !use_current_span_as_parent_for_tasks {
// if not child of current span, then "follows from" current span
parent_span_for_tasks.follows_from(tracing::Span::current());
}

// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
let concurrency = ctx.handle.num_workers();
let (comp_tx, comp_rx) = mpsc::channel(concurrency);

let initial_chunk = discovery.initial_chunk.take();

let mut handle = DownloadHandle {
// FIXME(aws-sdk-rust#1159) - initial object discovery for a range/first-part will not
// have the correct metadata w.r.t. content-length and maybe others for the whole object.
object_meta: discovery.meta,
let (object_meta_tx, object_meta_rx) = oneshot::channel();

let tasks = Arc::new(Mutex::new(JoinSet::new()));
let discovery = tokio::spawn(send_discovery(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty neat, delegating async-related preparatory steps (like acquiring permit) to a separate task so orchestrate can return early. tasks needs to be in Arc because of that, but feels like that's a necessary change.

tasks.clone(),
ctx.clone(),
comp_tx,
object_meta_tx,
input,
use_current_span_as_parent_for_tasks,
));

Ok(DownloadHandle {
body: Body::new(comp_rx),
// spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled.
tasks: JoinSet::new(),
ctx,
};

// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so
// that we can begin concurrently downloading any reamining chunks/parts ASAP
let start_seq = handle_discovery_chunk(
&mut handle,
initial_chunk,
&comp_tx,
permit,
parent_span_for_tasks.clone(),
);
tasks,
discovery,
object_meta_receiver: Mutex::new(Some(object_meta_rx)),
object_meta: OnceCell::new(),
})
}
}

if !discovery.remaining.is_empty() {
let remaining = discovery.remaining.clone();
distribute_work(
&mut handle,
remaining,
input,
start_seq,
comp_tx,
parent_span_for_tasks,
)
}
async fn send_discovery(
tasks: Arc<Mutex<task::JoinSet<()>>>,
ctx: DownloadContext,
comp_tx: mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) -> Result<(), crate::error::Error> {
// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
"download-tasks",
bucket = input.bucket().unwrap_or_default(),
key = input.key().unwrap_or_default(),
);
if !use_current_span_as_parent_for_tasks {
// if not child of current span, then "follows from" current span
parent_span_for_tasks.follows_from(tracing::Span::current());
}

Ok(handle)
// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
// FIXME - This will fail if the handle is dropped at this point. We should handle
// the cancellation gracefully here.
let _ = object_meta_tx.send(discovery.object_meta);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we ignoring the result here? If we never expect this to fail it should be used with an expect so that if we ever do violate the assumptions we're making it fails loudly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only fail if we have already dropped the handle at this point. The discovery task is a join handle and would be in detached state. While it’s possible for this to fail, we should not panic on it. We can handle the cancellation gracefully once we implement the cancellation logic for downloads. @ysaito1001, What are your thoughts since you are looking into cancellation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad you asked. I'd say don't worry about using _ at line 121 in this PR, so no action needed. My next PR of cancelling download(s) should be a good place to handle the error properly.

let initial_chunk = discovery.initial_chunk.take();

let mut tasks = tasks.lock().await;
// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so
// that we can begin concurrently downloading any remaining chunks/parts ASAP
let start_seq = handle_discovery_chunk(
&mut tasks,
ctx.clone(),
initial_chunk,
&comp_tx,
permit,
parent_span_for_tasks.clone(),
discovery.chunk_meta,
);

if !discovery.remaining.is_empty() {
distribute_work(
&mut tasks,
ctx.clone(),
discovery.remaining,
input,
start_seq,
comp_tx,
parent_span_for_tasks,
);
}
Ok(())
}

/// Handle possibly sending the first chunk of data received through discovery. Returns
Expand All @@ -130,24 +159,26 @@ impl Download {
/// This allows remaining work to start immediately (and concurrently) without
/// waiting for the first chunk.
fn handle_discovery_chunk(
handle: &mut DownloadHandle,
tasks: &mut task::JoinSet<()>,
ctx: DownloadContext,
initial_chunk: Option<ByteStream>,
completed: &mpsc::Sender<Result<ChunkResponse, crate::error::Error>>,
completed: &mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
permit: OwnedWorkPermit,
parent_span_for_tasks: tracing::Span,
metadata: Option<ChunkMetadata>,
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
) -> u64 {
if let Some(stream) = initial_chunk {
let seq = handle.ctx.next_seq();
let seq = ctx.next_seq();
let completed = completed.clone();
// spawn a task to actually read the discovery chunk without waiting for it so we
// can get started sooner on any remaining work (if any)
handle.tasks.spawn(async move {
let chunk = stream
.collect()
tasks.spawn(async move {
let chunk = AggregatedBytes::from_byte_stream(stream)
.await
.map(|aggregated| ChunkResponse {
.map(|aggregated| ChunkOutput {
seq,
data: Some(aggregated),
data: aggregated,
metadata: metadata.expect("chunk metadata is available"),
})
.map_err(error::discovery_failed);

Expand All @@ -162,7 +193,7 @@ fn handle_discovery_chunk(
}
}.instrument(tracing::debug_span!(parent: parent_span_for_tasks.clone(), "collect-body-from-discovery", seq)));
}
handle.ctx.current_seq()
ctx.current_seq()
}

/// Download operation specific state
Expand Down
Loading