Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework errors #42

Merged
merged 6 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ aws-types = "1.3.3"
bytes = "1"
# FIXME - upgrade to hyper 1.x
hyper = { version = "0.14.29", features = ["client"] }
thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tracing = "0.1"

Expand Down
4 changes: 2 additions & 2 deletions aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ impl Client {
/// # Examples
/// ```no_run
/// use std::path::PathBuf;
/// use aws_s3_transfer_manager::operation::download_objects::DownloadObjectsError;
/// use aws_s3_transfer_manager::error::Error;
///
/// async fn download_bucket(
/// client: &aws_s3_transfer_manager::Client,
/// dest: PathBuf
/// ) -> Result<(), DownloadObjectsError> {
/// ) -> Result<(), Error> {
///
/// let handle = client
/// .download_objects()
Expand Down
250 changes: 147 additions & 103 deletions aws-s3-transfer-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,127 +3,171 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_types::byte_stream;
use std::io;
use std::fmt;

// TODO(design): revisit errors
/// A boxed error that is `Send` and `Sync`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Failed transfer result
#[derive(thiserror::Error, Debug)]
pub enum TransferError {
/// The request was invalid
#[error("invalid meta request: {0}")]
InvalidMetaRequest(String),
use aws_sdk_s3::error::ProvideErrorMetadata;

/// A download failed
#[error("download failed")]
DownloadFailed(#[from] DownloadError),
/// Errors returned by this library
///
/// NOTE: Use [`aws_smithy_types::error::display::DisplayErrorContext`] or similar to display
/// the entire error cause/source chain.
Comment on lines +15 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish error reporters were farther along so we could be specific here. Is noting "or similar" helpful? Will people understand what that means?

#[derive(Debug)]
pub struct Error {
kind: ErrorKind,
source: BoxError,
}

/// General categories of transfer errors.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ErrorKind {
/// Operation input validation issues
InputInvalid,

/// I/O errors
IOError,

/// Some kind of internal runtime issue (e.g. task failure, poisoned mutex, etc)
RuntimeError,

/// Object discovery failed
ObjectNotDiscoverable,

/// Failed to upload or download a chunk of an object
ChunkFailed,

/// Resource not found (e.g. bucket, key, multipart upload ID not found)
NotFound,

/// child operation failed (e.g. download of a single object as part of downloading all objects from a bucket)
ChildOperationFailed,
Comment on lines +23 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the way that these are defined, we can't add context to them later. That may be fine. Otherwise, we should define them like so:

/// General categories of transfer errors.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ErrorKind {
    /// Operation input validation issues
    InputInvalid(InputInvalid),
}

pub struct InputInvalid {} 


/// A upload failed
#[error("upload failed")]
UploadFailed(#[from] UploadError),
/// A custom error that does not fall under any other error kind
Other,
aajtodd marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) type GetObjectSdkError = ::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::get_object::GetObjectError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;
pub(crate) type HeadObjectSdkError = ::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::head_object::HeadObjectError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;

/// An error related to downloading an object
#[derive(thiserror::Error, Debug)]
pub enum DownloadError {
/// Discovery of object metadata failed
#[error(transparent)]
DiscoverFailed(SdkOperationError),

/// A failure occurred fetching a single chunk of the overall object data
#[error("download chunk failed")]
ChunkFailed {
/// The underlying SDK error
source: SdkOperationError,
},
impl Error {
/// Creates a new transfer [`Error`] from a known kind of error as well as an arbitrary error
/// source.
pub fn new<E>(kind: ErrorKind, err: E) -> Error
where
E: Into<BoxError>,
{
Error {
kind,
source: err.into(),
}
}

/// Creates a new transfer [`Error`] from an arbitrary payload.
///
/// This function is a shortcut for [`Error::new`] with [`ErrorKind::Other`]
pub fn other<E>(err: E) -> Error
where
E: Into<BoxError>,
{
Error::new(ErrorKind::Other, err)
}

/// Returns the corresponding [`ErrorKind`] for this error.
pub fn kind(&self) -> ErrorKind {
self.kind.clone()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: In smithy-rs, we have two patterns for the signature:

    pub fn kind(&self) -> ErrorKind
    pub fn kind(&self) -> &ErrorKind

I'd prefer the latter since it leaves the option to callers whether they want to clone ErrorKind or not, whereas the former always pays the cost of cloning the kind even when callers just need reference at the end of the day (cloning the kind could become more expensive depending on a new variant in the future). I'll leave it up to you, though.

}

pub(crate) type CreateMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;

pub(crate) type UploadPartSdkError = ::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::upload_part::UploadPartError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;

pub(crate) type CompleteMultipartUploadSdkError =
::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;

pub(crate) type AbortMultipartUploadSdkError = ::aws_smithy_runtime_api::client::result::SdkError<
aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError,
::aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;

/// An error related to upload an object
#[derive(thiserror::Error, Debug)]
pub enum UploadError {
/// An error occurred invoking [aws_sdk_s3::Client::CreateMultipartUpload]
#[error(transparent)]
CreateMultipartUpload(#[from] CreateMultipartUploadSdkError),

/// An error occurred invoking [aws_sdk_s3::Client::CreateMultipartUpload]
#[error(transparent)]
CompleteMultipartUpload(#[from] CompleteMultipartUploadSdkError),

/// An error occurred invoking [aws_sdk_s3::Client::UploadPart]
#[error(transparent)]
UploadPart(#[from] UploadPartSdkError),

/// An error occurred invoking [aws_sdk_s3::Client::AbortMultipartUpload]
#[error(transparent)]
AbortMultipartUpload(#[from] AbortMultipartUploadSdkError),

/// An I/O error occurred
#[error(transparent)]
IOError(#[from] crate::io::error::Error),
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.kind {
ErrorKind::InputInvalid => write!(f, "invalid input"),
ErrorKind::IOError => write!(f, "I/O error"),
ErrorKind::RuntimeError => write!(f, "runtime error"),
ErrorKind::ObjectNotDiscoverable => write!(f, "object discovery failed"),
ErrorKind::ChunkFailed => write!(f, "failed to process chunk"),
ErrorKind::NotFound => write!(f, "resource not found"),
ErrorKind::ChildOperationFailed => write!(f, "child operation failed"),
ErrorKind::Other => write!(f, "unknown error"),
}
}
}

/// An underlying S3 SDK error
#[derive(thiserror::Error, Debug)]
pub enum SdkOperationError {
/// An error occurred invoking [aws_sdk_s3::Client::head_object]
#[error(transparent)]
HeadObject(#[from] HeadObjectSdkError),
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.source.as_ref())
}
}

impl From<crate::io::error::Error> for Error {
fn from(value: crate::io::error::Error) -> Self {
Self::new(ErrorKind::IOError, value)
}
}

impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Self::new(ErrorKind::IOError, value)
}
}

/// An error occurred invoking [aws_sdk_s3::Client::get_object]
#[error(transparent)]
GetObject(#[from] GetObjectSdkError),
impl From<tokio::task::JoinError> for Error {
fn from(value: tokio::task::JoinError) -> Self {
Self::new(ErrorKind::RuntimeError, value)
}
}

/// An error occurred reading the underlying data
#[error(transparent)]
ReadError(#[from] byte_stream::error::Error),
impl<T> From<std::sync::PoisonError<T>> for Error
where
T: Send + Sync + 'static,
{
fn from(value: std::sync::PoisonError<T>) -> Self {
Self::new(ErrorKind::RuntimeError, value)
}
}

impl From<aws_smithy_types::error::operation::BuildError> for Error {
fn from(value: aws_smithy_types::error::operation::BuildError) -> Self {
Self::new(ErrorKind::InputInvalid, value)
}
}

/// An unknown IO error occurred carrying out the request
#[error(transparent)]
IoError(#[from] io::Error),
pub(crate) fn invalid_input<E>(err: E) -> Error
where
E: Into<BoxError>,
{
Error::new(ErrorKind::InputInvalid, err)
}

// convenience to construct a TransferError from a chunk failure
pub(crate) fn chunk_failed<E: Into<SdkOperationError>>(e: E) -> TransferError {
DownloadError::ChunkFailed { source: e.into() }.into()
pub(crate) fn discovery_failed<E>(err: E) -> Error
where
E: Into<BoxError>,
{
Error::new(ErrorKind::ObjectNotDiscoverable, err)
}

pub(crate) fn invalid_meta_request(message: String) -> TransferError {
TransferError::InvalidMetaRequest(message)
pub(crate) fn from_kind<E>(kind: ErrorKind) -> impl FnOnce(E) -> Error
where
E: Into<BoxError>,
{
|err| Error::new(kind, err)
}

impl From<CreateMultipartUploadSdkError> for TransferError {
fn from(value: CreateMultipartUploadSdkError) -> Self {
TransferError::UploadFailed(value.into())
impl<E, R> From<aws_sdk_s3::error::SdkError<E, R>> for Error
where
E: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static,
R: Send + Sync + fmt::Debug + 'static,
{
fn from(value: aws_sdk_s3::error::SdkError<E, R>) -> Self {
// TODO - extract request id/metadata
let kind = match value.code() {
Some("NotFound") | Some("NoSuchKey") | Some("NoSuchUpload") | Some("NoSuchBucket") => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Some("NotFound") | Some("NoSuchKey") | Some("NoSuchUpload") | Some("NoSuchBucket") => {
Some("NotFound" | "NoSuchKey" | "NoSuchUpload" | "NoSuchBucket") => {

I think this works with MSRV >= 1.76

ErrorKind::NotFound
}
// TODO - is this the rigth error kind? do we need something else?
_ => ErrorKind::ChildOperationFailed,
};

Error::new(kind, value)
}
}
10 changes: 2 additions & 8 deletions aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ mod header;
mod object_meta;
mod worker;

use crate::error::TransferError;
use body::Body;
use context::DownloadContext;
use discovery::{discover_obj, ObjectDiscovery};
Expand All @@ -42,7 +41,7 @@ impl Download {
pub(crate) async fn orchestrate(
handle: Arc<crate::client::Handle>,
input: crate::operation::download::DownloadInput,
) -> Result<DownloadHandle, TransferError> {
) -> Result<DownloadHandle, crate::error::Error> {
// if there is a part number then just send the default request
if input.part_number().is_some() {
todo!("single part download not implemented")
Expand Down Expand Up @@ -70,11 +69,6 @@ impl Download {
let input = input.clone();
let rem = discovery.remaining.clone();

// TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once,
// the downside is controlling memory usage as a large download may result in
// quite a few futures created. If more performant could be enabled for
// objects less than some size.

tasks.spawn(distribute_work(
rem,
input.into(),
Expand Down Expand Up @@ -109,7 +103,7 @@ impl Download {
/// the starting sequence number to use for remaining chunks.
async fn handle_discovery_chunk(
discovery: &mut ObjectDiscovery,
completed: &mpsc::Sender<Result<ChunkResponse, TransferError>>,
completed: &mpsc::Sender<Result<ChunkResponse, crate::error::Error>>,
) -> u64 {
let mut start_seq = 0;
if let Some(initial_data) = discovery.initial_chunk.take() {
Expand Down
14 changes: 6 additions & 8 deletions aws-s3-transfer-manager/src/operation/download/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::error::TransferError;
use crate::operation::download::worker::ChunkResponse;
use aws_smithy_types::byte_stream::AggregatedBytes;
use std::cmp;
Expand All @@ -20,7 +19,7 @@ pub struct Body {
sequencer: Sequencer,
}

type BodyChannel = mpsc::Receiver<Result<ChunkResponse, TransferError>>;
type BodyChannel = mpsc::Receiver<Result<ChunkResponse, crate::error::Error>>;

impl Body {
/// Create a new empty Body
Expand Down Expand Up @@ -51,7 +50,7 @@ impl Body {
/// Returns [None] when there is no more data.
/// Chunks returned from a [Body] are guaranteed to be sequenced
/// in the right order.
pub async fn next(&mut self) -> Option<Result<AggregatedBytes, TransferError>> {
pub async fn next(&mut self) -> Option<Result<AggregatedBytes, crate::error::Error>> {
// TODO(aws-sdk-rust#1159, design) - do we want ChunkResponse (or similar) rather than AggregatedBytes? Would
// make additional retries of an individual chunk/part more feasible (though theoretically already exhausted retries)
loop {
Expand Down Expand Up @@ -146,7 +145,7 @@ impl PartialEq for SequencedChunk {
/// A body that returns chunks in whatever order they are received.
#[derive(Debug)]
pub(crate) struct UnorderedBody {
chunks: Option<mpsc::Receiver<Result<ChunkResponse, TransferError>>>,
chunks: Option<mpsc::Receiver<Result<ChunkResponse, crate::error::Error>>>,
}

impl UnorderedBody {
Expand All @@ -160,7 +159,7 @@ impl UnorderedBody {
/// Chunks returned from an [UnorderedBody] are not guaranteed to be sorted
/// in the right order. Consumers are expected to sort the data themselves
/// using the chunk sequence number (starting from zero).
pub(crate) async fn next(&mut self) -> Option<Result<ChunkResponse, TransferError>> {
pub(crate) async fn next(&mut self) -> Option<Result<ChunkResponse, crate::error::Error>> {
match self.chunks.as_mut() {
None => None,
Some(ch) => ch.recv().await,
Expand All @@ -170,8 +169,7 @@ impl UnorderedBody {

#[cfg(test)]
mod tests {
use crate::error::TransferError;
use crate::operation::download::worker::ChunkResponse;
use crate::{error, operation::download::worker::ChunkResponse};
use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream};
use bytes::Bytes;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -226,7 +224,7 @@ mod tests {
let aggregated = ByteStream::from(data).collect().await.unwrap();
let chunk = chunk_resp(0, Some(aggregated));
tx.send(Ok(chunk)).await.unwrap();
let err = TransferError::InvalidMetaRequest("test errors".to_string());
let err = error::Error::new(error::ErrorKind::InputInvalid, "test errors".to_string());
tx.send(Err(err)).await.unwrap();
});

Expand Down
Loading
Loading