Skip to content

Commit

Permalink
globalize concurrency management (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd authored Oct 1, 2024
1 parent 6c190ca commit 026a55e
Show file tree
Hide file tree
Showing 16 changed files with 518 additions and 20 deletions.
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ async-channel = "2.3.1"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-async = "1.2.1"
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
bytes = "1"
futures-util = "0.3.30"
# FIXME - upgrade to hyper 1.x
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
Expand Down
9 changes: 8 additions & 1 deletion aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::Scheduler;
use crate::Config;
use crate::{
metrics::unit::ByteUnit,
Expand All @@ -21,6 +22,7 @@ pub struct Client {
#[derive(Debug)]
pub(crate) struct Handle {
pub(crate) config: crate::Config,
pub(crate) scheduler: Scheduler,
}

impl Handle {
Expand Down Expand Up @@ -62,8 +64,13 @@ impl Handle {
impl Client {
/// Creates a new client from a transfer manager config.
pub fn new(config: Config) -> Client {
let handle = Arc::new(Handle { config });
let permits = match config.concurrency() {
ConcurrencySetting::Auto => DEFAULT_CONCURRENCY,
ConcurrencySetting::Explicit(explicit) => *explicit,
};
let scheduler = Scheduler::new(permits);

let handle = Arc::new(Handle { config, scheduler });
Client { handle }
}

Expand Down
11 changes: 3 additions & 8 deletions aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ impl Config {
&self.target_part_size
}

// TODO(design) - should we separate upload/download part size and concurrency settings?
//
// FIXME - this setting is wrong, we don't use it right. This should feed into scheduling and
// whether an individual operation can execute an SDK/HTTP request. We should be free to spin
// however many tasks we want per transfer operation OR have separate config for task
// concurrency.
/// Returns the concurrency setting to use for individual transfer operations.
/// Returns the concurrency setting to use for transfer operations.
/// This is the maximum number of in-flight requests allowed across _all_ operations.
pub fn concurrency(&self) -> &ConcurrencySetting {
&self.concurrency
}
Expand Down Expand Up @@ -120,7 +115,7 @@ impl Builder {

/// Set the concurrency level this component is allowed to use.
///
/// This sets the maximum number of concurrent in-flight requests.
/// This sets the maximum number of concurrent in-flight requests across _all_ operations.
/// Default is [ConcurrencySetting::Auto].
pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self {
self.concurrency = concurrency;
Expand Down
1 change: 0 additions & 1 deletion aws-s3-transfer-manager/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl FromStr for Range {
match (iter.next(), iter.next()) {
(Some("bytes"), Some(range)) => {
if range.contains(',') {
// TODO(aws-sdk-rust#1159) - error S3 doesn't support multiple byte ranges
Err(error::invalid_input(format!(
"multiple byte ranges not supported for range header {}",
s
Expand Down
6 changes: 5 additions & 1 deletion aws-s3-transfer-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
//! * [`download_objects`](crate::Client::download_objects) - download an entire bucket or prefix to a local directory
//! * [`upload_objects`](crate::Client::upload_objects) - upload an entire local directory to a bucket
pub(crate) const DEFAULT_CONCURRENCY: usize = 8;
/// Default in-flight concurrency
pub(crate) const DEFAULT_CONCURRENCY: usize = 128;

/// Error types emitted by `aws-s3-transfer-manager`
pub mod error;
Expand All @@ -88,6 +89,9 @@ pub(crate) mod middleware;
/// HTTP related components and utils
pub(crate) mod http;

/// Internal runtime componenents
pub(crate) mod runtime;

/// Metrics
pub mod metrics;

Expand Down
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
* SPDX-License-Identifier: Apache-2.0
*/

pub(crate) mod limit;
pub(crate) mod retry;
6 changes: 6 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

pub(crate) mod concurrency;
15 changes: 15 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Limit the maximum number of requests being concurrently processed by a service.
//!
//! This middleware is similar to the `tower::limit::concurrency` middleware but goes through
//! the transfer manager scheduler to control concurrency rather than just a semaphore.
mod future;
mod layer;
mod service;

pub(crate) use self::layer::ConcurrencyLimitLayer;
37 changes: 37 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit/concurrency/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::OwnedWorkPermit;

use futures_util::ready;
use pin_project_lite::pin_project;
use std::{future::Future, task::Poll};

pin_project! {
#[derive(Debug)]
pub(crate) struct ResponseFuture<T> {
#[pin]
inner: T,
// retain until dropped when future completes
_permit: OwnedWorkPermit
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(inner: T, _permit: OwnedWorkPermit) -> ResponseFuture<T> {
ResponseFuture { inner, _permit }
}
}

impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, E>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.project().inner.poll(cx)))
}
}
31 changes: 31 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit/concurrency/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::Scheduler;
use tower::Layer;

use super::service::ConcurrencyLimit;

/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
#[derive(Debug, Clone)]
pub(crate) struct ConcurrencyLimitLayer {
scheduler: Scheduler,
}

impl ConcurrencyLimitLayer {
/// Create a new concurrency limit layer.
pub(crate) const fn new(scheduler: Scheduler) -> Self {
ConcurrencyLimitLayer { scheduler }
}
}

impl<S> Layer<S> for ConcurrencyLimitLayer {
type Service = ConcurrencyLimit<S>;

fn layer(&self, service: S) -> Self::Service {
ConcurrencyLimit::new(service, self.scheduler.clone())
}
}
Loading

0 comments on commit 026a55e

Please sign in to comment.