Expose Download Metadata for Every Chunk (#73)
waahm7 authored Nov 25, 2024
1 parent c427a5e commit 06c087a
Showing 17 changed files with 744 additions and 274 deletions.
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/Cargo.toml
@@ -20,6 +20,7 @@ aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
blocking = "1.6.0"
bytes = "1"
bytes-utils = "0.1.4"
futures-util = "0.3.30"
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
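The new bytes-utils dependency supplies the SegmentedBuf that backs the AggregatedBytes type introduced below. A minimal sketch of what it provides (illustration only, not code from this commit):

use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;

fn segmented_buf_demo() {
    // A SegmentedBuf is a list of Buf segments that itself implements Buf,
    // so non-contiguous chunks can be read as one logical stream.
    let mut buf: SegmentedBuf<Bytes> = SegmentedBuf::new();
    buf.push(Bytes::from_static(b"hello, "));
    buf.push(Bytes::from_static(b"world"));
    assert_eq!(buf.remaining(), 12);

    // Draining copies the segments into one contiguous Bytes.
    let n = buf.remaining();
    assert_eq!(buf.copy_to_bytes(n), Bytes::from_static(b"hello, world"));
}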
6 changes: 3 additions & 3 deletions aws-s3-transfer-manager/examples/cp.rs
@@ -171,14 +171,14 @@ async fn do_download(args: Args) -> Result<(), BoxError> {
// TODO(aws-sdk-rust#1159) - rewrite this less naively,
// likely abstract this into performant utils for single file download. Higher level
// TM will handle its own thread pool for filesystem work
let mut handle = tm.download().bucket(bucket).key(key).send().await?;
let mut handle = tm.download().bucket(bucket).key(key).initiate()?;

write_body(handle.body_mut(), dest)
.instrument(tracing::debug_span!("write-output"))
.await?;

let elapsed = start.elapsed();
let obj_size_bytes = handle.object_meta.total_size();
let obj_size_bytes = handle.object_meta().await?.content_length();
let throughput = Throughput::new(obj_size_bytes, elapsed);

println!(
@@ -298,7 +298,7 @@ async fn main() -> Result<(), BoxError> {

async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
let chunk = chunk.unwrap().data;
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
let mut segment_cnt = 1;
for segment in chunk.into_segments() {
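Taken together, the cp.rs changes show the new call shape: initiate() returns the handle without awaiting, object-level metadata resolves lazily via object_meta().await, and each chunk pairs its bytes with per-chunk metadata. A minimal consumer sketch under those assumptions (public visibility of the ChunkOutput fields outside the crate is inferred from this example, not confirmed here):

use bytes::Buf;

async fn download_object(
    tm: &aws_s3_transfer_manager::Client,
) -> Result<(), Box<dyn std::error::Error>> {
    // initiate() replaces send().await: the transfer starts without
    // waiting for object discovery to complete.
    let mut handle = tm.download().bucket("my-bucket").key("my-key").initiate()?;

    // Object metadata is now an async accessor that resolves once the
    // initial discovery request finishes.
    let size = handle.object_meta().await?.content_length();
    println!("downloading {size} bytes");

    // Each body item is a ChunkOutput carrying both the bytes (data)
    // and the per-chunk metadata introduced by this commit.
    while let Some(chunk) = handle.body_mut().next().await {
        let chunk = chunk?;
        println!("chunk: {} bytes", chunk.data.remaining());
    }
    Ok(())
}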
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/external-types.toml
@@ -6,4 +6,7 @@ allowed_external_types = [
"tokio::runtime::task::error::JoinError",
"tokio::io::async_read::AsyncRead",
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",
"aws_types::request_id::RequestId",
"aws_types::request_id::RequestIdExt"
]
3 changes: 1 addition & 2 deletions aws-s3-transfer-manager/src/client.rs
@@ -136,8 +136,7 @@ impl Client {
/// .download()
/// .bucket("my-bucket")
/// .key("my-key")
/// .send()
/// .await?;
/// .initiate()?;
///
/// // process data off handle...
///
82 changes: 82 additions & 0 deletions aws-s3-transfer-manager/src/io/aggregated_bytes.rs
@@ -0,0 +1,82 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::io::IoSlice;

use aws_sdk_s3::primitives::ByteStream;
use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;

use crate::error::ErrorKind;

/// Non-contiguous Binary Data Storage
///
/// When data is read from the network, it is read in a sequence of chunks that are not in
/// contiguous memory. [`AggregatedBytes`] provides a view of
/// this data via [`impl Buf`](bytes::Buf) or it can be copied into contiguous storage with
/// [`.into_bytes()`](crate::io::aggregated_bytes::AggregatedBytes::into_bytes).
#[derive(Debug, Clone)]
pub struct AggregatedBytes(pub(crate) SegmentedBuf<Bytes>);

impl AggregatedBytes {
/// Convert this buffer into [`Bytes`].
///
/// # Why does this consume `self`?
/// Technically, [`copy_to_bytes`](bytes::Buf::copy_to_bytes) can be called without ownership of self. However, since this
mutates the underlying buffer such that no data is remaining, it is more misuse-resistant to
/// prevent the caller from attempting to reread the buffer.
///
/// If the caller only holds a mutable reference, they may use [`copy_to_bytes`](bytes::Buf::copy_to_bytes)
/// directly on `AggregatedBytes`.
pub fn into_bytes(mut self) -> Bytes {
self.0.copy_to_bytes(self.0.remaining())
}

/// Convert this buffer into an [`Iterator`] of underlying non-contiguous segments of [`Bytes`]
pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
self.0.into_inner().into_iter()
}

/// Convert this buffer into a `Vec<u8>`
pub fn to_vec(self) -> Vec<u8> {
self.0.into_inner().into_iter().flatten().collect()
}

/// Create this buffer from a [`ByteStream`]
pub(crate) async fn from_byte_stream(mut value: ByteStream) -> Result<Self, crate::error::Error> {
let mut output = SegmentedBuf::new();
while let Some(buf) = value.next().await {
match buf {
Ok(buf) => output.push(buf),
Err(err) => return Err(crate::error::from_kind(ErrorKind::ChunkFailed)(err)),
};
}
Ok(AggregatedBytes(output))
}
}

impl Buf for AggregatedBytes {
// Forward all methods that SegmentedBuf has custom implementations of.
fn remaining(&self) -> usize {
self.0.remaining()
}

fn chunk(&self) -> &[u8] {
self.0.chunk()
}

fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
self.0.chunks_vectored(dst)
}

fn advance(&mut self, cnt: usize) {
self.0.advance(cnt)
}

fn copy_to_bytes(&mut self, len: usize) -> Bytes {
self.0.copy_to_bytes(len)
}
}
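A short usage sketch for the new type, exercising only the API defined above (the Buf impl, into_segments, and the consuming into_bytes):

use aws_s3_transfer_manager::io::AggregatedBytes; // re-exported from io/mod.rs below
use bytes::Buf;

fn consume(aggregated: AggregatedBytes) {
    // Inspect the data through the Buf impl without copying it into
    // a single contiguous allocation.
    let total = aggregated.remaining();

    // Walk the underlying segments; each is a cheaply cloned Bytes handle.
    for segment in aggregated.clone().into_segments() {
        let _ = segment.len();
    }

    // Or collapse everything into one contiguous Bytes. This consumes self,
    // so the drained buffer cannot be accidentally reread afterwards.
    let contiguous = aggregated.into_bytes();
    assert_eq!(contiguous.len(), total);
}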
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/src/io/mod.rs
@@ -5,6 +5,8 @@

/// Adapters for other IO library traits to map to `InputStream`
pub mod adapters;
/// Download Body Type
mod aggregated_bytes;
mod buffer;
pub(crate) mod part_reader;
mod path_body;
@@ -15,6 +17,7 @@ pub mod error;
mod size_hint;

// re-exports
pub use self::aggregated_bytes::AggregatedBytes;
pub(crate) use self::buffer::Buffer;
pub use self::path_body::PathBodyBuilder;
pub use self::size_hint::SizeHint;
167 changes: 99 additions & 68 deletions aws-s3-transfer-manager/src/operation/download.rs
@@ -20,19 +20,26 @@ mod handle;
pub use handle::DownloadHandle;
use tracing::Instrument;

/// Provides metadata for each chunk during an object download.
mod chunk_meta;
pub use chunk_meta::ChunkMetadata;
/// Provides metadata for a single S3 object during download.
mod object_meta;
pub use object_meta::ObjectMetadata;

mod service;

use crate::error;
use crate::io::AggregatedBytes;
use crate::runtime::scheduler::OwnedWorkPermit;
use aws_smithy_types::byte_stream::ByteStream;
use body::Body;
use body::{Body, ChunkOutput};
use discovery::discover_obj;
use service::{distribute_work, ChunkResponse};
use service::distribute_work;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell};
use tokio::task::{self, JoinSet};

use super::TransferContext;

@@ -50,9 +57,9 @@ impl Download {
///
/// If `use_current_span_as_parent_for_tasks` is false, spawned tasks will
/// "follow from" the current span, but be under their own root of the trace tree.
/// Use this for `TransferManager.download().send()`, where the spawned tasks
/// should NOT extend the life of the current `send()` span.
pub(crate) async fn orchestrate(
/// Use this for `TransferManager.download().initiate()`, where the spawned tasks
/// should NOT extend the life of the current `initiate()` span.
pub(crate) fn orchestrate(
handle: Arc<crate::client::Handle>,
input: crate::operation::download::DownloadInput,
use_current_span_as_parent_for_tasks: bool,
@@ -62,64 +69,86 @@
todo!("single part download not implemented")
}

let concurrency = handle.num_workers();
let ctx = DownloadContext::new(handle);

// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
"download-tasks",
bucket = input.bucket().unwrap_or_default(),
key = input.key().unwrap_or_default(),
);
if !use_current_span_as_parent_for_tasks {
// if not child of current span, then "follows from" current span
parent_span_for_tasks.follows_from(tracing::Span::current());
}

// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
let concurrency = ctx.handle.num_workers();
let (comp_tx, comp_rx) = mpsc::channel(concurrency);

let initial_chunk = discovery.initial_chunk.take();

let mut handle = DownloadHandle {
// FIXME(aws-sdk-rust#1159) - initial object discovery for a range/first-part will not
// have the correct metadata w.r.t. content-length and maybe others for the whole object.
object_meta: discovery.meta,
let (object_meta_tx, object_meta_rx) = oneshot::channel();

let tasks = Arc::new(Mutex::new(JoinSet::new()));
let discovery = tokio::spawn(send_discovery(
tasks.clone(),
ctx.clone(),
comp_tx,
object_meta_tx,
input,
use_current_span_as_parent_for_tasks,
));

Ok(DownloadHandle {
body: Body::new(comp_rx),
// spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled.
tasks: JoinSet::new(),
ctx,
};

// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so
// that we can begin concurrently downloading any remaining chunks/parts ASAP
let start_seq = handle_discovery_chunk(
&mut handle,
initial_chunk,
&comp_tx,
permit,
parent_span_for_tasks.clone(),
);
tasks,
discovery,
object_meta_receiver: Mutex::new(Some(object_meta_rx)),
object_meta: OnceCell::new(),
})
}
}

if !discovery.remaining.is_empty() {
let remaining = discovery.remaining.clone();
distribute_work(
&mut handle,
remaining,
input,
start_seq,
comp_tx,
parent_span_for_tasks,
)
}
async fn send_discovery(
tasks: Arc<Mutex<task::JoinSet<()>>>,
ctx: DownloadContext,
comp_tx: mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) -> Result<(), crate::error::Error> {
// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
"download-tasks",
bucket = input.bucket().unwrap_or_default(),
key = input.key().unwrap_or_default(),
);
if !use_current_span_as_parent_for_tasks {
// if not child of current span, then "follows from" current span
parent_span_for_tasks.follows_from(tracing::Span::current());
}

Ok(handle)
// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
// FIXME - This will fail if the handle is dropped at this point. We should handle
// the cancellation gracefully here.
let _ = object_meta_tx.send(discovery.object_meta);
let initial_chunk = discovery.initial_chunk.take();

let mut tasks = tasks.lock().await;
// spawn a task (if necessary) to handle the discovery chunk. This returns immediately so
// that we can begin concurrently downloading any remaining chunks/parts ASAP
let start_seq = handle_discovery_chunk(
&mut tasks,
ctx.clone(),
initial_chunk,
&comp_tx,
permit,
parent_span_for_tasks.clone(),
discovery.chunk_meta,
);

if !discovery.remaining.is_empty() {
distribute_work(
&mut tasks,
ctx.clone(),
discovery.remaining,
input,
start_seq,
comp_tx,
parent_span_for_tasks,
);
}
Ok(())
}
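The object_meta_tx send above has its receiving half stored on the handle (object_meta_receiver plus a OnceCell, per the DownloadHandle construction in orchestrate). handle.rs is not part of this diff, so the following is only a sketch of how that receiver might be drained once and cached; the error mapping is a placeholder:

impl DownloadHandle {
    /// Object metadata, available once discovery has completed.
    pub async fn object_meta(&self) -> Result<&ObjectMetadata, crate::error::Error> {
        self.object_meta
            .get_or_try_init(|| async {
                let receiver = self
                    .object_meta_receiver
                    .lock()
                    .await
                    .take()
                    .expect("metadata receiver is taken exactly once");
                // Placeholder: the real code maps the oneshot RecvError
                // (sender dropped, e.g. discovery failed) into the crate error.
                receiver
                    .await
                    .map_err(crate::error::from_kind(crate::error::ErrorKind::ChunkFailed))
            })
            .await
    }
}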

/// Handle possibly sending the first chunk of data received through discovery. Returns
@@ -130,24 +159,26 @@
/// This allows remaining work to start immediately (and concurrently) without
/// waiting for the first chunk.
fn handle_discovery_chunk(
handle: &mut DownloadHandle,
tasks: &mut task::JoinSet<()>,
ctx: DownloadContext,
initial_chunk: Option<ByteStream>,
completed: &mpsc::Sender<Result<ChunkResponse, crate::error::Error>>,
completed: &mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
permit: OwnedWorkPermit,
parent_span_for_tasks: tracing::Span,
metadata: Option<ChunkMetadata>,
) -> u64 {
if let Some(stream) = initial_chunk {
let seq = handle.ctx.next_seq();
let seq = ctx.next_seq();
let completed = completed.clone();
// spawn a task to actually read the discovery chunk without waiting for it so we
// can get started sooner on any remaining work (if any)
handle.tasks.spawn(async move {
let chunk = stream
.collect()
tasks.spawn(async move {
let chunk = AggregatedBytes::from_byte_stream(stream)
.await
.map(|aggregated| ChunkResponse {
.map(|aggregated| ChunkOutput {
seq,
data: Some(aggregated),
data: aggregated,
metadata: metadata.expect("chunk metadata is available"),
})
.map_err(error::discovery_failed);

@@ -162,7 +193,7 @@ fn handle_discovery_chunk(
}
}.instrument(tracing::debug_span!(parent: parent_span_for_tasks.clone(), "collect-body-from-discovery", seq)));
}
handle.ctx.current_seq()
ctx.current_seq()
}

/// Download operation specific state
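Finally, the payoff of threading ChunkMetadata through every ChunkOutput is per-part observability. The external-types.toml change above allowlists aws_types::request_id::RequestId and RequestIdExt, which suggests chunk metadata exposes the request IDs of the underlying GetObject responses. A hedged logging sketch under that assumption (the trait impls on ChunkMetadata are not shown in this diff):

use aws_types::request_id::{RequestId, RequestIdExt};

async fn log_chunk_request_ids(
    handle: &mut DownloadHandle,
) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(chunk) = handle.body_mut().next().await {
        let chunk = chunk?;
        // Assumes ChunkMetadata implements RequestId and RequestIdExt.
        tracing::debug!(
            seq = chunk.seq,
            request_id = chunk.metadata.request_id(),
            extended_request_id = chunk.metadata.extended_request_id(),
            "received chunk"
        );
    }
    Ok(())
}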