Skip to content

Commit

Permalink
DEX solvers rate limiter (#2071)
Browse files Browse the repository at this point in the history
# Description
Properly handles `429 Too Many Requests` response from DEX solvers by
utilizing `shared::RateLimiter` to back off further requests with a
cool-down period.

# Changes

- A new`RateLimited` variant on solvers::infra::dex::Error
- Parse each individual dex solver's HTTP sub-error with status 429 into
this error type
- Used `shared::rate_limiter::RateLimiter` though `solvers::boundary`
- A new config for solvers with `max_retries` for the `solvers` crate

## How to test
TBD: is it possible to simulate it in staging?

## Related Issues
Fixes #2068
  • Loading branch information
squadgazzz authored Nov 30, 2023
1 parent a72c4c3 commit 575c679
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 43 deletions.
2 changes: 2 additions & 0 deletions crates/shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use std::{
time::{Duration, Instant},
};

pub use rate_limiter::{RateLimiter, RateLimiterError, RateLimitingStrategy};

/// Run a future and callback with the time the future took. The call back can
/// for example log the time.
pub async fn measure_time<T>(future: impl Future<Output = T>, timer: impl FnOnce(Duration)) -> T {
Expand Down
88 changes: 86 additions & 2 deletions crates/shared/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl RateLimiter {
}
}

#[derive(Error, Debug, Clone, Default)]
#[derive(Error, Debug, Clone, Default, PartialEq)]
pub enum RateLimiterError {
#[default]
#[error("rate limited")]
Expand Down Expand Up @@ -221,6 +221,30 @@ impl RateLimiter {

Ok(result)
}

pub async fn execute_with_back_off<T>(
&self,
task: impl Future<Output = T>,
requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
if let Some(back_off_duration) = self.get_back_off_duration_if_limited() {
tokio::time::sleep(back_off_duration).await;
}

self.execute(task, requires_back_off).await
}

fn get_back_off_duration_if_limited(&self) -> Option<Duration> {
let strategy = self.strategy.lock().unwrap();
let now = Instant::now();

if strategy.drop_requests_until > now {
let back_off_duration = strategy.drop_requests_until - now;
Some(back_off_duration)
} else {
None
}
}
}

/// Shared module with common back-off checks.
Expand All @@ -236,7 +260,7 @@ pub mod back_off {

#[cfg(test)]
mod tests {
use {super::*, futures::FutureExt, tokio::time::sleep};
use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep};

#[test]
fn current_back_off_does_not_panic() {
Expand Down Expand Up @@ -317,4 +341,64 @@ mod tests {
rate_limiter.strategy().get_current_back_off()
);
}

#[tokio::test]
async fn test_execute_with_no_back_off() {
let timeout = Duration::from_secs(30);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string());

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.now_or_never()
.unwrap()
.unwrap();

assert_eq!(result, 1);
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout));
}

let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap();
assert_eq!(result, 1);
}

#[tokio::test]
async fn test_execute_with_back_off() {
let timeout = Duration::from_millis(50);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string());

// start the back off
let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| true)
.await
.unwrap();

assert_eq!(result, 1);
let drop_until = {
let current_strategy = rate_limiter.strategy.lock().unwrap();
let drop_until = current_strategy.drop_requests_until;
assert!(drop_until >= original_drop_until.add(timeout));
drop_until
};

// back off is not over, expecting a RateLimiterError
let result = rate_limiter.execute(async { 1 }, |_| false).await;
assert_eq!(result, Err(RateLimiterError::RateLimited));
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert_eq!(current_strategy.drop_requests_until, drop_until);
}

// back off is over
let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();
assert_eq!(result, 1);
}
}
1 change: 1 addition & 0 deletions crates/solvers/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pub mod baseline;
pub mod legacy;
pub mod liquidity;
pub mod naive;
pub mod rate_limiter;

pub type Result<T> = anyhow::Result<T>;
53 changes: 53 additions & 0 deletions crates/solvers/src/boundary/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use {
anyhow::Result,
std::{future::Future, time::Duration},
thiserror::Error,
};

pub struct RateLimiter {
inner: shared::RateLimiter,
}

#[derive(Debug, Clone)]
pub struct RateLimitingStrategy {
inner: shared::RateLimitingStrategy,
}

impl RateLimitingStrategy {
pub fn try_new(
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
) -> Result<Self> {
shared::RateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off)
.map(|shared| Self { inner: shared })
}
}

#[derive(Error, Debug, Clone, Default)]
pub enum RateLimiterError {
#[default]
#[error("rate limited")]
RateLimited,
}

impl RateLimiter {
pub fn new(strategy: RateLimitingStrategy, name: String) -> Self {
Self {
inner: shared::RateLimiter::from_strategy(strategy.inner, name),
}
}

pub async fn execute_with_back_off<T>(
&self,
task: impl Future<Output = T>,
requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
self.inner
.execute_with_back_off(task, requires_back_off)
.await
.map_err(|err| match err {
shared::RateLimiterError::RateLimited => RateLimiterError::RateLimited,
})
}
}
100 changes: 68 additions & 32 deletions crates/solvers/src/domain/solver/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@
use {
crate::{
domain,
domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills},
boundary::rate_limiter::{RateLimiter, RateLimiterError},
domain::{
self,
auction,
dex::{self, slippage},
order::{self, Order},
solution,
solver::dex::fills::Fills,
},
infra,
},
futures::{future, stream, FutureExt, StreamExt},
Expand Down Expand Up @@ -33,10 +40,14 @@ pub struct Dex {

/// Parameters used to calculate the revert risk of a solution.
risk: domain::Risk,

/// Handles 429 Too Many Requests error with a retry mechanism
rate_limiter: RateLimiter,
}

impl Dex {
pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self {
let rate_limiter = RateLimiter::new(config.rate_limiting_strategy, "dex_api".to_string());
Self {
dex,
simulator: infra::dex::Simulator::new(
Expand All @@ -48,6 +59,7 @@ impl Dex {
concurrent_requests: config.concurrent_requests,
fills: Fills::new(config.smallest_partial_fill),
risk: config.risk,
rate_limiter,
}
}

Expand Down Expand Up @@ -86,42 +98,66 @@ impl Dex {
.filter_map(future::ready)
}

async fn solve_order(
async fn try_solve(
&self,
order: order::UserOrder<'_>,
order: &Order,
dex_order: &dex::Order,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Option<solution::Solution> {
let order = order.get();
let swap = {
let order = self.fills.dex_order(order, tokens)?;
let slippage = self.slippage.relative(&order.amount(), tokens);
self.dex.swap(&order, &slippage, tokens, gas_price).await
};

let swap = match swap {
Ok(swap) => swap,
Err(err @ infra::dex::Error::NotFound) => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API worked correctly
// yet still wasn't able to provide a swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
) -> Option<dex::Swap> {
let dex_err_handler = |err: infra::dex::Error| {
match &err {
err @ infra::dex::Error::NotFound => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API
// worked
// correctly yet still wasn't able to provide a
// swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
}
}
err @ infra::dex::Error::OrderNotSupported => {
tracing::debug!(?err, "skipping order")
}
err @ infra::dex::Error::RateLimited => {
tracing::debug!(?err, "encountered rate limit")
}
infra::dex::Error::Other(err) => {
tracing::warn!(?err, "failed to get swap")
}
return None;
}
Err(err @ infra::dex::Error::OrderNotSupported) => {
tracing::debug!(?err, "skipping order");
return None;
}
Err(infra::dex::Error::Other(err)) => {
tracing::warn!(?err, "failed to get swap");
return None;
}
err
};
let swap = async {
let slippage = self.slippage.relative(&dex_order.amount(), tokens);
self.dex
.swap(dex_order, &slippage, tokens, gas_price)
.await
.map_err(dex_err_handler)
};
self.rate_limiter
.execute_with_back_off(swap, |result| {
matches!(result, Err(infra::dex::Error::RateLimited))
})
.await
.map_err(|err| match err {
RateLimiterError::RateLimited => infra::dex::Error::RateLimited,
})
.and_then(|result| result)
.ok()
}

let uid = order.uid;
async fn solve_order(
&self,
order: order::UserOrder<'_>,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Option<solution::Solution> {
let order = order.get();
let dex_order = self.fills.dex_order(order, tokens)?;
let swap = self.try_solve(order, &dex_order, tokens, gas_price).await?;
let sell = tokens.reference_price(&order.sell.token);
let Some(solution) = swap
.into_solution(order.clone(), gas_price, sell, &self.risk, &self.simulator)
Expand All @@ -133,7 +169,7 @@ impl Dex {

tracing::debug!("solved");
// Maybe some liquidity appeared that enables a bigger fill.
self.fills.increase_next_try(uid);
self.fills.increase_next_try(order.uid);

Some(solution.with_buffers_internalizations(tokens))
}
Expand Down
33 changes: 32 additions & 1 deletion crates/solvers/src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
use {
crate::{
boundary::rate_limiter::RateLimitingStrategy,
domain::{dex::slippage, eth, Risk},
infra::{blockchain, config::unwrap_or_log, contracts},
util::serialize,
},
bigdecimal::BigDecimal,
serde::{de::DeserializeOwned, Deserialize},
serde_with::serde_as,
std::{fmt::Debug, num::NonZeroUsize, path::Path},
std::{fmt::Debug, num::NonZeroUsize, path::Path, time::Duration},
tokio::fs,
};

Expand Down Expand Up @@ -48,6 +49,18 @@ struct Config {
/// (gas_amount_factor, gas_price_factor, nmb_orders_factor, intercept)
risk_parameters: (f64, f64, f64, f64),

/// Back-off growth factor for rate limiting.
#[serde(default = "default_back_off_growth_factor")]
back_off_growth_factor: f64,

/// Minimum back-off time in seconds for rate limiting.
#[serde(default = "default_min_back_off")]
min_back_off: u64,

/// Maximum back-off time in seconds for rate limiting.
#[serde(default = "default_max_back_off")]
max_back_off: u64,

/// Settings specific to the wrapped dex API.
dex: toml::Value,
}
Expand All @@ -64,6 +77,18 @@ fn default_smallest_partial_fill() -> eth::U256 {
eth::U256::exp10(16) // 0.01 ETH
}

fn default_back_off_growth_factor() -> f64 {
1.0
}

fn default_min_back_off() -> u64 {
Default::default()
}

fn default_max_back_off() -> u64 {
Default::default()
}

/// Loads the base solver configuration from a TOML file.
///
/// # Panics
Expand Down Expand Up @@ -117,6 +142,12 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
nmb_orders_factor: config.risk_parameters.2,
intercept: config.risk_parameters.3,
},
rate_limiting_strategy: RateLimitingStrategy::try_new(
config.back_off_growth_factor,
Duration::from_secs(config.min_back_off),
Duration::from_secs(config.max_back_off),
)
.unwrap(),
};
(config, dex)
}
Loading

0 comments on commit 575c679

Please sign in to comment.