Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

globalize concurrency management #55

Merged
merged 15 commits into from
Oct 1, 2024
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ async-channel = "2.3.1"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-async = "1.2.1"
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
bytes = "1"
futures-util = "0.3.30"
# FIXME - upgrade to hyper 1.x
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
Expand Down
9 changes: 8 additions & 1 deletion aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::Scheduler;
use crate::Config;
use crate::{
metrics::unit::ByteUnit,
Expand All @@ -21,6 +22,7 @@ pub struct Client {
#[derive(Debug)]
pub(crate) struct Handle {
pub(crate) config: crate::Config,
pub(crate) scheduler: Scheduler,
}

impl Handle {
Expand Down Expand Up @@ -62,8 +64,13 @@ impl Handle {
impl Client {
/// Creates a new client from a transfer manager config.
pub fn new(config: Config) -> Client {
let handle = Arc::new(Handle { config });
let permits = match config.concurrency() {
ConcurrencySetting::Auto => DEFAULT_CONCURRENCY,
ConcurrencySetting::Explicit(explicit) => *explicit,
};
let scheduler = Scheduler::new(permits);

let handle = Arc::new(Handle { config, scheduler });
Client { handle }
}

Expand Down
11 changes: 3 additions & 8 deletions aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ impl Config {
&self.target_part_size
}

// TODO(design) - should we separate upload/download part size and concurrency settings?
//
// FIXME - this setting is wrong, we don't use it right. This should feed into scheduling and
// whether an individual operation can execute an SDK/HTTP request. We should be free to spin
// however many tasks we want per transfer operation OR have separate config for task
// concurrency.
/// Returns the concurrency setting to use for individual transfer operations.
/// Returns the concurrency setting to use for transfer operations.
/// This is the maximum number of in-flight requests allowed across _all_ operations.
pub fn concurrency(&self) -> &ConcurrencySetting {
&self.concurrency
}
Expand Down Expand Up @@ -120,7 +115,7 @@ impl Builder {

/// Set the concurrency level this component is allowed to use.
///
/// This sets the maximum number of concurrent in-flight requests.
/// This sets the maximum number of concurrent in-flight requests across _all_ operations.
/// Default is [ConcurrencySetting::Auto].
pub fn concurrency(mut self, concurrency: ConcurrencySetting) -> Self {
self.concurrency = concurrency;
Expand Down
1 change: 0 additions & 1 deletion aws-s3-transfer-manager/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl FromStr for Range {
match (iter.next(), iter.next()) {
(Some("bytes"), Some(range)) => {
if range.contains(',') {
// TODO(aws-sdk-rust#1159) - error S3 doesn't support multiple byte ranges
Err(error::invalid_input(format!(
"multiple byte ranges not supported for range header {}",
s
Expand Down
6 changes: 5 additions & 1 deletion aws-s3-transfer-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
//! * [`download_objects`](crate::Client::download_objects) - download an entire bucket or prefix to a local directory
//! * [`upload_objects`](crate::Client::upload_objects) - upload an entire local directory to a bucket

pub(crate) const DEFAULT_CONCURRENCY: usize = 8;
/// Default in-flight concurrency
pub(crate) const DEFAULT_CONCURRENCY: usize = 128;

/// Error types emitted by `aws-s3-transfer-manager`
pub mod error;
Expand All @@ -88,6 +89,9 @@ pub(crate) mod middleware;
/// HTTP related components and utils
pub(crate) mod http;

/// Internal runtime componenents
pub(crate) mod runtime;

/// Metrics
pub mod metrics;

Expand Down
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
* SPDX-License-Identifier: Apache-2.0
*/

pub(crate) mod limit;
pub(crate) mod retry;
6 changes: 6 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

pub(crate) mod concurrency;
15 changes: 15 additions & 0 deletions aws-s3-transfer-manager/src/middleware/limit/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Limit the maximum number of requests being concurrently processed by a service.
//!
//! This middleware is similar to the `tower::limit::concurrency` middleware but goes through
//! the transfer manager scheduler to control concurrency rather than just a semaphore.

mod future;
mod layer;
mod service;

pub(crate) use self::layer::ConcurrencyLimitLayer;
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::OwnedWorkPermit;

use futures_util::ready;
use pin_project_lite::pin_project;
use std::{future::Future, task::Poll};

pin_project! {
#[derive(Debug)]
pub(crate) struct ResponseFuture<T> {
#[pin]
inner: T,
// retain until dropped when future completes
_permit: OwnedWorkPermit
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(inner: T, _permit: OwnedWorkPermit) -> ResponseFuture<T> {
ResponseFuture { inner, _permit }
}
}

impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, E>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.project().inner.poll(cx)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::runtime::scheduler::Scheduler;
use tower::Layer;

use super::service::ConcurrencyLimit;

/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
#[derive(Debug, Clone)]
pub(crate) struct ConcurrencyLimitLayer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall code structure is confusing. We have middleware/limit/ and middleware/limit/concurrency, but it contains the ConcurrencyLimit service. I am a bit confused about why we don't have something simpler like middleware/concurrency_limit.rs, middleware/concurrency_limit/* and get rid of limit?

Copy link
Contributor Author

@aajtodd aajtodd Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I sort of just followed tokio's existing module structure (they have both a concurrency limit and a rate limit middleware and thus both are under the limit module). I don't have a strong opinion on this one.

scheduler: Scheduler,
}

impl ConcurrencyLimitLayer {
/// Create a new concurrency limit layer.
pub(crate) const fn new(scheduler: Scheduler) -> Self {
ConcurrencyLimitLayer { scheduler }
}
}

impl<S> Layer<S> for ConcurrencyLimitLayer {
type Service = ConcurrencyLimit<S>;

fn layer(&self, service: S) -> Self::Service {
ConcurrencyLimit::new(service, self.scheduler.clone())
}
}
Loading