
globalize concurrency management #55

Merged — 15 commits from atodd/globalize into main, Oct 1, 2024
Conversation

@aajtodd (Contributor) commented Sep 30, 2024

Issue #, if available:

#31

Description of changes:

"Globalize" the concurrency management of the transfer manager such that all uploads and downloads respect the configured concurrency.

  • Introduce a Scheduler as part of the client handle, which is already threaded through each operation (see the sketch after this list).
  • Replace the tower::limit::ConcurrencyLimit middleware with an equivalent based on our new scheduler. The tower middleware was based on a simple semaphore. For now we've just moved the semaphore into the scheduler. In time I expect we'll drop the semaphore for something that allows priority-based scheduling, perhaps coupled with acquire(...) taking a description/metadata of the work to acquire a permit for.
  • fix: make object discovery coordinate work with the scheduler
  • I also upped the default concurrency. I think 8 is going to be way too low. I chose 128 based on roughly reaching 10 GB/s of throughput: assuming 90 MB/s per request for S3, 128 requests gives us around 11.5 GB/s. We could make this more exact of course, but a power of 2 felt right.
    • Question: Uploads and downloads are both going through this scheduler. Do we (eventually) want to split the concurrency such that uploads and downloads are independently scheduled/limited?
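A minimal sketch of what the scheduler handle could look like, assuming tokio's Semaphore underneath. The type names (Scheduler, OwnedWorkPermit) come from this PR, but the fields and the acquire_permit signature here are assumptions, not the PR's exact code:

use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};

/// Cloneable scheduler handle shared by all operations so uploads and
/// downloads draw from a single concurrency budget.
#[derive(Debug, Clone)]
pub(crate) struct Scheduler {
    sem: Arc<Semaphore>,
}

/// Owned permit to perform one unit of work.
#[derive(Debug)]
pub(crate) struct OwnedWorkPermit {
    _inner: OwnedSemaphorePermit,
}

impl Scheduler {
    pub(crate) fn new(concurrency: usize) -> Self {
        Self { sem: Arc::new(Semaphore::new(concurrency)) }
    }

    /// Acquire a permit to perform work. For now this is a plain semaphore
    /// acquire; later it could take a description of the work so the
    /// scheduler can prioritize.
    pub(crate) async fn acquire_permit(&self) -> Result<OwnedWorkPermit, AcquireError> {
        let inner = self.sem.clone().acquire_owned().await?;
        Ok(OwnedWorkPermit { _inner: inner })
    }
}

Because the permit is owned, it can be handed across tasks, and the slot is released whenever the permit is dropped.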

I did a quick check with a c5n.18xlarge download test to verify there are no regressions; if there are, they're in the noise.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@aajtodd aajtodd requested a review from a team as a code owner September 30, 2024 13:47
/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
#[derive(Debug, Clone)]
pub(crate) struct ConcurrencyLimitLayer {
Contributor:

The overall code structure is confusing. We have middleware/limit/ and middleware/limit/concurrency, and the latter contains the ConcurrencyLimit service. Why don't we have something simpler like middleware/concurrency_limit.rs or middleware/concurrency_limit/*, and get rid of limit?
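Roughly, the current layout vs. the suggested one (file names assumed from the module paths above):

middleware/
    limit/
        mod.rs
        concurrency.rs        // ConcurrencyLimit service lives here

middleware/
    concurrency_limit.rs      // or concurrency_limit/*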

@aajtodd (Contributor, Author) Sep 30, 2024:

We could. I sort of just followed tower's existing module structure (it has both a concurrency-limit and a rate-limit middleware, so both live under the limit module). I don't have a strong opinion on this one.

Comment on lines 35 to 41
#[derive(Debug)]
enum State {
/// Permit acquired and ready
Ready(OwnedWorkPermit),
/// Waiting on a permit (or haven't attempted to acquire one)
Pending(Option<AcquirePermitFuture>),
}
@waahm7 (Contributor) Sep 30, 2024:

Suggested change
#[derive(Debug)]
enum State {
/// Permit acquired and ready
Ready(OwnedWorkPermit),
/// Waiting on a permit (or haven't attempted to acquire one)
Pending(Option<AcquirePermitFuture>),
}
#[derive(Debug)]
enum State {
/// Permit acquired and ready
Ready(OwnedWorkPermit),
/// Waiting on a permit
PendingAcquire(AcquirePermitFuture),
/// Haven't attempted to acquire a permit yet
PendingStart,
}

@aajtodd (Author):

I'm not sure this third state improves the state machine much (to me, anyway). When we extract the permit it has to go back to some kind of pending. Setting it to PendingStart seems odd to me, though, since we will already have acquired (or attempted to acquire) a permit before. Maybe it's just a naming thing 🤷‍♂️.

Curious what others think.

Contributor:

Interesting thought. I suppose, though, that PendingStart is implied by Pending(None) in the original code, so maybe it's OK to keep what we have?

@waahm7 (Contributor) Sep 30, 2024:

Yeah, it's fine; it just felt odd to me that we are using an Option inside an enum to represent a third state. I think it's more obvious/cleaner to have three explicit states than two states where one of them carries two sub-states.

> When we extract the permit it has to go back to some kind of pending. Setting it to PendingStart seems odd to me though...

We can introduce a 4th Done state, or better naming for the PendingStart state. Generally, I prefer an explicit state machine over a concise one where you have to read the code to understand all the different branches. However, this state machine is small enough that this is probably fine.

Contributor:

trivial: agree with waqar that 3 explicit states > 2 states (where one state has 2 substates)

Contributor:

Aaron's original way is how it's done in Tokio, so I'm fine with this now.

@aajtodd (Author):

blerg, I already moved it to 3 states. I'll leave it that way for now, but yeah, this isn't all that uncommon in Rust since pattern matching allows you to separate them anyway.

e.g.

match state {
    State::Ready(permit) => ...,
    State::Pending(Some(fut)) => ...,
    State::Pending(None) => ...,
}

Comment on lines +30 to +32
// TODO - add some notion of "work type" and/or "work estimate" to permit acquisition to allow
// for scheduler to make choices on what work gets prioritized
@waahm7 (Contributor) Sep 30, 2024:

Without this, we can probably hang forever. If there are 100 parts and we schedule parts 1-10, parts 11-100 are left waiting for the semaphore to release. Is it possible to schedule parts out of order, like 33-43, so that we're stuck waiting for the body to make space, which will never happen?

The Tokio semaphore is fair, so it will respect the acquisition order.
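A small self-contained check of that claim (not from the PR; assumes tokio with the full feature set):

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// tokio's Semaphore queues waiters FIFO, so permits are granted in the order
// acquire was called: "parts scheduled out of order" can't come from the
// semaphore itself.
#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(1));
    let blocker = sem.clone().acquire_owned().await.unwrap();

    let mut handles = Vec::new();
    for part in 1..=5 {
        let sem = sem.clone();
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire_owned().await.unwrap();
            println!("part {part} acquired"); // prints parts 1 through 5, in order
        }));
        // Let each task enqueue its acquire before spawning the next.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    drop(blocker); // release the permit; waiters proceed FIFO
    for h in handles {
        h.await.unwrap();
    }
}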

@@ -21,7 +21,8 @@
rust_2018_idioms
)]

-pub(crate) const DEFAULT_CONCURRENCY: usize = 8;
+/// Default in-flight concurrency
+pub(crate) const DEFAULT_CONCURRENCY: usize = 96;
Contributor:

> Assuming 90 MB/request throughput for S3...

Yeah, 90MiB/s seems like a really good assumption for now 👍.

If we assume 90MiB/s:

  • For a c5n.18xlarge with 100Gb/s: 132 is theoretically ideal.
  • For a c7gn.16xlarge with 200Gb/s: 265 is theoretically ideal.

That lines up with my HTTP benchmarks from a few weeks back: testing on a 200Gb/s machine, performance peaked at a concurrency of 275 (I was testing concurrency in increments of 25).
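For reference, a back-of-the-envelope reproduction of those figures (assumes decimal gigabits and 90 MiB/s per request):

fn ideal_concurrency(target_gbps: f64) -> f64 {
    let bytes_per_sec = target_gbps * 1e9 / 8.0;
    bytes_per_sec / (90.0 * 1024.0 * 1024.0) // requests needed at 90 MiB/s each
}

fn main() {
    println!("{:.0}", ideal_concurrency(100.0)); // ~132 (c5n.18xlarge)
    println!("{:.0}", ideal_concurrency(200.0)); // ~265 (c7gn.16xlarge)
}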

@aajtodd (Author):

Yeah, I said 128 in the PR description but didn't update the code before posting the PR 🤦‍♂️.

We should talk about this one though.


State::Pending(Some(permit_fut)) => {
let permit_fut = pin!(permit_fut);
let permit =
ready!(permit_fut.poll(cx)).expect("permit acquisition never fails");
Contributor:

trivial: we expect this to fail in the future, right? Maybe add "currently" or "at this time" to this comment? Or a TODO or something.

@aajtodd (Author):

I don't know if we ever expect it to fail; the only reason it would fail for a semaphore is if the semaphore is closed, which we never do. That's what expect() is for, though: if you ever hit it, it means your assumptions were/are wrong or you broke them somewhere.

Contributor:

Then why not have it return a Future with OwnedWorkPermit inside, instead of Result<OwnedWorkPermit>?

These are just internal types, right? If we ever need it to fail in the future, we change it back to Result, and the compiler shows us where we need to add error-handling.

@aajtodd (Author):

We could, but we'd just be moving the expect(), since the underlying semaphore we're (currently) using is fallible anyway.

@ysaito1001 (Contributor) left a comment:

Besides responding to feedback by Waqar and Michael, LGTM from my end.

@@ -111,6 +113,7 @@ fn handle_discovery_chunk(
})
.map_err(error::discovery_failed);

drop(permit);
Contributor:

Ah, this is subtle. This task releases the permit so others can move on, before sending the chunk to the sequencer 👍

Contributor:

oh yeah, that is subtle. Add a comment explaining this.
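Something like this (hypothetical wording) would capture it:

// Release the permit before sending the chunk to the sequencer: the send can
// block on channel capacity, and holding the permit across it would keep
// other scheduled work from starting.
drop(permit);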

Contributor:

Do we have similar logic for NON-discovery chunks? Dropping the semaphore permit before waiting on the send() to the sequencer?

@aajtodd (Author):

Yes, but it's implicit / by construction: it's dropped when the middleware's response future drops the permit (which happens before the response from the service is even returned to the caller). See the sketch below.
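A sketch of that construction (type and field names assumed, not the PR's exact code; uses the pin_project_lite crate):

use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use pin_project_lite::pin_project;

pin_project! {
    // The middleware's response future owns the permit alongside the inner
    // service future.
    struct ResponseFuture<F, P> {
        #[pin]
        inner: F,
        permit: Option<P>,
    }
}

impl<F: Future, P> Future for ResponseFuture<F, P> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let output = ready!(this.inner.poll(cx));
        // Drop the permit as soon as the inner service completes, before the
        // response is handed back to the caller.
        this.permit.take();
        Poll::Ready(output)
    }
}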

Comment on lines +104 to +101
#[cfg(test)]
mod tests {
Contributor:

Nice!


(Outdated comment on aws-s3-transfer-manager/src/runtime/scheduler.rs — resolved.)


@graebm (Contributor) left a comment:

Looks good overall. Feel free to submit once the feedback is ironed out.

@aajtodd merged commit 026a55e into main on Oct 1, 2024
15 checks passed
@aajtodd deleted the atodd/globalize branch on Oct 1, 2024, 18:51