Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-organize crate structure part one #36

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::{mem, time};

use aws_s3_transfer_manager::download::Downloader;

use aws_s3_transfer_manager::download::body::Body;
use aws_s3_transfer_manager::client::downloader::body::Body;
use aws_s3_transfer_manager::client::Downloader;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_s3_transfer_manager::upload::{UploadRequest, Uploader};
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_types::SdkConfig;
use bytes::Buf;
Expand Down Expand Up @@ -161,28 +159,33 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env().load().await;
warmup(&config).await?;

let tm = Uploader::builder()
.sdk_config(config)
let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();

let tm = aws_s3_transfer_manager::Client::new(tm_config);
Comment on lines +167 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be confusing to call it aws_s3_transfer_manager::Client but also the aws_sdk_s3::Client is in the mix? User code is going to end up with 2 "S3 clients". Should it be aws_s3_transfer_manager::TransferManager instead?

Or is that just normal? Everything is a "::Client", it's namespaced by the module name, and users are used to that, and give good disambiguating variable names when they both appear side-by-side

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave it for now but feel free to add it to the bikeshed issue. I can go either way, I do think it's redundant a bit when the crate/module name has transfer manager in it, though the SEP says it should be named TransferManager so maybe we call it that. Users can give an alias to an import to disambiguate e.g. use aws_s3_transfer_manager::Client as TransferManager. Also my hope is that the need for users to instantiate their own aws_sdk_s3::Client is rare when we add some convenience "loader" functions.


let path = args.source.expect_local();
let file_meta = fs::metadata(path).await.expect("file metadata");

let stream = InputStream::from_path(path)?;
let (bucket, key) = args.dest.expect_s3().parts();

let request = UploadRequest::builder()
println!("starting upload");
let start = time::Instant::now();

let handle = tm
.upload()
.bucket(bucket)
.key(key)
.body(stream)
.build()?;

println!("starting upload");
let start = time::Instant::now();
.send()
.await?;

let handle = tm.upload(request).await?;
let _resp = handle.join().await?;
let elapsed = start.elapsed();

Expand Down
104 changes: 104 additions & 0 deletions aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

/// Abstractions for downloading objects from S3
pub mod downloader;
pub use downloader::Downloader;

use crate::Config;
use crate::{
types::{ConcurrencySetting, PartSize},
DEFAULT_CONCURRENCY, MEBIBYTE,
};
use std::sync::Arc;

/// Transfer manager client for Amazon Simple Storage Service.
#[derive(Debug, Clone)]
pub struct Client {
handle: Arc<Handle>,
}

/// Whatever is needed to carry out operations, e.g. scheduler, budgets, config, env details, etc
#[derive(Debug)]
pub(crate) struct Handle {
pub(crate) config: crate::Config,
}

impl Handle {
/// Get the concrete number of workers to use based on the concurrency setting.
pub(crate) fn num_workers(&self) -> usize {
match self.config.concurrency() {
// TODO(aws-sdk-rust#1159): add logic for determining this
ConcurrencySetting::Auto => DEFAULT_CONCURRENCY,
ConcurrencySetting::Explicit(explicit) => *explicit,
}
}

/// Get the concrete minimum upload size in bytes to use to determine whether multipart uploads
/// are enabled for a given request.
pub(crate) fn mpu_threshold_bytes(&self) -> u64 {
match self.config.multipart_threshold() {
PartSize::Auto => 16 * MEBIBYTE,
PartSize::Target(explicit) => *explicit,
}
}

/// Get the concrete target part size to use for uploads
pub(crate) fn upload_part_size_bytes(&self) -> u64 {
match self.config.part_size() {
PartSize::Auto => 8 * MEBIBYTE,
PartSize::Target(explicit) => *explicit,
}
}
}

impl Client {
/// Creates a new client from a transfer manager config.
pub fn new(config: Config) -> Client {
let handle = Arc::new(Handle { config });

Client { handle }
}

/// Returns the client's configuration
pub fn config(&self) -> &Config {
&self.handle.config
}

/// Constructs a fluent builder for the
/// [`Upload`](crate::operation::upload::builders::UploadFluentBuilder) operation.
///
/// # Examples
///
/// ```no_run
/// use std::error::Error;
/// use std::path::Path;
/// use aws_s3_transfer_manager::io::InputStream;
///
/// async fn upload_file(
/// client: &aws_s3_transfer_manager::Client,
/// path: impl AsRef<Path>
/// ) -> Result<(), Box<dyn Error>> {
/// let stream = InputStream::from_path(path)?;
/// let handle = client.upload()
/// .bucket("my-bucket")
/// .key("my_key")
/// .body(stream)
/// .send()
/// .await?;
///
/// // send() will return potentially before the transfer is complete.
/// // The handle given must be joined to drive the request to completion.
/// // It can also be used to get progress, pause, or cancel the transfer, etc.
aajtodd marked this conversation as resolved.
Show resolved Hide resolved
/// let response = handle.join().await?;
/// // ... do something with response
/// Ok(())
/// }
///
/// ```
pub fn upload(&self) -> crate::operation::upload::builders::UploadFluentBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to say it seems it weird that the returned type has such a long module path, and wondered if we want to shorten it at least for public consumption? But I see that this mirrors the path to aws_sdk_s3's fluent builders, so it's probably good

I'm seeing now how the module organization here really mirrors the SDK's module organization, which seems good to do.

no need to respond. just sharing my mental journey

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes no difference to the generated docs or how users consume it FWIW. The end result to a user is the same as:

use crate::operation::upload::builders::UploadFluentBuilder;

...

pub fn upload(&self) -> UploadFluentBuilder

crate::operation::upload::builders::UploadFluentBuilder::new(self.handle.clone())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ mod header;
mod object_meta;
mod worker;

use crate::download::body::Body;
use crate::download::discovery::{discover_obj, ObjectDiscovery};
use crate::download::handle::DownloadHandle;
use crate::download::worker::{distribute_work, download_chunks, ChunkResponse};
use crate::client::downloader::body::Body;
use crate::client::downloader::discovery::{discover_obj, ObjectDiscovery};
use crate::client::downloader::handle::DownloadHandle;
use crate::client::downloader::worker::{distribute_work, download_chunks, ChunkResponse};
use crate::error::TransferError;
use crate::operation::download::DownloadInput;
use crate::types::{ConcurrencySetting, PartSize};
use crate::{DEFAULT_CONCURRENCY, MEBIBYTE};
use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder};
use aws_types::SdkConfig;
use context::DownloadContext;
use tokio::sync::mpsc;
Expand All @@ -28,28 +28,6 @@ use tracing::Instrument;

// TODO(aws-sdk-rust#1159) - need to set User-Agent header value for SEP, e.g. `ft/hll#s3-transfer`

/// Request type for downloading a single object
#[derive(Debug)]
#[non_exhaustive]
pub struct DownloadRequest {
pub(crate) input: GetObjectInputBuilder,
}

// FIXME - should probably be TryFrom since checksums may conflict?
impl From<GetObjectFluentBuilder> for DownloadRequest {
fn from(value: GetObjectFluentBuilder) -> Self {
Self {
input: value.as_input().clone(),
}
}
}

impl From<GetObjectInputBuilder> for DownloadRequest {
fn from(value: GetObjectInputBuilder) -> Self {
Self { input: value }
}
}

/// Fluent style builder for [Downloader]
#[derive(Debug, Clone, Default)]
pub struct Builder {
Expand Down Expand Up @@ -131,7 +109,8 @@ impl Downloader {
/// ```no_run
/// use std::error::Error;
/// use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
/// use aws_s3_transfer_manager::download::{Downloader, DownloadRequest};
/// use aws_s3_transfer_manager::client::Downloader;
/// use aws_s3_transfer_manager::operation::download::DownloadInput;
///
/// async fn get_object(client: Downloader) -> Result<(), Box<dyn Error>> {
/// let request = GetObjectInputBuilder::default()
Expand All @@ -144,7 +123,7 @@ impl Downloader {
/// Ok(())
/// }
/// ```
pub async fn download(&self, req: DownloadRequest) -> Result<DownloadHandle, TransferError> {
pub async fn download(&self, req: DownloadInput) -> Result<DownloadHandle, TransferError> {
// if there is a part number then just send the default request
if req.input.get_part_number().is_some() {
todo!("single part download not implemented")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::download::worker::ChunkResponse;
use crate::client::downloader::worker::ChunkResponse;
use crate::error::TransferError;
use aws_smithy_types::byte_stream::AggregatedBytes;
use std::cmp;
Expand Down Expand Up @@ -170,7 +170,7 @@ impl UnorderedBody {

#[cfg(test)]
mod tests {
use crate::download::worker::ChunkResponse;
use crate::client::downloader::worker::ChunkResponse;
use crate::error::TransferError;
use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream};
use bytes::Bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::error;

use super::header::{self, ByteRange};
use super::object_meta::ObjectMetadata;
use super::DownloadRequest;
use crate::download::context::DownloadContext;
use super::DownloadInput;
use crate::client::downloader::context::DownloadContext;

#[derive(Debug, Clone, PartialEq)]
enum ObjectDiscoveryStrategy {
Expand All @@ -44,7 +44,7 @@ pub(super) struct ObjectDiscovery {

impl ObjectDiscoveryStrategy {
fn from_request(
request: &DownloadRequest,
request: &DownloadInput,
) -> Result<ObjectDiscoveryStrategy, error::TransferError> {
let strategy = match request.input.get_range() {
Some(h) => {
Expand All @@ -71,7 +71,7 @@ impl ObjectDiscoveryStrategy {
/// to be fetched, and _(if available)_ the first chunk of data.
pub(super) async fn discover_obj(
ctx: &DownloadContext,
request: &DownloadRequest,
request: &DownloadInput,
) -> Result<ObjectDiscovery, error::TransferError> {
let strategy = ObjectDiscoveryStrategy::from_request(request)?;
match strategy {
Expand Down Expand Up @@ -99,7 +99,7 @@ pub(super) async fn discover_obj(

async fn discover_obj_with_head(
ctx: &DownloadContext,
request: &DownloadRequest,
request: &DownloadInput,
byte_range: Option<ByteRange>,
) -> Result<ObjectDiscovery, error::TransferError> {
let meta: ObjectMetadata = ctx
Expand Down Expand Up @@ -166,11 +166,11 @@ async fn discover_obj_with_get(

#[cfg(test)]
mod tests {
use crate::download::context::DownloadContext;
use crate::download::discovery::{
use crate::client::downloader::context::DownloadContext;
use crate::client::downloader::discovery::{
discover_obj, discover_obj_with_head, ObjectDiscoveryStrategy,
};
use crate::download::header::ByteRange;
use crate::client::downloader::header::ByteRange;
use crate::MEBIBYTE;
use aws_sdk_s3::operation::get_object::{GetObjectInput, GetObjectOutput};
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::download::body::Body;
use crate::download::object_meta::ObjectMetadata;
use crate::client::downloader::body::Body;
use crate::client::downloader::object_meta::ObjectMetadata;
use tokio::task;

/// Response type for a single download object request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::download::context::DownloadContext;
use crate::download::header;
use crate::client::downloader::context::DownloadContext;
use crate::client::downloader::header;
use crate::error;
use crate::error::TransferError;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
Expand Down Expand Up @@ -137,8 +137,8 @@ fn next_chunk(

#[cfg(test)]
mod tests {
use crate::download::header;
use crate::download::worker::distribute_work;
use crate::client::downloader::header;
use crate::client::downloader::worker::distribute_work;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use std::ops::RangeInclusive;

Expand Down
Loading
Loading