From 488096d50f797d7d019323240b7129ff7b038bda Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Tue, 6 Aug 2024 10:25:21 -0400 Subject: [PATCH 1/6] refactor errors --- aws-s3-transfer-manager/Cargo.toml | 1 - aws-s3-transfer-manager/src/client.rs | 4 +- aws-s3-transfer-manager/src/error.rs | 244 ++++++++++-------- .../src/operation/download.rs | 5 +- .../src/operation/download/body.rs | 14 +- .../src/operation/download/builders.rs | 9 +- .../src/operation/download/discovery.rs | 21 +- .../src/operation/download/header.rs | 18 +- .../src/operation/download/worker.rs | 9 +- .../src/operation/download_objects.rs | 7 +- .../operation/download_objects/builders.rs | 6 +- .../src/operation/download_objects/handle.rs | 4 +- .../src/operation/upload.rs | 10 +- .../src/operation/upload/builders.rs | 9 +- .../src/operation/upload/handle.rs | 11 +- aws-s3-transfer-manager/src/types.rs | 6 +- 16 files changed, 202 insertions(+), 176 deletions(-) diff --git a/aws-s3-transfer-manager/Cargo.toml b/aws-s3-transfer-manager/Cargo.toml index 7bdcf51..8962c11 100644 --- a/aws-s3-transfer-manager/Cargo.toml +++ b/aws-s3-transfer-manager/Cargo.toml @@ -19,7 +19,6 @@ aws-types = "1.3.3" bytes = "1" # FIXME - upgrade to hyper 1.x hyper = { version = "0.14.29", features = ["client"] } -thiserror = "1.0.61" tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] } tracing = "0.1" diff --git a/aws-s3-transfer-manager/src/client.rs b/aws-s3-transfer-manager/src/client.rs index b01e2c9..a5addee 100644 --- a/aws-s3-transfer-manager/src/client.rs +++ b/aws-s3-transfer-manager/src/client.rs @@ -148,12 +148,12 @@ impl Client { /// # Examples /// ```no_run /// use std::path::PathBuf; - /// use aws_s3_transfer_manager::operation::download_objects::DownloadObjectsError; + /// use aws_s3_transfer_manager::error::Error; /// /// async fn download_bucket( /// client: &aws_s3_transfer_manager::Client, /// dest: PathBuf - /// ) -> Result<(), DownloadObjectsError> { + /// ) -> Result<(), Error> { /// /// let handle = client /// .download_objects() diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index a824728..5ce69c0 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -3,127 +3,165 @@ * SPDX-License-Identifier: Apache-2.0 */ -use aws_smithy_types::byte_stream; -use std::io; +use std::fmt; -// TODO(design): revisit errors +/// A boxed error that is `Send` and `Sync`. +pub type BoxError = Box; -/// Failed transfer result -#[derive(thiserror::Error, Debug)] -pub enum TransferError { - /// The request was invalid - #[error("invalid meta request: {0}")] - InvalidMetaRequest(String), +use aws_sdk_s3::error::ProvideErrorMetadata; - /// A download failed - #[error("download failed")] - DownloadFailed(#[from] DownloadError), +/// Errors returned by this library +/// +/// NOTE: Use [`aws_smithy_types::error::display::DisplayErrorContext`] or similar to display +/// the entire error cause/source chain. +#[derive(Debug)] +pub struct Error { + kind: ErrorKind, + source: BoxError, +} + +/// General categories of transfer errors. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum ErrorKind { + /// Operation input validation issues + InputInvalid, + + /// I/O errors + IOError, + + /// Some kind of internal runtime issue (e.g. 
task failure, poisoned mutex, etc) + RuntimeError, + + /// Object discovery failed + ObjectNotDiscoverable, + + /// Failed to upload or download a chunk of an object + ChunkFailed, + + /// Resource not found (e.g. bucket, key, multipart upload ID not found) + NotFound, + + /// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket) + ChildOperationFailed, + + /// A custom error that does not fall under any other error kind + Other, +} + +impl Error { + /// Creates a new transfer [`Error`] from a known kind of error as well as an arbitrary error + /// source. + pub fn new(kind: ErrorKind, err: E) -> Error + where + E: Into, + { + Error { + kind, + source: err.into(), + } + } + + /// Creates a new transfer [`Error`] from an arbitrary payload. + /// + /// This function is a shortcut for [`Error::new`] with [`ErrorKind::Other`] + pub fn other(err: E) -> Error + where + E: Into, + { + Error::new(ErrorKind::Other, err) + } - /// A upload failed - #[error("upload failed")] - UploadFailed(#[from] UploadError), + /// Returns the corresponding [`ErrorKind`] for this error. + pub fn kind(&self) -> ErrorKind { + self.kind.clone() + } } -pub(crate) type GetObjectSdkError = ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::get_object::GetObjectError, - ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, ->; -pub(crate) type HeadObjectSdkError = ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::head_object::HeadObjectError, - ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, ->; - -/// An error related to downloading an object -#[derive(thiserror::Error, Debug)] -pub enum DownloadError { - /// Discovery of object metadata failed - #[error(transparent)] - DiscoverFailed(SdkOperationError), - - /// A failure occurred fetching a single chunk of the overall object data - #[error("download chunk failed")] - ChunkFailed { - /// The underlying SDK error - source: SdkOperationError, - }, +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.kind { + ErrorKind::InputInvalid => write!(f, "invalid input"), + ErrorKind::IOError => write!(f, "I/O error"), + ErrorKind::RuntimeError => write!(f, "runtime error"), + ErrorKind::ObjectNotDiscoverable => write!(f, "object discovery failed"), + ErrorKind::ChunkFailed => write!(f, "failed to process chunk"), + ErrorKind::NotFound => write!(f, "resource not found"), + ErrorKind::ChildOperationFailed => write!(f, "child operation failed"), + ErrorKind::Other => write!(f, "unknown error"), + } + } } -pub(crate) type CreateMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError, - ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, ->; - -pub(crate) type UploadPartSdkError = ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::upload_part::UploadPartError, - ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, ->; - -pub(crate) type CompleteMultipartUploadSdkError = - ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError, - ::aws_smithy_runtime_api::client::orchestrator::HttpResponse, - >; - -pub(crate) type AbortMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError< - aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError, - 
::aws_smithy_runtime_api::client::orchestrator::HttpResponse, ->; - -/// An error related to upload an object -#[derive(thiserror::Error, Debug)] -pub enum UploadError { - /// An error occurred invoking [aws_sdk_s3::Client::CreateMultipartUpload] - #[error(transparent)] - CreateMultipartUpload(#[from] CreateMultipartUploadSdkError), - - /// An error occurred invoking [aws_sdk_s3::Client::CreateMultipartUpload] - #[error(transparent)] - CompleteMultipartUpload(#[from] CompleteMultipartUploadSdkError), - - /// An error occurred invoking [aws_sdk_s3::Client::UploadPart] - #[error(transparent)] - UploadPart(#[from] UploadPartSdkError), - - /// An error occurred invoking [aws_sdk_s3::Client::AbortMultipartUpload] - #[error(transparent)] - AbortMultipartUpload(#[from] AbortMultipartUploadSdkError), - - /// An I/O error occurred - #[error(transparent)] - IOError(#[from] crate::io::error::Error), +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(self.source.as_ref()) + } } -/// An underlying S3 SDK error -#[derive(thiserror::Error, Debug)] -pub enum SdkOperationError { - /// An error occurred invoking [aws_sdk_s3::Client::head_object] - #[error(transparent)] - HeadObject(#[from] HeadObjectSdkError), +impl From for Error { + fn from(value: crate::io::error::Error) -> Self { + Self::new(ErrorKind::IOError, value) + } +} - /// An error occurred invoking [aws_sdk_s3::Client::get_object] - #[error(transparent)] - GetObject(#[from] GetObjectSdkError), +impl From for Error { + fn from(value: std::io::Error) -> Self { + Self::new(ErrorKind::IOError, value) + } +} + +impl From for Error { + fn from(value: tokio::task::JoinError) -> Self { + Self::new(ErrorKind::RuntimeError, value) + } +} - /// An error occurred reading the underlying data - #[error(transparent)] - ReadError(#[from] byte_stream::error::Error), +impl From> for Error +where + T: Send + Sync + 'static, +{ + fn from(value: std::sync::PoisonError) -> Self { + Self::new(ErrorKind::RuntimeError, value) + } +} - /// An unknown IO error occurred carrying out the request - #[error(transparent)] - IoError(#[from] io::Error), +pub(crate) fn invalid_input(err: E) -> Error +where + E: Into, +{ + Error::new(ErrorKind::InputInvalid, err) } -// convenience to construct a TransferError from a chunk failure -pub(crate) fn chunk_failed>(e: E) -> TransferError { - DownloadError::ChunkFailed { source: e.into() }.into() +pub(crate) fn discovery_failed(err: E) -> Error +where + E: Into, +{ + Error::new(ErrorKind::ObjectNotDiscoverable, err) } -pub(crate) fn invalid_meta_request(message: String) -> TransferError { - TransferError::InvalidMetaRequest(message) +pub(crate) fn from_kind(kind: ErrorKind) -> impl FnOnce(E) -> Error +where + E: Into, +{ + |err| Error::new(kind, err) } -impl From for TransferError { - fn from(value: CreateMultipartUploadSdkError) -> Self { - TransferError::UploadFailed(value.into()) +impl From> for Error +where + E: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static, + R: Send + Sync + fmt::Debug + 'static, +{ + fn from(value: aws_sdk_s3::error::SdkError) -> Self { + // TODO - extract request id/metadata + let kind = match value.code() { + Some("NotFound") | Some("NoSuchKey") | Some("NoSuchUpload") | Some("NoSuchBucket") => { + ErrorKind::NotFound + } + // TODO - is this the rigth error kind? do we need something else? 
+ _ => ErrorKind::ChildOperationFailed, + }; + + Error::new(kind, value) } } diff --git a/aws-s3-transfer-manager/src/operation/download.rs b/aws-s3-transfer-manager/src/operation/download.rs index 043e0bf..5837698 100644 --- a/aws-s3-transfer-manager/src/operation/download.rs +++ b/aws-s3-transfer-manager/src/operation/download.rs @@ -23,7 +23,6 @@ mod header; mod object_meta; mod worker; -use crate::error::TransferError; use body::Body; use context::DownloadContext; use discovery::{discover_obj, ObjectDiscovery}; @@ -42,7 +41,7 @@ impl Download { pub(crate) async fn orchestrate( handle: Arc, input: crate::operation::download::DownloadInput, - ) -> Result { + ) -> Result { // if there is a part number then just send the default request if input.part_number().is_some() { todo!("single part download not implemented") @@ -109,7 +108,7 @@ impl Download { /// the starting sequence number to use for remaining chunks. async fn handle_discovery_chunk( discovery: &mut ObjectDiscovery, - completed: &mpsc::Sender>, + completed: &mpsc::Sender>, ) -> u64 { let mut start_seq = 0; if let Some(initial_data) = discovery.initial_chunk.take() { diff --git a/aws-s3-transfer-manager/src/operation/download/body.rs b/aws-s3-transfer-manager/src/operation/download/body.rs index 10f7414..9a8d45d 100644 --- a/aws-s3-transfer-manager/src/operation/download/body.rs +++ b/aws-s3-transfer-manager/src/operation/download/body.rs @@ -2,7 +2,6 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ -use crate::error::TransferError; use crate::operation::download::worker::ChunkResponse; use aws_smithy_types::byte_stream::AggregatedBytes; use std::cmp; @@ -20,7 +19,7 @@ pub struct Body { sequencer: Sequencer, } -type BodyChannel = mpsc::Receiver>; +type BodyChannel = mpsc::Receiver>; impl Body { /// Create a new empty Body @@ -51,7 +50,7 @@ impl Body { /// Returns [None] when there is no more data. /// Chunks returned from a [Body] are guaranteed to be sequenced /// in the right order. - pub async fn next(&mut self) -> Option> { + pub async fn next(&mut self) -> Option> { // TODO(aws-sdk-rust#1159, design) - do we want ChunkResponse (or similar) rather than AggregatedBytes? Would // make additional retries of an individual chunk/part more feasible (though theoretically already exhausted retries) loop { @@ -146,7 +145,7 @@ impl PartialEq for SequencedChunk { /// A body that returns chunks in whatever order they are received. #[derive(Debug)] pub(crate) struct UnorderedBody { - chunks: Option>>, + chunks: Option>>, } impl UnorderedBody { @@ -160,7 +159,7 @@ impl UnorderedBody { /// Chunks returned from an [UnorderedBody] are not guaranteed to be sorted /// in the right order. Consumers are expected to sort the data themselves /// using the chunk sequence number (starting from zero). 
- pub(crate) async fn next(&mut self) -> Option> { + pub(crate) async fn next(&mut self) -> Option> { match self.chunks.as_mut() { None => None, Some(ch) => ch.recv().await, @@ -170,8 +169,7 @@ impl UnorderedBody { #[cfg(test)] mod tests { - use crate::error::TransferError; - use crate::operation::download::worker::ChunkResponse; + use crate::{error, operation::download::worker::ChunkResponse}; use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream}; use bytes::Bytes; use tokio::sync::mpsc; @@ -226,7 +224,7 @@ mod tests { let aggregated = ByteStream::from(data).collect().await.unwrap(); let chunk = chunk_resp(0, Some(aggregated)); tx.send(Ok(chunk)).await.unwrap(); - let err = TransferError::InvalidMetaRequest("test errors".to_string()); + let err = error::Error::new(error::ErrorKind::InputInvalid, "test errors".to_string()); tx.send(Err(err)).await.unwrap(); }); diff --git a/aws-s3-transfer-manager/src/operation/download/builders.rs b/aws-s3-transfer-manager/src/operation/download/builders.rs index c33aa13..48be4b9 100644 --- a/aws-s3-transfer-manager/src/operation/download/builders.rs +++ b/aws-s3-transfer-manager/src/operation/download/builders.rs @@ -4,8 +4,6 @@ */ use std::sync::Arc; -use crate::error::TransferError; - use super::{DownloadHandle, DownloadInputBuilder}; /// Fluent builder for constructing a single object upload transfer @@ -24,7 +22,7 @@ impl DownloadFluentBuilder { } /// Initiate a download transfer for a single object - pub async fn send(self) -> Result { + pub async fn send(self) -> Result { // FIXME - need DownloadError to support this conversion to remove expect() in favor of ? let input = self.inner.build().expect("valid input"); crate::operation::download::Download::orchestrate(self.handle, input).await @@ -523,7 +521,10 @@ impl DownloadFluentBuilder { impl crate::operation::download::input::DownloadInputBuilder { /// Initiate a download transfer for a single object with this input using the given client. 
- pub async fn send_with(self, client: &crate::Client) -> Result { + pub async fn send_with( + self, + client: &crate::Client, + ) -> Result { let mut fluent_builder = client.download(); fluent_builder.inner = self; fluent_builder.send().await diff --git a/aws-s3-transfer-manager/src/operation/download/discovery.rs b/aws-s3-transfer-manager/src/operation/download/discovery.rs index 3da1e96..52441aa 100644 --- a/aws-s3-transfer-manager/src/operation/download/discovery.rs +++ b/aws-s3-transfer-manager/src/operation/download/discovery.rs @@ -10,9 +10,9 @@ use std::{cmp, mem}; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream}; -use bytes::Buf; use crate::error; +use bytes::Buf; use super::header::{self, ByteRange}; use super::object_meta::ObjectMetadata; @@ -43,9 +43,7 @@ pub(super) struct ObjectDiscovery { } impl ObjectDiscoveryStrategy { - fn from_request( - input: &DownloadInput, - ) -> Result { + fn from_request(input: &DownloadInput) -> Result { let strategy = match input.range() { Some(h) => { let byte_range = header::Range::from_str(h)?.0; @@ -72,7 +70,7 @@ impl ObjectDiscoveryStrategy { pub(super) async fn discover_obj( ctx: &DownloadContext, input: &DownloadInput, -) -> Result { +) -> Result { let strategy = ObjectDiscoveryStrategy::from_request(input)?; match strategy { ObjectDiscoveryStrategy::HeadObject(byte_range) => { @@ -100,7 +98,7 @@ async fn discover_obj_with_head( ctx: &DownloadContext, input: &DownloadInput, byte_range: Option, -) -> Result { +) -> Result { let meta: ObjectMetadata = ctx .client() .head_object() @@ -108,7 +106,7 @@ async fn discover_obj_with_head( .set_key(input.key().map(str::to_string)) .send() .await - .map_err(|e| error::DownloadError::DiscoverFailed(e.into()))? + .map_err(error::discovery_failed)? 
.into(); let remaining = match byte_range { @@ -131,23 +129,20 @@ async fn discover_obj_with_get( ctx: &DownloadContext, input: GetObjectInputBuilder, range: Option>, -) -> Result { +) -> Result { let resp = input.send_with(ctx.client()).await; if resp.is_err() { // TODO(aws-sdk-rust#1159) - deal with empty file errors, see https://github.com/awslabs/aws-c-s3/blob/v0.5.7/source/s3_auto_ranged_get.c#L147-L153 } - let mut resp = resp.map_err(|e| error::DownloadError::DiscoverFailed(e.into()))?; + let mut resp = resp.map_err(error::discovery_failed)?; // take the body so we can convert the metadata let empty_stream = ByteStream::new(SdkBody::empty()); let body = mem::replace(&mut resp.body, empty_stream); - let data = body - .collect() - .await - .map_err(|e| error::DownloadError::DiscoverFailed(e.into()))?; + let data = body.collect().await.map_err(error::discovery_failed)?; let meta: ObjectMetadata = resp.into(); diff --git a/aws-s3-transfer-manager/src/operation/download/header.rs b/aws-s3-transfer-manager/src/operation/download/header.rs index 53c41f2..5d0dd58 100644 --- a/aws-s3-transfer-manager/src/operation/download/header.rs +++ b/aws-s3-transfer-manager/src/operation/download/header.rs @@ -38,7 +38,7 @@ impl From for String { } impl FromStr for Range { - type Err = error::TransferError; + type Err = error::Error; fn from_str(s: &str) -> Result { let mut iter = s.splitn(2, '='); @@ -46,18 +46,18 @@ impl FromStr for Range { (Some("bytes"), Some(range)) => { if range.contains(',') { // TODO(aws-sdk-rust#1159) - error S3 doesn't support multiple byte ranges - Err(error::invalid_meta_request(format!( + Err(error::invalid_input(format!( "multiple byte ranges not supported for range header {}", s ))) } else { let spec = ByteRange::from_str(range).map_err(|_| { - error::invalid_meta_request(format!("invalid range header {}", s)) + error::invalid_input(format!("invalid range header {}", s)) })?; Ok(Range(spec)) } } - _ => Err(error::invalid_meta_request(format!( + _ => Err(error::invalid_input(format!( "unsupported byte range header format `{s}`; see https://www.rfc-editor.org/rfc/rfc9110.html#name-range for valid formats" ))), } @@ -107,7 +107,8 @@ impl FromStr for ByteRange { #[cfg(test)] mod tests { use super::{ByteRange, Range}; - use crate::error::TransferError; + use crate::error; + use aws_smithy_types::error::display::DisplayErrorContext; use std::str::FromStr; #[test] @@ -126,10 +127,11 @@ mod tests { ); } - fn assert_err_contains(r: Result, msg: &str) { + fn assert_err_contains(r: Result, msg: &str) { let err = r.unwrap_err(); - match err { - TransferError::InvalidMetaRequest(m) => { + match err.kind() { + error::ErrorKind::InputInvalid => { + let m = DisplayErrorContext(err).to_string(); assert!(m.contains(msg), "'{}' does not contain '{}'", m, msg); } _ => panic!("unexpected error type"), diff --git a/aws-s3-transfer-manager/src/operation/download/worker.rs b/aws-s3-transfer-manager/src/operation/download/worker.rs index 24b5583..79ecf56 100644 --- a/aws-s3-transfer-manager/src/operation/download/worker.rs +++ b/aws-s3-transfer-manager/src/operation/download/worker.rs @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ use crate::error; -use crate::error::TransferError; use crate::operation::download::context::DownloadContext; use crate::operation::download::header; use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder; @@ -45,7 +44,7 @@ pub(crate) struct ChunkResponse { pub(super) async fn download_chunks( ctx: DownloadContext, requests: 
async_channel::Receiver, - completed: mpsc::Sender>, + completed: mpsc::Sender>, ) { while let Ok(request) = requests.recv().await { let seq = request.seq; @@ -68,12 +67,12 @@ pub(super) async fn download_chunks( async fn download_chunk( ctx: &DownloadContext, request: ChunkRequest, -) -> Result { +) -> Result { let mut resp = request .input .send_with(ctx.client()) .await - .map_err(error::chunk_failed)?; + .map_err(error::from_kind(error::ErrorKind::ChunkFailed))?; let body = mem::replace(&mut resp.body, ByteStream::new(SdkBody::taken())); @@ -81,7 +80,7 @@ async fn download_chunk( .collect() .instrument(tracing::debug_span!("collect-body", seq = request.seq)) .await - .map_err(error::chunk_failed)?; + .map_err(error::from_kind(error::ErrorKind::ChunkFailed))?; Ok(ChunkResponse { seq: request.seq, diff --git a/aws-s3-transfer-manager/src/operation/download_objects.rs b/aws-s3-transfer-manager/src/operation/download_objects.rs index 7e38859..4fdaee5 100644 --- a/aws-s3-transfer-manager/src/operation/download_objects.rs +++ b/aws-s3-transfer-manager/src/operation/download_objects.rs @@ -27,12 +27,7 @@ impl DownloadObjects { pub(crate) async fn orchestrate( _handle: Arc, _input: crate::operation::download_objects::DownloadObjectsInput, - ) -> Result { + ) -> Result { unimplemented!() } } - -/// Error type for `DownloadObjects` operation -#[non_exhaustive] -#[derive(Debug)] -pub enum DownloadObjectsError {} diff --git a/aws-s3-transfer-manager/src/operation/download_objects/builders.rs b/aws-s3-transfer-manager/src/operation/download_objects/builders.rs index f457fe7..af7b1c3 100644 --- a/aws-s3-transfer-manager/src/operation/download_objects/builders.rs +++ b/aws-s3-transfer-manager/src/operation/download_objects/builders.rs @@ -6,7 +6,7 @@ use crate::types::{DownloadFilter, FailedTransferPolicy}; use std::{path::PathBuf, sync::Arc}; -use super::{DownloadObjectsError, DownloadObjectsHandle, DownloadObjectsInputBuilder}; +use super::{DownloadObjectsHandle, DownloadObjectsInputBuilder}; /// Fluent builder for constructing a multiple object download transfer #[derive(Debug)] @@ -24,7 +24,7 @@ impl DownloadObjectsFluentBuilder { } /// Initiate a download transfer for multiple objects - pub async fn send(self) -> Result { + pub async fn send(self) -> Result { // FIXME - need DownloadObjectsError to support this conversion to remove expect() in favor of ? 
let input = self.inner.build().expect("valid input"); crate::operation::download_objects::DownloadObjects::orchestrate(self.handle, input).await @@ -132,7 +132,7 @@ impl crate::operation::download_objects::input::DownloadObjectsInputBuilder { pub async fn send_with( self, client: &crate::Client, - ) -> Result { + ) -> Result { let mut fluent_builder = client.download_objects(); fluent_builder.inner = self; fluent_builder.send().await diff --git a/aws-s3-transfer-manager/src/operation/download_objects/handle.rs b/aws-s3-transfer-manager/src/operation/download_objects/handle.rs index d029829..9024cdc 100644 --- a/aws-s3-transfer-manager/src/operation/download_objects/handle.rs +++ b/aws-s3-transfer-manager/src/operation/download_objects/handle.rs @@ -5,7 +5,7 @@ use tokio::task; -use super::{DownloadObjectsError, DownloadObjectsOutput}; +use super::DownloadObjectsOutput; /// Handle for `DownloadObjects` transfer operation #[derive(Debug)] @@ -17,7 +17,7 @@ pub struct DownloadObjectsHandle { impl DownloadObjectsHandle { /// Consume the handle and wait for download transfer to complete - pub async fn join(self) -> Result { + pub async fn join(self) -> Result { unimplemented!() } } diff --git a/aws-s3-transfer-manager/src/operation/upload.rs b/aws-s3-transfer-manager/src/operation/upload.rs index 28b0c56..db7b1a1 100644 --- a/aws-s3-transfer-manager/src/operation/upload.rs +++ b/aws-s3-transfer-manager/src/operation/upload.rs @@ -11,7 +11,7 @@ mod output; mod context; mod handle; -use crate::error::UploadError; +use crate::error; use crate::io::part_reader::{Builder as PartReaderBuilder, ReadPart}; use crate::io::InputStream; use aws_sdk_s3::primitives::ByteStream; @@ -39,7 +39,7 @@ impl Upload { pub(crate) async fn orchestrate( handle: Arc, mut input: crate::operation::upload::UploadInput, - ) -> Result { + ) -> Result { let min_mpu_threshold = handle.mpu_threshold_bytes(); let stream = input.take_body(); @@ -75,7 +75,7 @@ async fn try_start_mpu_upload( handle: &mut UploadHandle, stream: InputStream, content_length: u64, -) -> Result<(), UploadError> { +) -> Result<(), crate::error::Error> { let part_size = cmp::max( handle.ctx.handle.upload_part_size_bytes(), content_length / MAX_PARTS, @@ -116,7 +116,7 @@ fn new_context(handle: Arc, req: UploadInput) -> UploadCo } /// start a new multipart upload by invoking `CreateMultipartUpload` -async fn start_mpu(handle: &UploadHandle) -> Result { +async fn start_mpu(handle: &UploadHandle) -> Result { let req = handle.ctx.request(); let client = handle.ctx.client(); @@ -164,7 +164,7 @@ async fn start_mpu(handle: &UploadHandle) -> Result, -) -> Result, UploadError> { +) -> Result, error::Error> { let mut completed_parts = Vec::new(); loop { let part_result = reader.next_part().await?; diff --git a/aws-s3-transfer-manager/src/operation/upload/builders.rs b/aws-s3-transfer-manager/src/operation/upload/builders.rs index 6ba7e30..bde7099 100644 --- a/aws-s3-transfer-manager/src/operation/upload/builders.rs +++ b/aws-s3-transfer-manager/src/operation/upload/builders.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use crate::{error::UploadError, types::FailedMultipartUploadPolicy}; +use crate::types::FailedMultipartUploadPolicy; use super::{UploadHandle, UploadInputBuilder}; @@ -25,7 +25,7 @@ impl UploadFluentBuilder { } /// Initiate an upload transfer for a single object - pub async fn send(self) -> Result { + pub async fn send(self) -> Result { // FIXME - need UploadError to support this conversion to remove expect() in favor of ? 
let input = self.inner.build().expect("valid input"); crate::operation::upload::Upload::orchestrate(self.handle, input).await @@ -907,7 +907,10 @@ impl UploadFluentBuilder { impl crate::operation::upload::input::UploadInputBuilder { /// Initiate an upload transfer for a single object with this input using the given client. - pub async fn send_with(self, client: &crate::Client) -> Result { + pub async fn send_with( + self, + client: &crate::Client, + ) -> Result { let mut fluent_builder = client.upload(); fluent_builder.inner = self; fluent_builder.send().await diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 1d9c583..50249d9 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::error::UploadError; use crate::operation::upload::context::UploadContext; use crate::operation::upload::{UploadOutput, UploadOutputBuilder}; use crate::types::{AbortedUpload, FailedMultipartUploadPolicy}; @@ -15,7 +14,7 @@ use tokio::task; #[non_exhaustive] pub struct UploadHandle { /// All child multipart upload tasks spawned for this upload - pub(crate) tasks: task::JoinSet, UploadError>>, + pub(crate) tasks: task::JoinSet, crate::error::Error>>, /// The context used to drive an upload to completion pub(crate) ctx: UploadContext, /// The response that will eventually be yielded to the caller. @@ -46,12 +45,12 @@ impl UploadHandle { } /// Consume the handle and wait for upload to complete - pub async fn join(self) -> Result { + pub async fn join(self) -> Result { complete_upload(self).await } /// Abort the upload and cancel any in-progress part uploads. - pub async fn abort(&mut self) -> Result { + pub async fn abort(&mut self) -> Result { // TODO(aws-sdk-rust#1159) - handle already completed upload // cancel in-progress uploads @@ -78,7 +77,7 @@ impl UploadHandle { } } -async fn abort_upload(handle: &UploadHandle) -> Result { +async fn abort_upload(handle: &UploadHandle) -> Result { let abort_mpu_resp = handle .ctx .client() @@ -99,7 +98,7 @@ async fn abort_upload(handle: &UploadHandle) -> Result Result { +async fn complete_upload(mut handle: UploadHandle) -> Result { if !handle.ctx.is_multipart_upload() { todo!("non mpu upload not implemented yet") } diff --git a/aws-s3-transfer-manager/src/types.rs b/aws-s3-transfer-manager/src/types.rs index 497337b..826c818 100644 --- a/aws-s3-transfer-manager/src/types.rs +++ b/aws-s3-transfer-manager/src/types.rs @@ -6,8 +6,6 @@ use core::fmt; use std::sync::Arc; -use crate::error::DownloadError; - /// The target part size for an upload or download request. 
#[derive(Debug, Clone, Default)] pub enum PartSize { @@ -116,7 +114,7 @@ pub struct FailedDownloadTransfer { pub(crate) input: crate::operation::download::DownloadInput, /// The error encountered downloading the object - pub(crate) error: DownloadError, + pub(crate) error: crate::error::Error, } impl FailedDownloadTransfer { @@ -126,7 +124,7 @@ impl FailedDownloadTransfer { } /// The error encountered downloading the object - pub fn error(&self) -> &DownloadError { + pub fn error(&self) -> &crate::error::Error { &self.error } } From 870a8da4214d2cc6131364e7df4486e66ca2b9d5 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Tue, 6 Aug 2024 12:44:59 -0400 Subject: [PATCH 2/6] more cleanup --- aws-s3-transfer-manager/src/error.rs | 6 ++++++ aws-s3-transfer-manager/src/operation/download.rs | 5 ----- .../src/operation/download/builders.rs | 3 +-- .../src/operation/download_objects/builders.rs | 3 +-- .../src/operation/upload/builders.rs | 3 +-- aws-s3-transfer-manager/src/operation/upload/handle.rs | 10 ++++++++-- aws-s3-transfer-manager/src/operation/upload/input.rs | 1 - aws-s3-transfer-manager/src/operation/upload/output.rs | 1 - 8 files changed, 17 insertions(+), 15 deletions(-) diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index 5ce69c0..6970c09 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -126,6 +126,12 @@ where } } +impl From for Error { + fn from(value: aws_smithy_types::error::operation::BuildError) -> Self { + Self::new(ErrorKind::InputInvalid, value) + } +} + pub(crate) fn invalid_input(err: E) -> Error where E: Into, diff --git a/aws-s3-transfer-manager/src/operation/download.rs b/aws-s3-transfer-manager/src/operation/download.rs index 5837698..56c4673 100644 --- a/aws-s3-transfer-manager/src/operation/download.rs +++ b/aws-s3-transfer-manager/src/operation/download.rs @@ -69,11 +69,6 @@ impl Download { let input = input.clone(); let rem = discovery.remaining.clone(); - // TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once, - // the downside is controlling memory usage as a large download may result in - // quite a few futures created. If more performant could be enabled for - // objects less than some size. - tasks.spawn(distribute_work( rem, input.into(), diff --git a/aws-s3-transfer-manager/src/operation/download/builders.rs b/aws-s3-transfer-manager/src/operation/download/builders.rs index 48be4b9..5f44e16 100644 --- a/aws-s3-transfer-manager/src/operation/download/builders.rs +++ b/aws-s3-transfer-manager/src/operation/download/builders.rs @@ -23,8 +23,7 @@ impl DownloadFluentBuilder { /// Initiate a download transfer for a single object pub async fn send(self) -> Result { - // FIXME - need DownloadError to support this conversion to remove expect() in favor of ? 
- let input = self.inner.build().expect("valid input"); + let input = self.inner.build()?; crate::operation::download::Download::orchestrate(self.handle, input).await } diff --git a/aws-s3-transfer-manager/src/operation/download_objects/builders.rs b/aws-s3-transfer-manager/src/operation/download_objects/builders.rs index af7b1c3..b80a963 100644 --- a/aws-s3-transfer-manager/src/operation/download_objects/builders.rs +++ b/aws-s3-transfer-manager/src/operation/download_objects/builders.rs @@ -25,8 +25,7 @@ impl DownloadObjectsFluentBuilder { /// Initiate a download transfer for multiple objects pub async fn send(self) -> Result { - // FIXME - need DownloadObjectsError to support this conversion to remove expect() in favor of ? - let input = self.inner.build().expect("valid input"); + let input = self.inner.build()?; crate::operation::download_objects::DownloadObjects::orchestrate(self.handle, input).await } diff --git a/aws-s3-transfer-manager/src/operation/upload/builders.rs b/aws-s3-transfer-manager/src/operation/upload/builders.rs index bde7099..2efbccd 100644 --- a/aws-s3-transfer-manager/src/operation/upload/builders.rs +++ b/aws-s3-transfer-manager/src/operation/upload/builders.rs @@ -26,8 +26,7 @@ impl UploadFluentBuilder { /// Initiate an upload transfer for a single object pub async fn send(self) -> Result { - // FIXME - need UploadError to support this conversion to remove expect() in favor of ? - let input = self.inner.build().expect("valid input"); + let input = self.inner.build()?; crate::operation::upload::Upload::orchestrate(self.handle, input).await } diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 50249d9..193c3bf 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -6,6 +6,7 @@ use crate::operation::upload::context::UploadContext; use crate::operation::upload::{UploadOutput, UploadOutputBuilder}; use crate::types::{AbortedUpload, FailedMultipartUploadPolicy}; +use aws_sdk_s3::error::DisplayErrorContext; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use tokio::task; @@ -103,7 +104,10 @@ async fn complete_upload(mut handle: UploadHandle) -> Result Result { tracing::error!("multipart upload failed, aborting"); // TODO(aws-sdk-rust#1159) - if cancelling causes an error we want to propagate that in the returned error somehow? - handle.abort().await?; + if let Err(err) = handle.abort().await { + tracing::error!("failed to abort upload: {}", DisplayErrorContext(err)) + }; return Err(err); } } diff --git a/aws-s3-transfer-manager/src/operation/upload/input.rs b/aws-s3-transfer-manager/src/operation/upload/input.rs index a413987..cfbb591 100644 --- a/aws-s3-transfer-manager/src/operation/upload/input.rs +++ b/aws-s3-transfer-manager/src/operation/upload/input.rs @@ -1440,7 +1440,6 @@ impl UploadInputBuilder { } /// Consumes the builder and constructs a [`UploadInput`] - // FIXME(aws-sdk-rust#1159): replace BuildError with our own type? 
pub fn build(self) -> Result { Ok(UploadInput { body: self.body.unwrap_or_default(), diff --git a/aws-s3-transfer-manager/src/operation/upload/output.rs b/aws-s3-transfer-manager/src/operation/upload/output.rs index 5532678..133e345 100644 --- a/aws-s3-transfer-manager/src/operation/upload/output.rs +++ b/aws-s3-transfer-manager/src/operation/upload/output.rs @@ -493,7 +493,6 @@ impl UploadOutputBuilder { } /// Consumes the builder and constructs a [`UploadOutput`] - // FIXME(aws-sdk-rust#1159): replace BuildError with our own type? pub fn build(self) -> Result { Ok(UploadOutput { expiration: self.expiration, From 3b055b1a1b955460b365a375b557f6003afb8c07 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Tue, 6 Aug 2024 13:05:52 -0400 Subject: [PATCH 3/6] cleanup logs --- .../src/operation/upload/handle.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/aws-s3-transfer-manager/src/operation/upload/handle.rs b/aws-s3-transfer-manager/src/operation/upload/handle.rs index 193c3bf..4cd603f 100644 --- a/aws-s3-transfer-manager/src/operation/upload/handle.rs +++ b/aws-s3-transfer-manager/src/operation/upload/handle.rs @@ -104,11 +104,9 @@ async fn complete_upload(mut handle: UploadHandle) -> Result Result Result Date: Wed, 7 Aug 2024 09:20:07 -0400 Subject: [PATCH 4/6] feedback --- aws-s3-transfer-manager/src/error.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index 6970c09..e0d33d4 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -73,8 +73,8 @@ impl Error { } /// Returns the corresponding [`ErrorKind`] for this error. - pub fn kind(&self) -> ErrorKind { - self.kind.clone() + pub fn kind(&self) -> &ErrorKind { + &self.kind } } @@ -161,9 +161,7 @@ where fn from(value: aws_sdk_s3::error::SdkError) -> Self { // TODO - extract request id/metadata let kind = match value.code() { - Some("NotFound") | Some("NoSuchKey") | Some("NoSuchUpload") | Some("NoSuchBucket") => { - ErrorKind::NotFound - } + Some("NotFound" | "NoSuchKey" | "NoSuchUpload" | "NoSuchBucket") => ErrorKind::NotFound, // TODO - is this the rigth error kind? do we need something else? _ => ErrorKind::ChildOperationFailed, }; From 025e1ed2d90e9c3202600d7708ce944dc8dae487 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 7 Aug 2024 09:34:38 -0400 Subject: [PATCH 5/6] fix cargo-deny config and disable semver check for now --- .cargo-deny-config.toml | 5 +---- .github/workflows/ci.yml | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/.cargo-deny-config.toml b/.cargo-deny-config.toml index 3a9407f..533e59c 100644 --- a/.cargo-deny-config.toml +++ b/.cargo-deny-config.toml @@ -4,10 +4,7 @@ # More documentation for the licenses section can be found here: # https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html [licenses] -default = "deny" -unlicensed = "deny" -copyleft = "deny" -allow-osi-fsf-free = "neither" +version = 2 allow = [ # See https://spdx.org/licenses/ for list of possible licenses # [possible values: any SPDX 3.11 short identifier (+ optional exception)]. 
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index accfd58..6cd9b3b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -220,17 +220,18 @@ jobs: # Ignore `trybuild` errors as they are irrelevant and flaky on nightly TRYBUILD: overwrite - semver: - name: semver - needs: basics - runs-on: ubuntu-24.04 - steps: - - uses: actions/checkout@v4 - - name: Check semver - uses: obi1kenobi/cargo-semver-checks-action@v2 - with: - rust-toolchain: ${{ env.rust_stable }} - release-type: minor + # TODO - re-enable semver check once released + # semver: + # name: semver + # needs: basics + # runs-on: ubuntu-24.04 + # steps: + # - uses: actions/checkout@v4 + # - name: Check semver + # uses: obi1kenobi/cargo-semver-checks-action@v2 + # with: + # rust-toolchain: ${{ env.rust_stable }} + # release-type: minor features: name: features From 86308b9cb1f5ee4603d05b55e2b945342a7e1cd1 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 7 Aug 2024 13:59:39 -0400 Subject: [PATCH 6/6] remove other variant --- aws-s3-transfer-manager/src/error.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/aws-s3-transfer-manager/src/error.rs b/aws-s3-transfer-manager/src/error.rs index e0d33d4..b5bf421 100644 --- a/aws-s3-transfer-manager/src/error.rs +++ b/aws-s3-transfer-manager/src/error.rs @@ -44,9 +44,6 @@ pub enum ErrorKind { /// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket) ChildOperationFailed, - - /// A custom error that does not fall under any other error kind - Other, } impl Error { @@ -62,16 +59,6 @@ impl Error { } } - /// Creates a new transfer [`Error`] from an arbitrary payload. - /// - /// This function is a shortcut for [`Error::new`] with [`ErrorKind::Other`] - pub fn other(err: E) -> Error - where - E: Into, - { - Error::new(ErrorKind::Other, err) - } - /// Returns the corresponding [`ErrorKind`] for this error. pub fn kind(&self) -> &ErrorKind { &self.kind @@ -88,7 +75,6 @@ impl fmt::Display for Error { ErrorKind::ChunkFailed => write!(f, "failed to process chunk"), ErrorKind::NotFound => write!(f, "resource not found"), ErrorKind::ChildOperationFailed => write!(f, "child operation failed"), - ErrorKind::Other => write!(f, "unknown error"), } } }
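
Taken together, the series replaces the old per-operation error enums (TransferError, DownloadError, UploadError, SdkOperationError, DownloadObjectsError) with the single error::Error plus a non-exhaustive ErrorKind. The sketch below shows how a caller might consume that API once the whole series is applied; the function name and the reaction in each match arm are illustrative only, and it assumes the caller already depends on aws-smithy-types and tracing:

    use aws_s3_transfer_manager::error::{Error, ErrorKind};
    use aws_smithy_types::error::display::DisplayErrorContext;

    // Illustrative caller-side handling of the unified transfer error.
    fn report_transfer_error(err: Error) {
        // Display on Error only names the kind; DisplayErrorContext walks the
        // source() chain so the wrapped SdkError or I/O error is printed too.
        tracing::error!("transfer failed: {}", DisplayErrorContext(&err));

        match err.kind() {
            // Mapped from the S3 error codes NotFound/NoSuchKey/NoSuchUpload/NoSuchBucket.
            ErrorKind::NotFound => { /* report a missing bucket, key, or upload ID */ }
            // Produced by invalid_input() and by builder BuildError conversions.
            ErrorKind::InputInvalid => { /* fix the request before retrying */ }
            ErrorKind::ObjectNotDiscoverable | ErrorKind::ChunkFailed => { /* object-level failure */ }
            // IOError, RuntimeError, ChildOperationFailed, and any future kinds:
            // ErrorKind is #[non_exhaustive], so the wildcard arm is required.
            _ => { /* treat as fatal */ }
        }
    }

Because kind() returns &ErrorKind after PATCH 4/6 and the Other variant is removed in PATCH 6/6, exhaustive matching is impossible by design: downstream code must keep a wildcard arm, which is what allows new kinds to be added later without a breaking change.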