From 06c087a5d53676bb048f6c512b8eb1fda63f03d5 Mon Sep 17 00:00:00 2001 From: Waqar Ahmed Khan Date: Mon, 25 Nov 2024 15:09:12 -0800 Subject: [PATCH] Expose Download Metadata for Every Chunk (#73) --- aws-s3-transfer-manager/Cargo.toml | 1 + aws-s3-transfer-manager/examples/cp.rs | 6 +- aws-s3-transfer-manager/external-types.toml | 3 + aws-s3-transfer-manager/src/client.rs | 3 +- .../src/io/aggregated_bytes.rs | 82 ++++++ aws-s3-transfer-manager/src/io/mod.rs | 3 + .../src/operation/download.rs | 167 +++++++----- .../src/operation/download/body.rs | 94 ++++--- .../src/operation/download/builders.rs | 11 +- .../src/operation/download/chunk_meta.rs | 228 ++++++++++++++++ .../src/operation/download/discovery.rs | 31 ++- .../src/operation/download/handle.rs | 51 +++- .../src/operation/download/object_meta.rs | 256 +++++++++++------- .../src/operation/download/service.rs | 50 ++-- .../src/operation/download_objects/worker.rs | 5 +- .../src/operation/upload/builders.rs | 1 + .../tests/download_test.rs | 26 +- 17 files changed, 744 insertions(+), 274 deletions(-) create mode 100644 aws-s3-transfer-manager/src/io/aggregated_bytes.rs create mode 100644 aws-s3-transfer-manager/src/operation/download/chunk_meta.rs diff --git a/aws-s3-transfer-manager/Cargo.toml b/aws-s3-transfer-manager/Cargo.toml index b9c58bd..b2d2da4 100644 --- a/aws-s3-transfer-manager/Cargo.toml +++ b/aws-s3-transfer-manager/Cargo.toml @@ -20,6 +20,7 @@ aws-smithy-types = "1.2.6" aws-types = "1.3.3" blocking = "1.6.0" bytes = "1" +bytes-utils = "0.1.4" futures-util = "0.3.30" path-clean = "1.0.1" pin-project-lite = "0.2.14" diff --git a/aws-s3-transfer-manager/examples/cp.rs b/aws-s3-transfer-manager/examples/cp.rs index 781768d..3652ebb 100644 --- a/aws-s3-transfer-manager/examples/cp.rs +++ b/aws-s3-transfer-manager/examples/cp.rs @@ -171,14 +171,14 @@ async fn do_download(args: Args) -> Result<(), BoxError> { // TODO(aws-sdk-rust#1159) - rewrite this less naively, // likely abstract this into performant utils for single file download. Higher level // TM will handle it's own thread pool for filesystem work - let mut handle = tm.download().bucket(bucket).key(key).send().await?; + let mut handle = tm.download().bucket(bucket).key(key).initiate()?; write_body(handle.body_mut(), dest) .instrument(tracing::debug_span!("write-output")) .await?; let elapsed = start.elapsed(); - let obj_size_bytes = handle.object_meta.total_size(); + let obj_size_bytes = handle.object_meta().await?.content_length(); let throughput = Throughput::new(obj_size_bytes, elapsed); println!( @@ -298,7 +298,7 @@ async fn main() -> Result<(), BoxError> { async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError> { while let Some(chunk) = body.next().await { - let chunk = chunk.unwrap(); + let chunk = chunk.unwrap().data; tracing::trace!("recv'd chunk remaining={}", chunk.remaining()); let mut segment_cnt = 1; for segment in chunk.into_segments() { diff --git a/aws-s3-transfer-manager/external-types.toml b/aws-s3-transfer-manager/external-types.toml index ebc3d65..e372912 100644 --- a/aws-s3-transfer-manager/external-types.toml +++ b/aws-s3-transfer-manager/external-types.toml @@ -6,4 +6,7 @@ allowed_external_types = [ "tokio::runtime::task::error::JoinError", "tokio::io::async_read::AsyncRead", "bytes::bytes::Bytes", + "bytes::buf::buf_impl::Buf", + "aws_types::request_id::RequestId", + "aws_types::request_id::RequestIdExt" ] diff --git a/aws-s3-transfer-manager/src/client.rs b/aws-s3-transfer-manager/src/client.rs index 19334c4..29e1488 100644 --- a/aws-s3-transfer-manager/src/client.rs +++ b/aws-s3-transfer-manager/src/client.rs @@ -136,8 +136,7 @@ impl Client { /// .download() /// .bucket("my-bucket") /// .key("my-key") - /// .send() - /// .await?; + /// .initiate()?; /// /// // process data off handle... /// diff --git a/aws-s3-transfer-manager/src/io/aggregated_bytes.rs b/aws-s3-transfer-manager/src/io/aggregated_bytes.rs new file mode 100644 index 0000000..e470657 --- /dev/null +++ b/aws-s3-transfer-manager/src/io/aggregated_bytes.rs @@ -0,0 +1,82 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use std::io::IoSlice; + +use aws_sdk_s3::primitives::ByteStream; +use bytes::{Buf, Bytes}; +use bytes_utils::SegmentedBuf; + +use crate::error::ErrorKind; + +/// Non-contiguous Binary Data Storage +/// +/// When data is read from the network, it is read in a sequence of chunks that are not in +/// contiguous memory. [`AggregatedBytes`] provides a view of +/// this data via [`impl Buf`](bytes::Buf) or it can be copied into contiguous storage with +/// [`.into_bytes()`](crate::io::aggregated_bytes::AggregatedBytes::into_bytes). +#[derive(Debug, Clone)] +pub struct AggregatedBytes(pub(crate) SegmentedBuf); + +impl AggregatedBytes { + /// Convert this buffer into [`Bytes`]. + /// + /// # Why does this consume `self`? + /// Technically, [`copy_to_bytes`](bytes::Buf::copy_to_bytes) can be called without ownership of self. However, since this + /// mutates the underlying buffer such that no data is remaining, it is more misuse resistant to + /// prevent the caller from attempting to reread the buffer. + /// + /// If the caller only holds a mutable reference, they may use [`copy_to_bytes`](bytes::Buf::copy_to_bytes) + /// directly on `AggregatedBytes`. + pub fn into_bytes(mut self) -> Bytes { + self.0.copy_to_bytes(self.0.remaining()) + } + + /// Convert this buffer into an [`Iterator`] of underlying non-contiguous segments of [`Bytes`] + pub fn into_segments(self) -> impl Iterator { + self.0.into_inner().into_iter() + } + + /// Convert this buffer into a `Vec` + pub fn to_vec(self) -> Vec { + self.0.into_inner().into_iter().flatten().collect() + } + + /// Make this buffer from a ByteStream + pub(crate) async fn from_byte_stream(value: ByteStream) -> Result { + let mut value = value; + let mut output = SegmentedBuf::new(); + while let Some(buf) = value.next().await { + match buf { + Ok(buf) => output.push(buf), + Err(err) => return Err(crate::error::from_kind(ErrorKind::ChunkFailed)(err)), + }; + } + Ok(AggregatedBytes(output)) + } +} + +impl Buf for AggregatedBytes { + // Forward all methods that SegmentedBuf has custom implementations of. + fn remaining(&self) -> usize { + self.0.remaining() + } + + fn chunk(&self) -> &[u8] { + self.0.chunk() + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { + self.0.chunks_vectored(dst) + } + + fn advance(&mut self, cnt: usize) { + self.0.advance(cnt) + } + + fn copy_to_bytes(&mut self, len: usize) -> Bytes { + self.0.copy_to_bytes(len) + } +} diff --git a/aws-s3-transfer-manager/src/io/mod.rs b/aws-s3-transfer-manager/src/io/mod.rs index 26569c0..7edcca6 100644 --- a/aws-s3-transfer-manager/src/io/mod.rs +++ b/aws-s3-transfer-manager/src/io/mod.rs @@ -5,6 +5,8 @@ /// Adapters for other IO library traits to map to `InputStream` pub mod adapters; +/// Download Body Type +mod aggregated_bytes; mod buffer; pub(crate) mod part_reader; mod path_body; @@ -15,6 +17,7 @@ pub mod error; mod size_hint; // re-exports +pub use self::aggregated_bytes::AggregatedBytes; pub(crate) use self::buffer::Buffer; pub use self::path_body::PathBodyBuilder; pub use self::size_hint::SizeHint; diff --git a/aws-s3-transfer-manager/src/operation/download.rs b/aws-s3-transfer-manager/src/operation/download.rs index b265e79..df03fc3 100644 --- a/aws-s3-transfer-manager/src/operation/download.rs +++ b/aws-s3-transfer-manager/src/operation/download.rs @@ -20,19 +20,26 @@ mod handle; pub use handle::DownloadHandle; use tracing::Instrument; +/// Provides metadata for each chunk during an object download. +mod chunk_meta; +pub use chunk_meta::ChunkMetadata; +/// Provides metadata for a single S3 object during download. mod object_meta; +pub use object_meta::ObjectMetadata; + mod service; use crate::error; +use crate::io::AggregatedBytes; use crate::runtime::scheduler::OwnedWorkPermit; use aws_smithy_types::byte_stream::ByteStream; -use body::Body; +use body::{Body, ChunkOutput}; use discovery::discover_obj; -use service::{distribute_work, ChunkResponse}; +use service::distribute_work; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::task::JoinSet; +use tokio::sync::{mpsc, oneshot, Mutex, OnceCell}; +use tokio::task::{self, JoinSet}; use super::TransferContext; @@ -50,9 +57,9 @@ impl Download { /// /// If `use_current_span_as_parent_for_tasks` is false, spawned tasks will /// "follow from" the current span, but be under their own root of the trace tree. - /// Use this for `TransferManager.download().send()`, where the spawned tasks - /// should NOT extend the life of the current `send()` span. - pub(crate) async fn orchestrate( + /// Use this for `TransferManager.download().initiate()`, where the spawned tasks + /// should NOT extend the life of the current `initiate()` span. + pub(crate) fn orchestrate( handle: Arc, input: crate::operation::download::DownloadInput, use_current_span_as_parent_for_tasks: bool, @@ -62,64 +69,86 @@ impl Download { todo!("single part download not implemented") } - let concurrency = handle.num_workers(); let ctx = DownloadContext::new(handle); - - // create span to serve as parent of spawned child tasks. - let parent_span_for_tasks = tracing::debug_span!( - parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } , - "download-tasks", - bucket = input.bucket().unwrap_or_default(), - key = input.key().unwrap_or_default(), - ); - if !use_current_span_as_parent_for_tasks { - // if not child of current span, then "follows from" current span - parent_span_for_tasks.follows_from(tracing::Span::current()); - } - - // acquire a permit for discovery - let permit = ctx.handle.scheduler.acquire_permit().await?; - - // make initial discovery about the object size, metadata, possibly first chunk - let mut discovery = discover_obj(&ctx, &input).await?; + let concurrency = ctx.handle.num_workers(); let (comp_tx, comp_rx) = mpsc::channel(concurrency); - - let initial_chunk = discovery.initial_chunk.take(); - - let mut handle = DownloadHandle { - // FIXME(aws-sdk-rust#1159) - initial object discovery for a range/first-part will not - // have the correct metadata w.r.t. content-length and maybe others for the whole object. - object_meta: discovery.meta, + let (object_meta_tx, object_meta_rx) = oneshot::channel(); + + let tasks = Arc::new(Mutex::new(JoinSet::new())); + let discovery = tokio::spawn(send_discovery( + tasks.clone(), + ctx.clone(), + comp_tx, + object_meta_tx, + input, + use_current_span_as_parent_for_tasks, + )); + + Ok(DownloadHandle { body: Body::new(comp_rx), - // spawn all work into the same JoinSet such that when the set is dropped all tasks are cancelled. - tasks: JoinSet::new(), - ctx, - }; - - // spawn a task (if necessary) to handle the discovery chunk. This returns immediately so - // that we can begin concurrently downloading any reamining chunks/parts ASAP - let start_seq = handle_discovery_chunk( - &mut handle, - initial_chunk, - &comp_tx, - permit, - parent_span_for_tasks.clone(), - ); + tasks, + discovery, + object_meta_receiver: Mutex::new(Some(object_meta_rx)), + object_meta: OnceCell::new(), + }) + } +} - if !discovery.remaining.is_empty() { - let remaining = discovery.remaining.clone(); - distribute_work( - &mut handle, - remaining, - input, - start_seq, - comp_tx, - parent_span_for_tasks, - ) - } +async fn send_discovery( + tasks: Arc>>, + ctx: DownloadContext, + comp_tx: mpsc::Sender>, + object_meta_tx: oneshot::Sender, + input: DownloadInput, + use_current_span_as_parent_for_tasks: bool, +) -> Result<(), crate::error::Error> { + // create span to serve as parent of spawned child tasks. + let parent_span_for_tasks = tracing::debug_span!( + parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } , + "download-tasks", + bucket = input.bucket().unwrap_or_default(), + key = input.key().unwrap_or_default(), + ); + if !use_current_span_as_parent_for_tasks { + // if not child of current span, then "follows from" current span + parent_span_for_tasks.follows_from(tracing::Span::current()); + } - Ok(handle) + // acquire a permit for discovery + let permit = ctx.handle.scheduler.acquire_permit().await?; + + // make initial discovery about the object size, metadata, possibly first chunk + let mut discovery = discover_obj(&ctx, &input).await?; + // FIXME - This will fail if the handle is dropped at this point. We should handle + // the cancellation gracefully here. + let _ = object_meta_tx.send(discovery.object_meta); + let initial_chunk = discovery.initial_chunk.take(); + + let mut tasks = tasks.lock().await; + // spawn a task (if necessary) to handle the discovery chunk. This returns immediately so + // that we can begin concurrently downloading any remaining chunks/parts ASAP + let start_seq = handle_discovery_chunk( + &mut tasks, + ctx.clone(), + initial_chunk, + &comp_tx, + permit, + parent_span_for_tasks.clone(), + discovery.chunk_meta, + ); + + if !discovery.remaining.is_empty() { + distribute_work( + &mut tasks, + ctx.clone(), + discovery.remaining, + input, + start_seq, + comp_tx, + parent_span_for_tasks, + ); } + Ok(()) } /// Handle possibly sending the first chunk of data received through discovery. Returns @@ -130,24 +159,26 @@ impl Download { /// This allows remaining work to start immediately (and concurrently) without /// waiting for the first chunk. fn handle_discovery_chunk( - handle: &mut DownloadHandle, + tasks: &mut task::JoinSet<()>, + ctx: DownloadContext, initial_chunk: Option, - completed: &mpsc::Sender>, + completed: &mpsc::Sender>, permit: OwnedWorkPermit, parent_span_for_tasks: tracing::Span, + metadata: Option, ) -> u64 { if let Some(stream) = initial_chunk { - let seq = handle.ctx.next_seq(); + let seq = ctx.next_seq(); let completed = completed.clone(); // spawn a task to actually read the discovery chunk without waiting for it so we // can get started sooner on any remaining work (if any) - handle.tasks.spawn(async move { - let chunk = stream - .collect() + tasks.spawn(async move { + let chunk = AggregatedBytes::from_byte_stream(stream) .await - .map(|aggregated| ChunkResponse { + .map(|aggregated| ChunkOutput { seq, - data: Some(aggregated), + data: aggregated, + metadata: metadata.expect("chunk metadata is available"), }) .map_err(error::discovery_failed); @@ -162,7 +193,7 @@ fn handle_discovery_chunk( } }.instrument(tracing::debug_span!(parent: parent_span_for_tasks.clone(), "collect-body-from-discovery", seq))); } - handle.ctx.current_seq() + ctx.current_seq() } /// Download operation specific state diff --git a/aws-s3-transfer-manager/src/operation/download/body.rs b/aws-s3-transfer-manager/src/operation/download/body.rs index aa965dd..e5348c9 100644 --- a/aws-s3-transfer-manager/src/operation/download/body.rs +++ b/aws-s3-transfer-manager/src/operation/download/body.rs @@ -2,13 +2,16 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ -use crate::operation::download::service::ChunkResponse; -use aws_smithy_types::byte_stream::AggregatedBytes; + use std::cmp; use std::cmp::Ordering; use std::collections::BinaryHeap; use tokio::sync::mpsc; +use crate::io::AggregatedBytes; + +use super::chunk_meta::ChunkMetadata; + /// Stream of binary data representing an Amazon S3 Object's contents. /// /// Wraps potentially multiple streams of binary data into a single coherent stream. @@ -19,10 +22,28 @@ pub struct Body { sequencer: Sequencer, } -type BodyChannel = mpsc::Receiver>; +type BodyChannel = mpsc::Receiver>; + +/// Contains body and metadata for each GetObject call made. This will be delivered sequentially +/// in-order. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ChunkOutput { + // TODO(aws-sdk-rust#1159, design) - consider PartialOrd for ChunkResponse and hiding `seq` as internal only detail + // the seq number + pub(crate) seq: u64, + /// The content associated with this particular ranged GetObject request. + pub data: AggregatedBytes, + /// The metadata associated with this particular ranged GetObject request. This contains all the + /// metadata returned by the S3 GetObject operation. + pub metadata: ChunkMetadata, +} +// TODO: Do we want to expose something to yield multiple chunks in a single call, like +// recv_many/collect, etc.? We can benchmark to see if we get a significant performance boost once +// we have a better scheduler in place. impl Body { - /// Create a new empty Body + /// Create a new empty body pub fn empty() -> Self { Self::new_from_channel(None) } @@ -39,7 +60,8 @@ impl Body { } /// Convert this body into an unordered stream of chunks. - // TODO(aws-sdk-rust#1159) - revisit if we actually need/use unordered data stream + // TODO(aws-sdk-rust#1159) - revisit if we actually need/use unordered data stream. + // download_objects should utilize this so that it can write in parallel to files. #[allow(dead_code)] pub(crate) fn unordered(self) -> UnorderedBody { self.inner @@ -50,9 +72,7 @@ impl Body { /// Returns [None] when there is no more data. /// Chunks returned from a [Body] are guaranteed to be sequenced /// in the right order. - pub async fn next(&mut self) -> Option> { - // TODO(aws-sdk-rust#1159, design) - do we want ChunkResponse (or similar) rather than AggregatedBytes? Would - // make additional retries of an individual chunk/part more feasible (though theoretically already exhausted retries) + pub async fn next(&mut self) -> Option> { loop { if self.sequencer.is_ordered() { break; @@ -65,17 +85,13 @@ impl Body { } } - let chunk = self - .sequencer - .pop() - .map(|r| Ok(r.data.expect("chunk data"))); - - if chunk.is_some() { - // if we actually pulled data out, advance the next sequence we expect + let chunk = self.sequencer.pop(); + if let Some(chunk) = chunk { self.sequencer.advance(); + Some(Ok(chunk)) + } else { + None } - - chunk } /// Close the body, no more data will flow from it and all publishers will be notified. @@ -99,11 +115,11 @@ impl Sequencer { } } - fn push(&mut self, chunk: ChunkResponse) { + fn push(&mut self, chunk: ChunkOutput) { self.chunks.push(cmp::Reverse(SequencedChunk(chunk))) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Option { self.chunks.pop().map(|c| c.0 .0) } @@ -116,7 +132,7 @@ impl Sequencer { next.unwrap().seq == self.next_seq } - fn peek(&self) -> Option<&ChunkResponse> { + fn peek(&self) -> Option<&ChunkOutput> { self.chunks.peek().map(|c| &c.0 .0) } @@ -126,7 +142,7 @@ impl Sequencer { } #[derive(Debug)] -struct SequencedChunk(ChunkResponse); +struct SequencedChunk(ChunkOutput); impl Ord for SequencedChunk { fn cmp(&self, other: &Self) -> Ordering { @@ -150,7 +166,7 @@ impl PartialEq for SequencedChunk { /// A body that returns chunks in whatever order they are received. #[derive(Debug)] pub(crate) struct UnorderedBody { - chunks: Option>>, + chunks: Option>>, } impl UnorderedBody { @@ -164,7 +180,7 @@ impl UnorderedBody { /// Chunks returned from an [UnorderedBody] are not guaranteed to be sorted /// in the right order. Consumers are expected to sort the data themselves /// using the chunk sequence number (starting from zero). - pub(crate) async fn next(&mut self) -> Option> { + pub(crate) async fn next(&mut self) -> Option> { match self.chunks.as_mut() { None => None, Some(ch) => ch.recv().await, @@ -181,24 +197,28 @@ impl UnorderedBody { #[cfg(test)] mod tests { - use crate::{error, operation::download::service::ChunkResponse}; - use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream}; + use crate::{error, operation::download::body::ChunkOutput}; use bytes::Bytes; + use bytes_utils::SegmentedBuf; use tokio::sync::mpsc; - use super::{Body, Sequencer}; + use super::{AggregatedBytes, Body, Sequencer}; - fn chunk_resp(seq: u64, data: Option) -> ChunkResponse { - ChunkResponse { seq, data } + fn chunk_resp(seq: u64, data: AggregatedBytes) -> ChunkOutput { + ChunkOutput { + seq, + data, + metadata: Default::default(), + } } #[test] fn test_sequencer() { let mut sequencer = Sequencer::new(); - sequencer.push(chunk_resp(1, None)); - sequencer.push(chunk_resp(2, None)); + sequencer.push(chunk_resp(1, AggregatedBytes(SegmentedBuf::new()))); + sequencer.push(chunk_resp(2, AggregatedBytes(SegmentedBuf::new()))); assert_eq!(sequencer.peek().unwrap().seq, 1); - sequencer.push(chunk_resp(0, None)); + sequencer.push(chunk_resp(0, AggregatedBytes(SegmentedBuf::new()))); assert_eq!(sequencer.pop().unwrap().seq, 0); } @@ -210,8 +230,9 @@ mod tests { let seq = vec![2, 0, 1]; for i in seq { let data = Bytes::from(format!("chunk {i}")); - let aggregated = ByteStream::from(data).collect().await.unwrap(); - let chunk = chunk_resp(i as u64, Some(aggregated)); + let mut aggregated = SegmentedBuf::new(); + aggregated.push(data); + let chunk = chunk_resp(i as u64, AggregatedBytes(aggregated)); tx.send(Ok(chunk)).await.unwrap(); } }); @@ -219,7 +240,7 @@ mod tests { let mut received = Vec::new(); while let Some(chunk) = body.next().await { let chunk = chunk.expect("chunk ok"); - let data = String::from_utf8(chunk.to_vec()).unwrap(); + let data = String::from_utf8(chunk.data.to_vec()).unwrap(); received.push(data); } @@ -233,8 +254,9 @@ mod tests { let mut body = Body::new(rx); tokio::spawn(async move { let data = Bytes::from("chunk 0".to_string()); - let aggregated = ByteStream::from(data).collect().await.unwrap(); - let chunk = chunk_resp(0, Some(aggregated)); + let mut aggregated = SegmentedBuf::new(); + aggregated.push(data); + let chunk = chunk_resp(0, AggregatedBytes(aggregated)); tx.send(Ok(chunk)).await.unwrap(); let err = error::Error::new(error::ErrorKind::InputInvalid, "test errors".to_string()); tx.send(Err(err)).await.unwrap(); diff --git a/aws-s3-transfer-manager/src/operation/download/builders.rs b/aws-s3-transfer-manager/src/operation/download/builders.rs index d601d42..8bd1b41 100644 --- a/aws-s3-transfer-manager/src/operation/download/builders.rs +++ b/aws-s3-transfer-manager/src/operation/download/builders.rs @@ -26,9 +26,9 @@ impl DownloadFluentBuilder { bucket = self.inner.bucket.as_deref().unwrap_or_default(), key = self.inner.key.as_deref().unwrap_or_default(), ))] - pub async fn send(self) -> Result { + pub fn initiate(self) -> Result { let input = self.inner.build()?; - crate::operation::download::Download::orchestrate(self.handle, input, false).await + crate::operation::download::Download::orchestrate(self.handle, input, false) } ///

The bucket name containing the object.

@@ -524,12 +524,9 @@ impl DownloadFluentBuilder { impl crate::operation::download::input::DownloadInputBuilder { /// Initiate a download transfer for a single object with this input using the given client. - pub async fn send_with( - self, - client: &crate::Client, - ) -> Result { + pub fn send_with(self, client: &crate::Client) -> Result { let mut fluent_builder = client.download(); fluent_builder.inner = self; - fluent_builder.send().await + fluent_builder.initiate() } } diff --git a/aws-s3-transfer-manager/src/operation/download/chunk_meta.rs b/aws-s3-transfer-manager/src/operation/download/chunk_meta.rs new file mode 100644 index 0000000..96ee464 --- /dev/null +++ b/aws-s3-transfer-manager/src/operation/download/chunk_meta.rs @@ -0,0 +1,228 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_sdk_s3::operation::get_object::GetObjectOutput; +use aws_sdk_s3::operation::RequestId; +use aws_sdk_s3::operation::RequestIdExt; + +/// Chunk Metadata, other than the body, that will be set from the `GetObject` request. +#[derive(Clone, Default)] +#[non_exhaustive] +pub struct ChunkMetadata { + ///

Indicates whether the object retrieved was (true) or was not (false) a Delete Marker. If false, this response header does not appear in the response.

+ ///
    + ///
  • + ///

    If the current version of the object is a delete marker, Amazon S3 behaves as if the object was deleted and includes x-amz-delete-marker: true in the response.

  • + ///
  • + ///

    If the specified version in the request is a delete marker, the response returns a 405 Method Not Allowed error and the Last-Modified: timestamp response header.

  • + ///
+ ///
+ pub delete_marker: Option, + ///

Indicates that a range of bytes was specified in the request.

+ pub accept_ranges: Option, + ///

If the object expiration is configured (see PutBucketLifecycleConfiguration ), the response includes this header. It includes the expiry-date and rule-id key-value pairs providing object expiration information. The value of the rule-id is URL-encoded.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub expiration: Option, + ///

Provides information about object restoration action and expiration time of the restored object copy.

+ ///

This functionality is not supported for directory buckets. Only the S3 Express One Zone storage class is supported by directory buckets to store objects.

+ ///
+ pub restore: Option, + ///

Date and time when the object was last modified.

+ ///

General purpose buckets - When you specify a versionId of the object in your request, if the specified version in the request is a delete marker, the response returns a 405 Method Not Allowed error and the Last-Modified: timestamp response header.

+ pub last_modified: Option<::aws_smithy_types::DateTime>, + ///

Size of the body in bytes.

+ pub content_length: Option, + ///

An entity tag (ETag) is an opaque identifier assigned by a web server to a specific version of a resource found at a URL.

+ pub e_tag: Option, + ///

The base64-encoded, 32-bit CRC-32 checksum of the object. This will only be present if it was uploaded with the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub checksum_crc32: Option, + ///

The base64-encoded, 32-bit CRC-32C checksum of the object. This will only be present if it was uploaded with the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub checksum_crc32_c: Option, + ///

The base64-encoded, 160-bit SHA-1 digest of the object. This will only be present if it was uploaded with the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub checksum_sha1: Option, + ///

The base64-encoded, 256-bit SHA-256 digest of the object. This will only be present if it was uploaded with the object. For more information, see Checking object integrity in the Amazon S3 User Guide.

+ pub checksum_sha256: Option, + ///

This is set to the number of metadata entries not returned in the headers that are prefixed with x-amz-meta-. This can happen if you create metadata using an API like SOAP that supports more flexible metadata than the REST API. For example, using SOAP, you can create metadata whose values are not legal HTTP headers.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub missing_meta: Option, + ///

Version ID of the object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub version_id: Option, + ///

Specifies caching behavior along the request/reply chain.

+ pub cache_control: Option, + ///

Specifies presentational information for the object.

+ pub content_disposition: Option, + ///

Indicates what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field.

+ pub content_encoding: Option, + ///

The language the content is in.

+ pub content_language: Option, + ///

The portion of the object returned in the response.

+ pub content_range: Option, + ///

A standard MIME type describing the format of the object data.

+ pub content_type: Option, + ///

If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub website_redirect_location: Option, + ///

The server-side encryption algorithm used when you store this object in Amazon S3.

+ pub server_side_encryption: Option, + ///

A map of metadata to store with the object in S3.

+ pub metadata: Option<::std::collections::HashMap>, + ///

If server-side encryption with a customer-provided encryption key was requested, the response will include this header to confirm the encryption algorithm that's used.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub sse_customer_algorithm: Option, + ///

If server-side encryption with a customer-provided encryption key was requested, the response will include this header to provide the round-trip message integrity verification of the customer-provided encryption key.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub sse_customer_key_md5: Option, + ///

If present, indicates the ID of the KMS key that was used for object encryption.

+ pub ssekms_key_id: Option, + ///

Indicates whether the object uses an S3 Bucket Key for server-side encryption with Key Management Service (KMS) keys (SSE-KMS).

+ pub bucket_key_enabled: Option, + ///

Provides storage class information of the object. Amazon S3 returns this header for all objects except for S3 Standard storage class objects.

+ ///

Directory buckets - Only the S3 Express One Zone storage class is supported by directory buckets to store objects.

+ ///
+ pub storage_class: Option, + ///

If present, indicates that the requester was successfully charged for the request.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub request_charged: Option, + ///

Amazon S3 can return this if your request involves a bucket that is either a source or destination in a replication rule.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub replication_status: Option, + ///

The count of parts this object has. This value is only returned if you specify partNumber in your request and the object was uploaded as a multipart upload.

+ pub parts_count: Option, + ///

The number of tags, if any, on the object, when you have the relevant permission to read object tags.

+ ///

You can use GetObjectTagging to retrieve the tag set associated with an object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub tag_count: Option, + ///

The Object Lock mode that's currently in place for this object.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub object_lock_mode: Option, + ///

The date and time when this object's Object Lock will expire.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub object_lock_retain_until_date: Option<::aws_smithy_types::DateTime>, + ///

Indicates whether this object has an active legal hold. This field is only returned if you have permission to view an object's legal hold status.

+ ///

This functionality is not supported for directory buckets.

+ ///
+ pub object_lock_legal_hold_status: Option, + ///

The date and time at which the object is no longer cacheable.

+ pub expires_string: Option, + _request_id: Option, + _extended_request_id: Option, +} + +impl From for ChunkMetadata { + fn from(value: GetObjectOutput) -> Self { + Self { + _request_id: value.request_id().map(|s| s.to_string()), + _extended_request_id: value.extended_request_id().map(|s| s.to_string()), + delete_marker: value.delete_marker, + accept_ranges: value.accept_ranges, + expiration: value.expiration, + restore: value.restore, + last_modified: value.last_modified, + content_length: value.content_length, + e_tag: value.e_tag, + checksum_crc32: value.checksum_crc32, + checksum_crc32_c: value.checksum_crc32_c, + checksum_sha1: value.checksum_sha1, + checksum_sha256: value.checksum_sha256, + missing_meta: value.missing_meta, + version_id: value.version_id, + cache_control: value.cache_control, + content_disposition: value.content_disposition, + content_encoding: value.content_encoding, + content_language: value.content_language, + content_range: value.content_range, + content_type: value.content_type, + website_redirect_location: value.website_redirect_location, + server_side_encryption: value.server_side_encryption, + metadata: value.metadata, + sse_customer_algorithm: value.sse_customer_algorithm, + sse_customer_key_md5: value.sse_customer_key_md5, + ssekms_key_id: value.ssekms_key_id, + bucket_key_enabled: value.bucket_key_enabled, + storage_class: value.storage_class, + request_charged: value.request_charged, + replication_status: value.replication_status, + parts_count: value.parts_count, + tag_count: value.tag_count, + object_lock_mode: value.object_lock_mode, + object_lock_retain_until_date: value.object_lock_retain_until_date, + object_lock_legal_hold_status: value.object_lock_legal_hold_status, + expires_string: value.expires_string, + } + } +} + +impl RequestIdExt for ChunkMetadata { + fn extended_request_id(&self) -> Option<&str> { + self._extended_request_id.as_deref() + } +} +impl RequestId for ChunkMetadata { + fn request_id(&self) -> Option<&str> { + self._request_id.as_deref() + } +} + +impl ::std::fmt::Debug for ChunkMetadata { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + let mut formatter = f.debug_struct("ChunkMetadata"); + formatter.field("delete_marker", &self.delete_marker); + formatter.field("accept_ranges", &self.accept_ranges); + formatter.field("expiration", &self.expiration); + formatter.field("restore", &self.restore); + formatter.field("last_modified", &self.last_modified); + formatter.field("content_length", &self.content_length); + formatter.field("e_tag", &self.e_tag); + formatter.field("checksum_crc32", &self.checksum_crc32); + formatter.field("checksum_crc32_c", &self.checksum_crc32_c); + formatter.field("checksum_sha1", &self.checksum_sha1); + formatter.field("checksum_sha256", &self.checksum_sha256); + formatter.field("missing_meta", &self.missing_meta); + formatter.field("version_id", &self.version_id); + formatter.field("cache_control", &self.cache_control); + formatter.field("content_disposition", &self.content_disposition); + formatter.field("content_encoding", &self.content_encoding); + formatter.field("content_language", &self.content_language); + formatter.field("content_range", &self.content_range); + formatter.field("content_type", &self.content_type); + formatter.field("website_redirect_location", &self.website_redirect_location); + formatter.field("server_side_encryption", &self.server_side_encryption); + formatter.field("metadata", &self.metadata); + formatter.field("sse_customer_algorithm", &self.sse_customer_algorithm); + formatter.field("sse_customer_key_md5", &self.sse_customer_key_md5); + formatter.field("ssekms_key_id", &"*** Sensitive Data Redacted ***"); + formatter.field("bucket_key_enabled", &self.bucket_key_enabled); + formatter.field("storage_class", &self.storage_class); + formatter.field("request_charged", &self.request_charged); + formatter.field("replication_status", &self.replication_status); + formatter.field("parts_count", &self.parts_count); + formatter.field("tag_count", &self.tag_count); + formatter.field("object_lock_mode", &self.object_lock_mode); + formatter.field( + "object_lock_retain_until_date", + &self.object_lock_retain_until_date, + ); + formatter.field( + "object_lock_legal_hold_status", + &self.object_lock_legal_hold_status, + ); + formatter.field("expires_string", &self.expires_string); + formatter.field("_extended_request_id", &self._extended_request_id); + formatter.field("_request_id", &self._request_id); + formatter.finish() + } +} diff --git a/aws-s3-transfer-manager/src/operation/download/discovery.rs b/aws-s3-transfer-manager/src/operation/download/discovery.rs index cf2abe7..ed72136 100644 --- a/aws-s3-transfer-manager/src/operation/download/discovery.rs +++ b/aws-s3-transfer-manager/src/operation/download/discovery.rs @@ -12,6 +12,7 @@ use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; use tracing::Instrument; +use super::chunk_meta::ChunkMetadata; use super::object_meta::ObjectMetadata; use super::DownloadContext; use super::DownloadInput; @@ -35,7 +36,8 @@ pub(super) struct ObjectDiscovery { pub(super) remaining: RangeInclusive, /// the discovered metadata - pub(super) meta: ObjectMetadata, + pub(super) chunk_meta: Option, + pub(super) object_meta: ObjectMetadata, /// the first chunk of data if fetched during discovery pub(super) initial_chunk: Option, @@ -111,28 +113,31 @@ async fn discover_obj_with_head( input: &DownloadInput, byte_range: Option, ) -> Result { - let meta: ObjectMetadata = ctx + let resp = ctx .client() .head_object() .set_bucket(input.bucket().map(str::to_string)) .set_key(input.key().map(str::to_string)) .send() .await - .map_err(error::discovery_failed)? - .into(); + .map_err(error::discovery_failed)?; + let object_meta: ObjectMetadata = resp.into(); let remaining = match byte_range { Some(range) => match range { ByteRange::Inclusive(start, end) => start..=end, - ByteRange::AllFrom(start) => start..=meta.total_size(), - ByteRange::Last(n) => (meta.total_size() - n + 1)..=meta.total_size(), + ByteRange::AllFrom(start) => start..=object_meta.content_length(), + ByteRange::Last(n) => { + (object_meta.content_length() - n + 1)..=object_meta.content_length() + } }, - None => 0..=meta.total_size(), + None => 0..=object_meta.content_length(), }; Ok(ObjectDiscovery { remaining, - meta, + chunk_meta: None, + object_meta, initial_chunk: None, }) } @@ -154,12 +159,13 @@ async fn discover_obj_with_get( let empty_stream = ByteStream::new(SdkBody::empty()); let body = mem::replace(&mut resp.body, empty_stream); - let meta: ObjectMetadata = resp.into(); - let content_len = meta.content_length(); + let object_meta: ObjectMetadata = (&resp).into(); + let chunk_meta: ChunkMetadata = resp.into(); + let content_len = chunk_meta.content_length.expect("expected content_length") as u64; let remaining = match range { Some(range) => (*range.start() + content_len)..=*range.end(), - None => content_len..=meta.total_size() - 1, + None => content_len..=object_meta.content_length() - 1, }; let initial_chunk = match content_len == 0 { @@ -169,7 +175,8 @@ async fn discover_obj_with_get( Ok(ObjectDiscovery { remaining, - meta, + chunk_meta: Some(chunk_meta), + object_meta, initial_chunk, }) } diff --git a/aws-s3-transfer-manager/src/operation/download/handle.rs b/aws-s3-transfer-manager/src/operation/download/handle.rs index f659742..63689f3 100644 --- a/aws-s3-transfer-manager/src/operation/download/handle.rs +++ b/aws-s3-transfer-manager/src/operation/download/handle.rs @@ -1,34 +1,56 @@ +use std::sync::Arc; + +use crate::error::{self, ErrorKind}; +use tokio::{ + sync::{oneshot::Receiver, Mutex, OnceCell}, + task, +}; + /* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ use crate::operation::download::body::Body; -use crate::operation::download::object_meta::ObjectMetadata; -use tokio::task; -use super::DownloadContext; +use super::object_meta::ObjectMetadata; /// Response type for a single download object request. #[derive(Debug)] #[non_exhaustive] pub struct DownloadHandle { - /// Object metadata - pub object_meta: ObjectMetadata, + /// Object metadata receiver. + pub(crate) object_meta_receiver: Mutex>>, + /// Object metadata. + pub(crate) object_meta: OnceCell, /// The object content pub(crate) body: Body, - /// All child tasks spawned for this download - pub(crate) tasks: task::JoinSet<()>, + /// Discovery task + pub(crate) discovery: task::JoinHandle>, - /// The context used to drive an upload to completion - pub(crate) ctx: DownloadContext, + /// All child tasks (ranged GetObject) spawned for this download + pub(crate) tasks: Arc>>, } impl DownloadHandle { /// Object metadata - pub fn object_meta(&self) -> &ObjectMetadata { - &self.object_meta + pub async fn object_meta(&self) -> Result<&ObjectMetadata, error::Error> { + let meta = self + .object_meta + .get_or_try_init(|| async { + let mut object_meta_receiver = self.object_meta_receiver.lock().await; + let object_meta_receiver = object_meta_receiver + .take() + .ok_or("meta_receiver is already taken") + .map_err(error::from_kind(ErrorKind::ObjectNotDiscoverable))?; + object_meta_receiver + .await + .map_err(error::from_kind(ErrorKind::ObjectNotDiscoverable)) + }) + .await?; + + Ok(meta) } /// Object content @@ -45,7 +67,12 @@ impl DownloadHandle { #[tracing::instrument(skip_all, level = "debug", name = "join-download")] pub async fn join(mut self) -> Result<(), crate::error::Error> { self.body.close(); - while let Some(join_result) = self.tasks.join_next().await { + + self.discovery.await??; + // It's safe to grab the lock here because discovery is already complete, and we will never + // lock tasks again after discovery to spawn more tasks. + let mut tasks = self.tasks.lock().await; + while let Some(join_result) = tasks.join_next().await { join_result?; } Ok(()) diff --git a/aws-s3-transfer-manager/src/operation/download/object_meta.rs b/aws-s3-transfer-manager/src/operation/download/object_meta.rs index be70541..ff421e8 100644 --- a/aws-s3-transfer-manager/src/operation/download/object_meta.rs +++ b/aws-s3-transfer-manager/src/operation/download/object_meta.rs @@ -3,60 +3,113 @@ * SPDX-License-Identifier: Apache-2.0 */ -use std::str::FromStr; - use aws_sdk_s3::operation::get_object::GetObjectOutput; use aws_sdk_s3::operation::head_object::HeadObjectOutput; - -use crate::http::header; - -// TODO(aws-sdk-rust#1159,design): how many of these fields should we expose? -// TODO(aws-sdk-rust#1159,docs): Document fields +use aws_sdk_s3::operation::RequestId; +use aws_sdk_s3::operation::RequestIdExt; /// Object metadata other than the body that can be set from either `GetObject` or `HeadObject` -#[derive(Debug, Clone, Default)] +/// In the case of GetObject, some data will be duplicated as part of the first chunk. +#[derive(Clone, Default)] +#[non_exhaustive] pub struct ObjectMetadata { + _request_id: Option, + _extended_request_id: Option, + ///

Indicates whether the object retrieved was (true) or was not (false) a Delete Marker. If false, this response header does not appear in the response.

+ ///
    + ///
  • + ///

    If the current version of the object is a delete marker, Amazon S3 behaves as if the object was deleted and includes x-amz-delete-marker: true in the response.

  • + ///
  • + ///

    If the specified version in the request is a delete marker, the response returns a 405 Method Not Allowed error and the Last-Modified: timestamp response header.

  • + ///
+ ///
pub delete_marker: Option, - pub accept_ranges: Option, + ///

If the object expiration is configured (see PutBucketLifecycleConfiguration ), the response includes this header. It includes the expiry-date and rule-id key-value pairs providing object expiration information. The value of the rule-id is URL-encoded.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub expiration: Option, + ///

Provides information about object restoration action and expiration time of the restored object copy.

+ ///

This functionality is not supported for directory buckets. Only the S3 Express One Zone storage class is supported by directory buckets to store objects.

+ ///
pub restore: Option, + ///

Date and time when the object was last modified.

+ ///

General purpose buckets - When you specify a versionId of the object in your request, if the specified version in the request is a delete marker, the response returns a 405 Method Not Allowed error and the Last-Modified: timestamp response header.

pub last_modified: Option<::aws_smithy_types::DateTime>, - pub content_length: Option, + pub(crate) content_length: Option, + ///

An entity tag (ETag) is an opaque identifier assigned by a web server to a specific version of a resource found at a URL.

pub e_tag: Option, - pub checksum_crc32: Option, - pub checksum_crc32_c: Option, - pub checksum_sha1: Option, - pub checksum_sha256: Option, + ///

This is set to the number of metadata entries not returned in the headers that are prefixed with x-amz-meta-. This can happen if you create metadata using an API like SOAP that supports more flexible metadata than the REST API. For example, using SOAP, you can create metadata whose values are not legal HTTP headers.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub missing_meta: Option, + ///

Version ID of the object.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub version_id: Option, + ///

Specifies caching behavior along the request/reply chain.

pub cache_control: Option, + ///

Specifies presentational information for the object.

pub content_disposition: Option, + ///

Indicates what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field.

pub content_encoding: Option, + ///

The language the content is in.

pub content_language: Option, - pub content_range: Option, + pub(crate) content_range: Option, + ///

A standard MIME type describing the format of the object data.

pub content_type: Option, - pub expires: Option<::aws_smithy_types::DateTime>, - pub expires_string: Option, + ///

If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub website_redirect_location: Option, + ///

The server-side encryption algorithm used when you store this object in Amazon S3.

pub server_side_encryption: Option, + ///

A map of metadata to store with the object in S3.

pub metadata: Option<::std::collections::HashMap>, + ///

If server-side encryption with a customer-provided encryption key was requested, the response will include this header to confirm the encryption algorithm that's used.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub sse_customer_algorithm: Option, + ///

If server-side encryption with a customer-provided encryption key was requested, the response will include this header to provide the round-trip message integrity verification of the customer-provided encryption key.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub sse_customer_key_md5: Option, + ///

If present, indicates the ID of the KMS key that was used for object encryption.

pub ssekms_key_id: Option, + ///

Indicates whether the object uses an S3 Bucket Key for server-side encryption with Key Management Service (KMS) keys (SSE-KMS).

pub bucket_key_enabled: Option, + ///

Provides storage class information of the object. Amazon S3 returns this header for all objects except for S3 Standard storage class objects.

+ ///

Directory buckets - Only the S3 Express One Zone storage class is supported by directory buckets to store objects.

+ ///
pub storage_class: Option, + ///

If present, indicates that the requester was successfully charged for the request.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub request_charged: Option, + ///

Amazon S3 can return this if your request involves a bucket that is either a source or destination in a replication rule.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub replication_status: Option, + ///

The count of parts this object has. This value is only returned if you specify partNumber in your request and the object was uploaded as a multipart upload.

pub parts_count: Option, - pub tag_count: Option, + ///

The Object Lock mode that's currently in place for this object.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub object_lock_mode: Option, + ///

The date and time when this object's Object Lock will expire.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub object_lock_retain_until_date: Option<::aws_smithy_types::DateTime>, + ///

Indicates whether this object has an active legal hold. This field is only returned if you have permission to view an object's legal hold status.

+ ///

This functionality is not supported for directory buckets.

+ ///
pub object_lock_legal_hold_status: Option, + ///

The date and time at which the object is no longer cacheable.

+ pub expires_string: Option, } impl ObjectMetadata { - /// The total object size - pub fn total_size(&self) -> u64 { + ///

Size of the object in bytes.

+ pub fn content_length(&self) -> u64 { match (self.content_length, self.content_range.as_ref()) { (_, Some(range)) => { let total = range.split_once('/').map(|x| x.1).expect("content range total"); @@ -69,72 +122,45 @@ impl ObjectMetadata { (None, None) => panic!("total object size cannot be calculated without either content length or content range headers") } } - - /// Content length from either the `Content-Range` or `Content-Length` headers - pub(crate) fn content_length(&self) -> u64 { - match (self.content_length, self.content_range.as_ref()) { - (Some(length), _) => { - debug_assert!(length > 0, "content length invalid"); - length as u64 - }, - (None, Some(range)) => { - let byte_range_str = range - .strip_prefix("bytes ") - .expect("content range bytes-unit recognized") - .split_once('/') - .map(|x| x.0) - .expect("content range valid"); - - match header::ByteRange::from_str(byte_range_str).expect("valid byte range") { - header::ByteRange::Inclusive(start, end) => end - start + 1, - _ => unreachable!("Content-Range header invalid") - } - } - (None, None) => panic!("total object size cannot be calculated without either content length or content range headers") - } - } } -impl From for ObjectMetadata { - fn from(value: GetObjectOutput) -> Self { +// The `GetObjectOutput` is used to create both `object_meta` and `chunk_meta`. +// We take a reference here instead of ownership because `GetObjectOutput` is not cloneable +// due to containing the body. +impl From<&GetObjectOutput> for ObjectMetadata { + fn from(value: &GetObjectOutput) -> Self { Self { + _request_id: value.request_id().map(|s| s.to_string()), + _extended_request_id: value.extended_request_id().map(|s| s.to_string()), delete_marker: value.delete_marker, - accept_ranges: value.accept_ranges, - expiration: value.expiration, - restore: value.restore, + expiration: value.expiration.clone(), + restore: value.restore.clone(), last_modified: value.last_modified, content_length: value.content_length, - e_tag: value.e_tag, - checksum_crc32: value.checksum_crc32, - checksum_crc32_c: value.checksum_crc32_c, - checksum_sha1: value.checksum_sha1, - checksum_sha256: value.checksum_sha256, + e_tag: value.e_tag.clone(), missing_meta: value.missing_meta, - version_id: value.version_id, - cache_control: value.cache_control, - content_disposition: value.content_disposition, - content_encoding: value.content_encoding, - content_language: value.content_language, - content_range: value.content_range, - content_type: value.content_type, - #[allow(deprecated)] - expires: value.expires, - expires_string: value.expires_string, - website_redirect_location: value.website_redirect_location, - server_side_encryption: value.server_side_encryption, - metadata: value.metadata, - sse_customer_algorithm: value.sse_customer_algorithm, - sse_customer_key_md5: value.sse_customer_key_md5, - ssekms_key_id: value.ssekms_key_id, + version_id: value.version_id.clone(), + cache_control: value.cache_control.clone(), + content_disposition: value.content_disposition.clone(), + content_encoding: value.content_encoding.clone(), + content_language: value.content_language.clone(), + content_range: value.content_range.clone(), + content_type: value.content_type.clone(), + expires_string: value.expires_string.clone(), + website_redirect_location: value.website_redirect_location.clone(), + server_side_encryption: value.server_side_encryption.clone(), + metadata: value.metadata.clone(), + sse_customer_algorithm: value.sse_customer_algorithm.clone(), + sse_customer_key_md5: value.sse_customer_key_md5.clone(), + ssekms_key_id: value.ssekms_key_id.clone(), bucket_key_enabled: value.bucket_key_enabled, - storage_class: value.storage_class, - request_charged: value.request_charged, - replication_status: value.replication_status, + storage_class: value.storage_class.clone(), + replication_status: value.replication_status.clone(), parts_count: value.parts_count, - tag_count: value.tag_count, - object_lock_mode: value.object_lock_mode, + object_lock_mode: value.object_lock_mode.clone(), object_lock_retain_until_date: value.object_lock_retain_until_date, - object_lock_legal_hold_status: value.object_lock_legal_hold_status, + object_lock_legal_hold_status: value.object_lock_legal_hold_status.clone(), + request_charged: value.request_charged.clone(), } } } @@ -142,17 +168,14 @@ impl From for ObjectMetadata { impl From for ObjectMetadata { fn from(value: HeadObjectOutput) -> Self { Self { + _request_id: value.request_id().map(|s| s.to_string()), + _extended_request_id: value.extended_request_id().map(|s| s.to_string()), delete_marker: value.delete_marker, - accept_ranges: value.accept_ranges, expiration: value.expiration, restore: value.restore, last_modified: value.last_modified, content_length: value.content_length, e_tag: value.e_tag, - checksum_crc32: value.checksum_crc32, - checksum_crc32_c: value.checksum_crc32_c, - checksum_sha1: value.checksum_sha1, - checksum_sha256: value.checksum_sha256, missing_meta: value.missing_meta, version_id: value.version_id, cache_control: value.cache_control, @@ -161,8 +184,6 @@ impl From for ObjectMetadata { content_language: value.content_language, content_range: None, content_type: value.content_type, - #[allow(deprecated)] - expires: value.expires, expires_string: value.expires_string, website_redirect_location: value.website_redirect_location, server_side_encryption: value.server_side_encryption, @@ -172,36 +193,89 @@ impl From for ObjectMetadata { ssekms_key_id: value.ssekms_key_id, bucket_key_enabled: value.bucket_key_enabled, storage_class: value.storage_class, - request_charged: value.request_charged, replication_status: value.replication_status, parts_count: value.parts_count, - tag_count: None, object_lock_mode: value.object_lock_mode, object_lock_retain_until_date: value.object_lock_retain_until_date, object_lock_legal_hold_status: value.object_lock_legal_hold_status, + request_charged: value.request_charged, } } } +impl RequestIdExt for ObjectMetadata { + fn extended_request_id(&self) -> Option<&str> { + self._extended_request_id.as_deref() + } +} +impl RequestId for ObjectMetadata { + fn request_id(&self) -> Option<&str> { + self._request_id.as_deref() + } +} + +impl ::std::fmt::Debug for ObjectMetadata { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + let mut formatter = f.debug_struct("ObjectMetadata"); + formatter.field("delete_marker", &self.delete_marker); + formatter.field("expiration", &self.expiration); + formatter.field("restore", &self.restore); + formatter.field("last_modified", &self.last_modified); + formatter.field("content_length", &self.content_length); + formatter.field("e_tag", &self.e_tag); + formatter.field("missing_meta", &self.missing_meta); + formatter.field("version_id", &self.version_id); + formatter.field("cache_control", &self.cache_control); + formatter.field("content_disposition", &self.content_disposition); + formatter.field("content_encoding", &self.content_encoding); + formatter.field("content_language", &self.content_language); + formatter.field("content_range", &self.content_range); + formatter.field("content_type", &self.content_type); + formatter.field("website_redirect_location", &self.website_redirect_location); + formatter.field("server_side_encryption", &self.server_side_encryption); + formatter.field("metadata", &self.metadata); + formatter.field("sse_customer_algorithm", &self.sse_customer_algorithm); + formatter.field("sse_customer_key_md5", &self.sse_customer_key_md5); + formatter.field("ssekms_key_id", &"*** Sensitive Data Redacted ***"); + formatter.field("bucket_key_enabled", &self.bucket_key_enabled); + formatter.field("storage_class", &self.storage_class); + formatter.field("request_charged", &self.request_charged); + formatter.field("replication_status", &self.replication_status); + formatter.field("parts_count", &self.parts_count); + formatter.field("object_lock_mode", &self.object_lock_mode); + formatter.field( + "object_lock_retain_until_date", + &self.object_lock_retain_until_date, + ); + formatter.field( + "object_lock_legal_hold_status", + &self.object_lock_legal_hold_status, + ); + formatter.field("expires_string", &self.expires_string); + formatter.field("_extended_request_id", &self._extended_request_id); + formatter.field("_request_id", &self._request_id); + formatter.finish() + } +} + #[cfg(test)] mod tests { use super::ObjectMetadata; #[test] - fn test_inferred_content_length() { + fn test_inferred_total_size() { let meta = ObjectMetadata { - content_length: Some(4), - content_range: Some("should ignore".to_owned()), + content_length: Some(15), ..Default::default() }; - assert_eq!(4, meta.content_length()); + assert_eq!(15, meta.content_length()); let meta = ObjectMetadata { - content_length: None, content_range: Some("bytes 0-499/900".to_owned()), + content_length: Some(500), ..Default::default() }; - assert_eq!(500, meta.content_length()); + assert_eq!(900, meta.content_length()); } } diff --git a/aws-s3-transfer-manager/src/operation/download/service.rs b/aws-s3-transfer-manager/src/operation/download/service.rs index 2183b06..ef20736 100644 --- a/aws-s3-transfer-manager/src/operation/download/service.rs +++ b/aws-s3-transfer-manager/src/operation/download/service.rs @@ -4,19 +4,22 @@ */ use crate::error; use crate::http::header; +use crate::io::AggregatedBytes; use crate::middleware::limit::concurrency::ConcurrencyLimitLayer; use crate::middleware::retry; use crate::operation::download::DownloadContext; use aws_smithy_types::body::SdkBody; -use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream}; +use aws_smithy_types::byte_stream::ByteStream; use std::cmp; use std::mem; use std::ops::RangeInclusive; use tokio::sync::mpsc; +use tokio::task; use tower::{service_fn, Service, ServiceBuilder, ServiceExt}; use tracing::Instrument; -use super::{DownloadHandle, DownloadInput, DownloadInputBuilder}; +use super::body::ChunkOutput; +use super::{DownloadInput, DownloadInputBuilder}; /// Request/input type for our "chunk" service. #[derive(Debug, Clone)] @@ -42,7 +45,7 @@ fn next_chunk( /// handler (service fn) for a single chunk async fn download_chunk_handler( request: DownloadChunkRequest, -) -> Result { +) -> Result { let seq: u64 = request.ctx.next_seq(); // the rest of the work is in its own fn, so we can log `seq` in the tracing span @@ -54,7 +57,7 @@ async fn download_chunk_handler( async fn download_specific_chunk( request: DownloadChunkRequest, seq: u64, -) -> Result { +) -> Result { let ctx = request.ctx; let part_size = ctx.handle.download_part_size_bytes(); let input = next_chunk( @@ -73,26 +76,24 @@ async fn download_specific_chunk( .map_err(error::from_kind(error::ErrorKind::ChunkFailed))?; let body = mem::replace(&mut resp.body, ByteStream::new(SdkBody::taken())); - - let bytes = body - .collect() + let body = AggregatedBytes::from_byte_stream(body) .instrument(tracing::debug_span!( "collect-body-from-download-chunk", seq )) - .await - .map_err(error::from_kind(error::ErrorKind::ChunkFailed))?; + .await?; - Ok(ChunkResponse { + Ok(ChunkOutput { seq, - data: Some(bytes), + data: body, + metadata: resp.into(), }) } /// Create a new tower::Service for downloading individual chunks of an object from S3 pub(super) fn chunk_service( ctx: &DownloadContext, -) -> impl Service +) -> impl Service + Clone + Send { let svc = service_fn(download_chunk_handler); @@ -104,15 +105,6 @@ pub(super) fn chunk_service( .service(svc) } -#[derive(Debug, Clone)] -pub(crate) struct ChunkResponse { - // TODO(aws-sdk-rust#1159, design) - consider PartialOrd for ChunkResponse and hiding `seq` as internal only detail - // the seq number - pub(crate) seq: u64, - // chunk data - pub(crate) data: Option, -} - /// Spawn tasks to download the remaining chunks of object data /// /// # Arguments @@ -123,22 +115,23 @@ pub(crate) struct ChunkResponse { /// * start_seq - the starting sequence number to use for chunks /// * comp_tx - the channel to send chunk responses to pub(super) fn distribute_work( - handle: &mut DownloadHandle, + tasks: &mut task::JoinSet<()>, + ctx: DownloadContext, remaining: RangeInclusive, input: DownloadInput, start_seq: u64, - comp_tx: mpsc::Sender>, + comp_tx: mpsc::Sender>, parent_span_for_tasks: tracing::Span, ) { - let svc = chunk_service(&handle.ctx); - let part_size = handle.ctx.target_part_size_bytes(); + let svc = chunk_service(&ctx); + let part_size = ctx.target_part_size_bytes(); let input: DownloadInputBuilder = input.into(); let size = *remaining.end() - *remaining.start() + 1; let num_parts = size.div_ceil(part_size); for _ in 0..num_parts { let req = DownloadChunkRequest { - ctx: handle.ctx.clone(), + ctx: ctx.clone(), remaining: remaining.clone(), input: input.clone(), start_seq, @@ -148,14 +141,13 @@ pub(super) fn distribute_work( let comp_tx = comp_tx.clone(); let task = async move { + // TODO: If downloading a chunk fails, do we want to abort the download? let resp = svc.oneshot(req).await; if let Err(err) = comp_tx.send(resp).await { tracing::debug!(error = ?err, "chunk send failed, channel closed"); } }; - handle - .tasks - .spawn(task.instrument(parent_span_for_tasks.clone())); + tasks.spawn(task.instrument(parent_span_for_tasks.clone())); } tracing::trace!("work fully distributed"); diff --git a/aws-s3-transfer-manager/src/operation/download_objects/worker.rs b/aws-s3-transfer-manager/src/operation/download_objects/worker.rs index b198639..ec3882d 100644 --- a/aws-s3-transfer-manager/src/operation/download_objects/worker.rs +++ b/aws-s3-transfer-manager/src/operation/download_objects/worker.rs @@ -141,7 +141,8 @@ async fn download_single_obj( let key_path = local_key_path(root_dir, key.as_str(), prefix, delim)?; let mut handle = - crate::operation::download::Download::orchestrate(ctx.handle.clone(), input, true).await?; + crate::operation::download::Download::orchestrate(ctx.handle.clone(), input, true)?; + let _ = handle.object_meta().await?; let mut body = mem::replace(&mut handle.body, Body::empty()); let parent_dir = key_path.parent().expect("valid parent dir for key"); @@ -150,7 +151,7 @@ async fn download_single_obj( while let Some(chunk) = body.next().await { let chunk = chunk?; - for segment in chunk.into_segments() { + for segment in chunk.data.into_segments() { dest.write_all(segment.as_ref()).await?; } } diff --git a/aws-s3-transfer-manager/src/operation/upload/builders.rs b/aws-s3-transfer-manager/src/operation/upload/builders.rs index 5dab881..deebe71 100644 --- a/aws-s3-transfer-manager/src/operation/upload/builders.rs +++ b/aws-s3-transfer-manager/src/operation/upload/builders.rs @@ -29,6 +29,7 @@ impl UploadFluentBuilder { bucket = self.inner.bucket.as_deref().unwrap_or_default(), key = self.inner.key.as_deref().unwrap_or_default(), ))] + // TODO: Make it consistent with download by renaming it to initiate and making it synchronous pub async fn send(self) -> Result { let input = self.inner.build()?; crate::operation::upload::Upload::orchestrate(self.handle, input).await diff --git a/aws-s3-transfer-manager/tests/download_test.rs b/aws-s3-transfer-manager/tests/download_test.rs index 9e4790b..db69353 100644 --- a/aws-s3-transfer-manager/tests/download_test.rs +++ b/aws-s3-transfer-manager/tests/download_test.rs @@ -49,7 +49,7 @@ async fn drain(handle: &mut DownloadHandle) -> Result { let body = handle.body_mut(); let mut data = BytesMut::new(); while let Some(chunk) = body.next().await { - let chunk = chunk?.into_bytes(); + let chunk = chunk?.data.into_bytes(); data.put(chunk); } @@ -70,13 +70,14 @@ fn simple_object_connector(data: &Bytes, part_size: usize) -> StaticReplayClient .enumerate() .map(|(idx, chunk)| { let start = idx * part_size; - let end = part_size * (idx + 1) - 1; + let end = std::cmp::min(start + part_size, data.len()) - 1; ReplayEvent::new( // NOTE: Rather than try to recreate all the expected requests we just put in placeholders and // make our own assertions against the captured requests. dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", end - start + 1)) .header( "Content-Range", format!("bytes {start}-{end}/{}", data.len()), @@ -129,8 +130,7 @@ async fn test_download_ranges() { .download() .bucket("test-bucket") .key("test-object") - .send() - .await + .initiate() .unwrap(); let body = drain(&mut handle).await.unwrap(); @@ -164,8 +164,7 @@ async fn test_body_not_consumed() { .download() .bucket("test-bucket") .key("test-object") - .send() - .await + .initiate() .unwrap(); handle.join().await.unwrap(); @@ -229,6 +228,7 @@ async fn test_retry_failed_chunk() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", part_size)) .header( "Content-Range", format!("bytes 0-{}/{}", part_size - 1, data.len()), @@ -241,6 +241,7 @@ async fn test_retry_failed_chunk() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", data.len() - part_size)) .header( "Content-Range", format!("bytes {}-{}/{}", part_size, data.len(), data.len()), @@ -257,6 +258,7 @@ async fn test_retry_failed_chunk() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", data.len() - part_size)) .header( "Content-Range", format!("bytes {}-{}/{}", part_size, data.len(), data.len()), @@ -272,8 +274,7 @@ async fn test_retry_failed_chunk() { .download() .bucket("test-bucket") .key("test-object") - .send() - .await + .initiate() .unwrap(); let body = drain(&mut handle).await.unwrap(); @@ -304,6 +305,7 @@ async fn test_non_retryable_error() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", part_size)) .header( "Content-Range", format!("bytes 0-{}/{}", part_size - 1, data.len()), @@ -327,8 +329,7 @@ async fn test_non_retryable_error() { .download() .bucket("test-bucket") .key("test-object") - .send() - .await + .initiate() .unwrap(); let _ = drain(&mut handle).await.unwrap_err(); @@ -351,6 +352,7 @@ async fn test_retry_max_attempts() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", part_size)) .header( "Content-Range", format!("bytes {}-{}/{}", part_size, data.len(), data.len()), @@ -370,6 +372,7 @@ async fn test_retry_max_attempts() { dummy_expected_request(), http_02x::Response::builder() .status(200) + .header("Content-Length", format!("{}", part_size)) .header( "Content-Range", format!("bytes 0-{}/{}", part_size - 1, data.len()), @@ -387,8 +390,7 @@ async fn test_retry_max_attempts() { .download() .bucket("test-bucket") .key("test-object") - .send() - .await + .initiate() .unwrap(); let _ = drain(&mut handle).await.unwrap_err();