Implement Hedging (Retry Slow Parts) for Upload #57
Conversation
# Conflicts:
#   aws-s3-transfer-manager/Cargo.toml
#   aws-s3-transfer-manager/src/operation/upload.rs
#   aws-s3-transfer-manager/src/operation/upload/service.rs

# Conflicts:
#   aws-s3-transfer-manager/Cargo.toml
#   aws-s3-transfer-manager/src/middleware.rs
#   aws-s3-transfer-manager/src/operation/upload/service.rs
pub(crate) fn new(policy: P) -> Self {
    Self {
        policy,
        latency_percentile: 95.0,
Probably define these as constants, and maybe add some comments on how/why they were chosen.
Thanks, I have moved it to constants and added docs.
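As a sketch of what that change looks like (the constant name, value, and doc wording here are illustrative, not the actual code merged in the PR):

```rust
/// Latency percentile used to classify an in-flight upload part as "slow".
/// 95.0 means only the slowest ~5% of parts become hedging candidates,
/// keeping the extra request volume small. (Illustrative value only.)
const DEFAULT_LATENCY_PERCENTILE: f64 = 95.0;

fn main() {
    // A part whose latency exceeds the tracked p95 would be hedged.
    println!("hedging parts slower than p{DEFAULT_LATENCY_PERCENTILE}");
}
```

Hoisting the magic number into a documented constant makes the tuning rationale visible at the definition site instead of buried in a constructor.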
let svc = ServiceBuilder::new()
    .layer(concurrency_limit)
    // FIXME - This setting will need to be globalized.
    .buffer(ctx.handle.num_workers())
Question: Given that this doesn't actually place requests on the wire yet, is there a reasonable constant that would make sense here?
The buffer layer docs mention that:

/// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
/// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time. As a result, it's advisable to set
/// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
/// If you do not, all the slots in the buffer may be held up by futures that have just called
/// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
/// requests.
So I thought that the max number of requests was a good value here.
svc.map_err(|err| {
    let e = err
        .downcast::<error::Error>()
        .unwrap_or_else(|err| Box::new(error::Error::new(error::ErrorKind::RuntimeError, err)));
Question: Do you need to box this?
I am not sure how to get rid of the box here. downcast returns a Box<Error>, and the Box::into_inner API is unstable, so I had to box it to get the same type in both branches.
#[derive(Debug, Clone)]
pub(crate) struct UploadPartPolicy;

impl Policy<UploadPartRequest> for UploadPartPolicy {
trivial: You could probably just make this the default policy, since downloads would look the same, and move it into the hedge module directly. If we ever need to differentiate we could, but then you could just do something like:
let svc = ServiceBuilder::new()
...
.layer(hedge::Builder::default().into_layer())
Thanks, updated it to the default policy.
ServiceBuilder::new().layer(concurrency_limit).service(svc)

let svc = ServiceBuilder::new()
    .layer(concurrency_limit)
The extra "hedging" request should count extra towards the concurrency limit. If this is hard to do, let's do it in a followup task.
Even better would be if hedged requests were lower priority and only got a ticket when there's no other work to do, since they really only help us once we're through a workload and all that's left is the slow requests. Otherwise they're just stealing capacity from another request that would be more useful.
I have looked into this, and it seems like the hedged requests can bypass the concurrency_limit layer; reorganizing the layers had no effect. I have added a TODO to fix this for now, since I want to get this merged and see if there is any difference in benchmarks.
Another interesting thing I noticed is that we hedge very few requests: for a 30GB upload with 125 concurrency, only about 5-15 requests were hedged. We can revisit the numbers once we have more tracing/metrics in place.
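For intuition about what the hedge middleware is doing, here is a minimal std-only simulation of the idea (the 500 ms primary latency, 10 ms retry latency, and the p95 cutoff passed in are all invented for this example; the actual implementation uses tower's hedging layer, not threads):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical "upload part" request: sleeps for `latency_ms`, then
// reports which attempt it was.
fn send_request(attempt: u32, latency_ms: u64) -> u32 {
    thread::sleep(Duration::from_millis(latency_ms));
    attempt
}

// Issue a request; if it hasn't answered within the observed p95 latency,
// fire a duplicate (the hedge) and take whichever attempt finishes first.
fn hedged_request(p95_latency: Duration) -> u32 {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    // Primary attempt: simulated as a slow outlier (500 ms).
    thread::spawn(move || {
        let _ = tx1.send(send_request(1, 500));
    });
    match rx.recv_timeout(p95_latency) {
        Ok(attempt) => attempt, // primary was fast enough; no hedge needed
        Err(_) => {
            // Past the p95 cutoff: launch the hedged duplicate (10 ms).
            thread::spawn(move || {
                let _ = tx.send(send_request(2, 10));
            });
            rx.recv().unwrap() // first responder wins
        }
    }
}

fn main() {
    let winner = hedged_request(Duration::from_millis(50));
    println!("winning attempt: {}", winner);
}
```

This also shows why so few requests get hedged in practice: only parts whose latency exceeds the high-percentile cutoff ever trigger the duplicate.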
Co-authored-by: Michael Graeb <[email protected]>
Description of changes:
Here are the benchmarks with and without hedging for Upload:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.