-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DEX solvers rate limiter #2071
DEX solvers rate limiter #2071
Changes from 13 commits
9a6db4d
4c9c8e9
cf6e0dc
432d0e9
f688b18
58764c4
840f59f
33d1a1e
c7c9ab9
1810f58
92a04e8
71e0290
4ad7c4e
543bdb8
e9420e9
a9c5365
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,7 +174,7 @@ impl RateLimiter { | |
} | ||
} | ||
|
||
#[derive(Error, Debug, Clone, Default)] | ||
#[derive(Error, Debug, Clone, Default, PartialEq)] | ||
pub enum RateLimiterError { | ||
#[default] | ||
#[error("rate limited")] | ||
|
@@ -221,6 +221,30 @@ impl RateLimiter { | |
|
||
Ok(result) | ||
} | ||
|
||
pub async fn execute_with_back_off<T>( | ||
&self, | ||
task: impl Future<Output = T>, | ||
requires_back_off: impl Fn(&T) -> bool, | ||
) -> Result<T, RateLimiterError> { | ||
if let Some(back_off_duration) = self.get_back_off_duration_if_limited() { | ||
tokio::time::sleep(back_off_duration).await; | ||
} | ||
|
||
self.execute(task, requires_back_off).await | ||
} | ||
|
||
fn get_back_off_duration_if_limited(&self) -> Option<Duration> { | ||
let strategy = self.strategy.lock().unwrap(); | ||
let now = Instant::now(); | ||
|
||
if strategy.drop_requests_until > now { | ||
let back_off_duration = strategy.drop_requests_until - now; | ||
Some(back_off_duration) | ||
} else { | ||
None | ||
} | ||
} | ||
} | ||
|
||
/// Shared module with common back-off checks. | ||
|
@@ -236,7 +260,7 @@ pub mod back_off { | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use {super::*, futures::FutureExt, tokio::time::sleep}; | ||
use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep}; | ||
|
||
#[test] | ||
fn current_back_off_does_not_panic() { | ||
|
@@ -317,4 +341,60 @@ mod tests { | |
rate_limiter.strategy().get_current_back_off() | ||
); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_execute_with_no_back_off() { | ||
let timeout = Duration::from_secs(30); | ||
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); | ||
let original_drop_until = strategy.drop_requests_until; | ||
let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string()); | ||
|
||
let result = rate_limiter | ||
.execute_with_back_off(async { 1 }, |_| false) | ||
.await | ||
.unwrap(); | ||
|
||
assert_eq!(result, 1); | ||
{ | ||
let current_strategy = rate_limiter.strategy.lock().unwrap(); | ||
assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to assert that we didn't have to wait for the allowed backoff time, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replaced |
||
} | ||
|
||
let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap(); | ||
assert_eq!(result, 1); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_execute_with_back_off() { | ||
let timeout = Duration::from_secs(3); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To test timing sensitive stuff it's usually sufficient to use timeouts in the tens of milliseconds to not delay the test suite unnecessarily. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reduced the timeout |
||
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); | ||
let original_drop_until = strategy.drop_requests_until; | ||
let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string()); | ||
|
||
let result = rate_limiter | ||
.execute_with_back_off(async { 1 }, |_| true) | ||
.await | ||
.unwrap(); | ||
|
||
assert_eq!(result, 1); | ||
let drop_until = { | ||
let current_strategy = rate_limiter.strategy.lock().unwrap(); | ||
let drop_until = current_strategy.drop_requests_until; | ||
assert!(drop_until >= original_drop_until.add(timeout)); | ||
drop_until | ||
}; | ||
|
||
let result = rate_limiter.execute(async { 1 }, |_| false).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Would be nice to have some comments what each individual block is testing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
assert_eq!(result, Err(RateLimiterError::RateLimited)); | ||
{ | ||
let current_strategy = rate_limiter.strategy.lock().unwrap(); | ||
assert_eq!(current_strategy.drop_requests_until, drop_until); | ||
} | ||
|
||
let result = rate_limiter | ||
.execute_with_back_off(async { 1 }, |_| false) | ||
.await | ||
.unwrap(); | ||
assert_eq!(result, 1); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use { | ||
anyhow::Result, | ||
shared::rate_limiter::{ | ||
RateLimiter as SharedRateLimiter, | ||
RateLimiterError as SharedRateLimiterError, | ||
RateLimitingStrategy as SharedRateLimitingStrategy, | ||
}, | ||
std::{future::Future, time::Duration}, | ||
thiserror::Error, | ||
}; | ||
|
||
pub struct RateLimiter { | ||
inner: SharedRateLimiter, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I would prefer to have here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That actually There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to export |
||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct RateLimitingStrategy { | ||
inner: SharedRateLimitingStrategy, | ||
} | ||
|
||
impl RateLimitingStrategy { | ||
pub fn try_new( | ||
back_off_growth_factor: f64, | ||
min_back_off: Duration, | ||
max_back_off: Duration, | ||
) -> Result<Self> { | ||
SharedRateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off) | ||
.map(|shared| Self { inner: shared }) | ||
} | ||
} | ||
|
||
#[derive(Error, Debug, Clone, Default)] | ||
pub enum RateLimiterError { | ||
#[default] | ||
#[error("rate limited")] | ||
RateLimited, | ||
} | ||
|
||
impl RateLimiter { | ||
pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { | ||
Self { | ||
inner: SharedRateLimiter::from_strategy(strategy.inner, name), | ||
} | ||
} | ||
|
||
pub async fn execute_with_back_off<T>( | ||
&self, | ||
task: impl Future<Output = T>, | ||
requires_back_off: impl Fn(&T) -> bool, | ||
) -> Result<T, RateLimiterError> { | ||
self.inner | ||
.execute_with_back_off(task, requires_back_off) | ||
.await | ||
.map_err(|err| match err { | ||
SharedRateLimiterError::RateLimited => RateLimiterError::RateLimited, | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,15 @@ | |
|
||
use { | ||
crate::{ | ||
domain, | ||
domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills}, | ||
boundary::rate_limiter::{RateLimiter, RateLimiterError}, | ||
domain::{ | ||
self, | ||
auction, | ||
dex::{self, slippage}, | ||
order::{self, Order}, | ||
solution, | ||
solver::dex::fills::Fills, | ||
}, | ||
infra, | ||
}, | ||
futures::{future, stream, FutureExt, StreamExt}, | ||
|
@@ -33,10 +40,17 @@ pub struct Dex { | |
|
||
/// Parameters used to calculate the revert risk of a solution. | ||
risk: domain::Risk, | ||
|
||
/// Handles 429 Too Many Requests error with a retry mechanism | ||
rate_limiter: RateLimiter, | ||
} | ||
|
||
impl Dex { | ||
pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self { | ||
pub fn new( | ||
dex: infra::dex::Dex, | ||
config: infra::config::dex::Config, | ||
squadgazzz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
rate_limiter: RateLimiter, | ||
) -> Self { | ||
Self { | ||
dex, | ||
simulator: infra::dex::Simulator::new( | ||
|
@@ -48,6 +62,7 @@ impl Dex { | |
concurrent_requests: config.concurrent_requests, | ||
fills: Fills::new(config.smallest_partial_fill), | ||
risk: config.risk, | ||
rate_limiter, | ||
} | ||
} | ||
|
||
|
@@ -86,42 +101,65 @@ impl Dex { | |
.filter_map(future::ready) | ||
} | ||
|
||
async fn try_solve( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't retry until we get a non-rate-limited result we could still skip orders. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will create a separate issue to consider using a retry mechanism to not overload this PR |
||
&self, | ||
order: &Order, | ||
dex_order: &dex::Order, | ||
tokens: &auction::Tokens, | ||
gas_price: auction::GasPrice, | ||
) -> Option<dex::Swap> { | ||
self.rate_limiter | ||
.execute_with_back_off( | ||
async { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: To avoid unnecessary indentation you can first create the future and then pass it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extracted |
||
let slippage = self.slippage.relative(&dex_order.amount(), tokens); | ||
self.dex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would probably also store the result in a variable to avoid having the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That actually was the initial approach that produced too much redundant code with returning the same |
||
.swap(dex_order, &slippage, tokens, gas_price) | ||
.await | ||
.map_err(|err| { | ||
match &err { | ||
err @ infra::dex::Error::NotFound => { | ||
if order.partially_fillable { | ||
// Only adjust the amount to try next if we are sure the API | ||
// worked | ||
// correctly yet still wasn't able to provide a | ||
// swap. | ||
self.fills.reduce_next_try(order.uid); | ||
} else { | ||
tracing::debug!(?err, "skipping order"); | ||
} | ||
} | ||
err @ infra::dex::Error::OrderNotSupported => { | ||
tracing::debug!(?err, "skipping order") | ||
} | ||
err @ infra::dex::Error::RateLimited => { | ||
tracing::debug!(?err, "encountered rate limit") | ||
} | ||
infra::dex::Error::Other(err) => { | ||
tracing::warn!(?err, "failed to get swap") | ||
} | ||
} | ||
err | ||
}) | ||
}, | ||
|result| matches!(result, Err(infra::dex::Error::RateLimited)), | ||
) | ||
.await | ||
.map_err(|err| match err { | ||
RateLimiterError::RateLimited => infra::dex::Error::RateLimited, | ||
}) | ||
.and_then(|result| result) | ||
.ok() | ||
} | ||
|
||
async fn solve_order( | ||
&self, | ||
order: order::UserOrder<'_>, | ||
tokens: &auction::Tokens, | ||
gas_price: auction::GasPrice, | ||
) -> Option<solution::Solution> { | ||
let order = order.get(); | ||
let swap = { | ||
let order = self.fills.dex_order(order, tokens)?; | ||
let slippage = self.slippage.relative(&order.amount(), tokens); | ||
self.dex.swap(&order, &slippage, tokens, gas_price).await | ||
}; | ||
|
||
let swap = match swap { | ||
Ok(swap) => swap, | ||
Err(err @ infra::dex::Error::NotFound) => { | ||
if order.partially_fillable { | ||
// Only adjust the amount to try next if we are sure the API worked correctly | ||
// yet still wasn't able to provide a swap. | ||
self.fills.reduce_next_try(order.uid); | ||
} else { | ||
tracing::debug!(?err, "skipping order"); | ||
} | ||
return None; | ||
} | ||
Err(err @ infra::dex::Error::OrderNotSupported) => { | ||
tracing::debug!(?err, "skipping order"); | ||
return None; | ||
} | ||
Err(infra::dex::Error::Other(err)) => { | ||
tracing::warn!(?err, "failed to get swap"); | ||
return None; | ||
} | ||
}; | ||
|
||
let uid = order.uid; | ||
let dex_order = self.fills.dex_order(order, tokens)?; | ||
let swap = self.try_solve(order, &dex_order, tokens, gas_price).await?; | ||
let sell = tokens.reference_price(&order.sell.token); | ||
let Some(solution) = swap | ||
.into_solution(order.clone(), gas_price, sell, &self.risk, &self.simulator) | ||
|
@@ -133,7 +171,7 @@ impl Dex { | |
|
||
tracing::debug!("solved"); | ||
// Maybe some liquidity appeared that enables a bigger fill. | ||
self.fills.increase_next_try(uid); | ||
self.fills.increase_next_try(order.uid); | ||
|
||
Some(solution.with_buffers_internalizations(tokens)) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this required somewhere or just for completeness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not required anymore for the latest code revision.