From 575c679f782a99707e6eacd5e098378f08f5dfe0 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Nov 2023 12:55:35 +0000 Subject: [PATCH] DEX solvers rate limiter (#2071) # Description Properly handles `429 Too Many Requests` response from DEX solvers by utilizing `shared::RateLimiter` to back off further requests with a cool-down period. # Changes - A new`RateLimited` variant on solvers::infra::dex::Error - Parse each individual dex solver's HTTP sub-error with status 429 into this error type - Used `shared::rate_limiter::RateLimiter` though `solvers::boundary` - A new config for solvers with `max_retries` for the `solvers` crate ## How to test TBD: is it possible to simulate it in staging? ## Related Issues Fixes #2068 --- crates/shared/src/lib.rs | 2 + crates/shared/src/rate_limiter.rs | 88 +++++++++++++++- crates/solvers/src/boundary/mod.rs | 1 + crates/solvers/src/boundary/rate_limiter.rs | 53 ++++++++++ crates/solvers/src/domain/solver/dex/mod.rs | 100 +++++++++++++------ crates/solvers/src/infra/config/dex/file.rs | 33 +++++- crates/solvers/src/infra/config/dex/mod.rs | 8 +- crates/solvers/src/infra/dex/balancer/mod.rs | 11 +- crates/solvers/src/infra/dex/mod.rs | 5 + crates/solvers/src/infra/dex/oneinch/mod.rs | 9 +- crates/solvers/src/infra/dex/paraswap/mod.rs | 9 +- crates/solvers/src/infra/dex/zeroex/mod.rs | 3 + crates/solvers/src/run.rs | 8 +- 13 files changed, 287 insertions(+), 43 deletions(-) create mode 100644 crates/solvers/src/boundary/rate_limiter.rs diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index d6d96106cf..07d93bdaa8 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -55,6 +55,8 @@ use std::{ time::{Duration, Instant}, }; +pub use rate_limiter::{RateLimiter, RateLimiterError, RateLimitingStrategy}; + /// Run a future and callback with the time the future took. The call back can /// for example log the time. pub async fn measure_time(future: impl Future, timer: impl FnOnce(Duration)) -> T { diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index c280c475c4..703867e6fe 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -174,7 +174,7 @@ impl RateLimiter { } } -#[derive(Error, Debug, Clone, Default)] +#[derive(Error, Debug, Clone, Default, PartialEq)] pub enum RateLimiterError { #[default] #[error("rate limited")] @@ -221,6 +221,30 @@ impl RateLimiter { Ok(result) } + + pub async fn execute_with_back_off( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + if let Some(back_off_duration) = self.get_back_off_duration_if_limited() { + tokio::time::sleep(back_off_duration).await; + } + + self.execute(task, requires_back_off).await + } + + fn get_back_off_duration_if_limited(&self) -> Option { + let strategy = self.strategy.lock().unwrap(); + let now = Instant::now(); + + if strategy.drop_requests_until > now { + let back_off_duration = strategy.drop_requests_until - now; + Some(back_off_duration) + } else { + None + } + } } /// Shared module with common back-off checks. @@ -236,7 +260,7 @@ pub mod back_off { #[cfg(test)] mod tests { - use {super::*, futures::FutureExt, tokio::time::sleep}; + use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep}; #[test] fn current_back_off_does_not_panic() { @@ -317,4 +341,64 @@ mod tests { rate_limiter.strategy().get_current_back_off() ); } + + #[tokio::test] + async fn test_execute_with_no_back_off() { + let timeout = Duration::from_secs(30); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string()); + + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .now_or_never() + .unwrap() + .unwrap(); + + assert_eq!(result, 1); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout)); + } + + let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap(); + assert_eq!(result, 1); + } + + #[tokio::test] + async fn test_execute_with_back_off() { + let timeout = Duration::from_millis(50); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string()); + + // start the back off + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| true) + .await + .unwrap(); + + assert_eq!(result, 1); + let drop_until = { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + let drop_until = current_strategy.drop_requests_until; + assert!(drop_until >= original_drop_until.add(timeout)); + drop_until + }; + + // back off is not over, expecting a RateLimiterError + let result = rate_limiter.execute(async { 1 }, |_| false).await; + assert_eq!(result, Err(RateLimiterError::RateLimited)); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert_eq!(current_strategy.drop_requests_until, drop_until); + } + + // back off is over + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .await + .unwrap(); + assert_eq!(result, 1); + } } diff --git a/crates/solvers/src/boundary/mod.rs b/crates/solvers/src/boundary/mod.rs index 11315b5591..637ef43a1e 100644 --- a/crates/solvers/src/boundary/mod.rs +++ b/crates/solvers/src/boundary/mod.rs @@ -5,5 +5,6 @@ pub mod baseline; pub mod legacy; pub mod liquidity; pub mod naive; +pub mod rate_limiter; pub type Result = anyhow::Result; diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs new file mode 100644 index 0000000000..86ee66e2c1 --- /dev/null +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -0,0 +1,53 @@ +use { + anyhow::Result, + std::{future::Future, time::Duration}, + thiserror::Error, +}; + +pub struct RateLimiter { + inner: shared::RateLimiter, +} + +#[derive(Debug, Clone)] +pub struct RateLimitingStrategy { + inner: shared::RateLimitingStrategy, +} + +impl RateLimitingStrategy { + pub fn try_new( + back_off_growth_factor: f64, + min_back_off: Duration, + max_back_off: Duration, + ) -> Result { + shared::RateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off) + .map(|shared| Self { inner: shared }) + } +} + +#[derive(Error, Debug, Clone, Default)] +pub enum RateLimiterError { + #[default] + #[error("rate limited")] + RateLimited, +} + +impl RateLimiter { + pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { + Self { + inner: shared::RateLimiter::from_strategy(strategy.inner, name), + } + } + + pub async fn execute_with_back_off( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + self.inner + .execute_with_back_off(task, requires_back_off) + .await + .map_err(|err| match err { + shared::RateLimiterError::RateLimited => RateLimiterError::RateLimited, + }) + } +} diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index 65ceb69f69..b27f961337 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -3,8 +3,15 @@ use { crate::{ - domain, - domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills}, + boundary::rate_limiter::{RateLimiter, RateLimiterError}, + domain::{ + self, + auction, + dex::{self, slippage}, + order::{self, Order}, + solution, + solver::dex::fills::Fills, + }, infra, }, futures::{future, stream, FutureExt, StreamExt}, @@ -33,10 +40,14 @@ pub struct Dex { /// Parameters used to calculate the revert risk of a solution. risk: domain::Risk, + + /// Handles 429 Too Many Requests error with a retry mechanism + rate_limiter: RateLimiter, } impl Dex { pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self { + let rate_limiter = RateLimiter::new(config.rate_limiting_strategy, "dex_api".to_string()); Self { dex, simulator: infra::dex::Simulator::new( @@ -48,6 +59,7 @@ impl Dex { concurrent_requests: config.concurrent_requests, fills: Fills::new(config.smallest_partial_fill), risk: config.risk, + rate_limiter, } } @@ -86,42 +98,66 @@ impl Dex { .filter_map(future::ready) } - async fn solve_order( + async fn try_solve( &self, - order: order::UserOrder<'_>, + order: &Order, + dex_order: &dex::Order, tokens: &auction::Tokens, gas_price: auction::GasPrice, - ) -> Option { - let order = order.get(); - let swap = { - let order = self.fills.dex_order(order, tokens)?; - let slippage = self.slippage.relative(&order.amount(), tokens); - self.dex.swap(&order, &slippage, tokens, gas_price).await - }; - - let swap = match swap { - Ok(swap) => swap, - Err(err @ infra::dex::Error::NotFound) => { - if order.partially_fillable { - // Only adjust the amount to try next if we are sure the API worked correctly - // yet still wasn't able to provide a swap. - self.fills.reduce_next_try(order.uid); - } else { - tracing::debug!(?err, "skipping order"); + ) -> Option { + let dex_err_handler = |err: infra::dex::Error| { + match &err { + err @ infra::dex::Error::NotFound => { + if order.partially_fillable { + // Only adjust the amount to try next if we are sure the API + // worked + // correctly yet still wasn't able to provide a + // swap. + self.fills.reduce_next_try(order.uid); + } else { + tracing::debug!(?err, "skipping order"); + } + } + err @ infra::dex::Error::OrderNotSupported => { + tracing::debug!(?err, "skipping order") + } + err @ infra::dex::Error::RateLimited => { + tracing::debug!(?err, "encountered rate limit") + } + infra::dex::Error::Other(err) => { + tracing::warn!(?err, "failed to get swap") } - return None; - } - Err(err @ infra::dex::Error::OrderNotSupported) => { - tracing::debug!(?err, "skipping order"); - return None; - } - Err(infra::dex::Error::Other(err)) => { - tracing::warn!(?err, "failed to get swap"); - return None; } + err }; + let swap = async { + let slippage = self.slippage.relative(&dex_order.amount(), tokens); + self.dex + .swap(dex_order, &slippage, tokens, gas_price) + .await + .map_err(dex_err_handler) + }; + self.rate_limiter + .execute_with_back_off(swap, |result| { + matches!(result, Err(infra::dex::Error::RateLimited)) + }) + .await + .map_err(|err| match err { + RateLimiterError::RateLimited => infra::dex::Error::RateLimited, + }) + .and_then(|result| result) + .ok() + } - let uid = order.uid; + async fn solve_order( + &self, + order: order::UserOrder<'_>, + tokens: &auction::Tokens, + gas_price: auction::GasPrice, + ) -> Option { + let order = order.get(); + let dex_order = self.fills.dex_order(order, tokens)?; + let swap = self.try_solve(order, &dex_order, tokens, gas_price).await?; let sell = tokens.reference_price(&order.sell.token); let Some(solution) = swap .into_solution(order.clone(), gas_price, sell, &self.risk, &self.simulator) @@ -133,7 +169,7 @@ impl Dex { tracing::debug!("solved"); // Maybe some liquidity appeared that enables a bigger fill. - self.fills.increase_next_try(uid); + self.fills.increase_next_try(order.uid); Some(solution.with_buffers_internalizations(tokens)) } diff --git a/crates/solvers/src/infra/config/dex/file.rs b/crates/solvers/src/infra/config/dex/file.rs index 6b101e96b8..9f126618da 100644 --- a/crates/solvers/src/infra/config/dex/file.rs +++ b/crates/solvers/src/infra/config/dex/file.rs @@ -2,6 +2,7 @@ use { crate::{ + boundary::rate_limiter::RateLimitingStrategy, domain::{dex::slippage, eth, Risk}, infra::{blockchain, config::unwrap_or_log, contracts}, util::serialize, @@ -9,7 +10,7 @@ use { bigdecimal::BigDecimal, serde::{de::DeserializeOwned, Deserialize}, serde_with::serde_as, - std::{fmt::Debug, num::NonZeroUsize, path::Path}, + std::{fmt::Debug, num::NonZeroUsize, path::Path, time::Duration}, tokio::fs, }; @@ -48,6 +49,18 @@ struct Config { /// (gas_amount_factor, gas_price_factor, nmb_orders_factor, intercept) risk_parameters: (f64, f64, f64, f64), + /// Back-off growth factor for rate limiting. + #[serde(default = "default_back_off_growth_factor")] + back_off_growth_factor: f64, + + /// Minimum back-off time in seconds for rate limiting. + #[serde(default = "default_min_back_off")] + min_back_off: u64, + + /// Maximum back-off time in seconds for rate limiting. + #[serde(default = "default_max_back_off")] + max_back_off: u64, + /// Settings specific to the wrapped dex API. dex: toml::Value, } @@ -64,6 +77,18 @@ fn default_smallest_partial_fill() -> eth::U256 { eth::U256::exp10(16) // 0.01 ETH } +fn default_back_off_growth_factor() -> f64 { + 1.0 +} + +fn default_min_back_off() -> u64 { + Default::default() +} + +fn default_max_back_off() -> u64 { + Default::default() +} + /// Loads the base solver configuration from a TOML file. /// /// # Panics @@ -117,6 +142,12 @@ pub async fn load(path: &Path) -> (super::Config, T) { nmb_orders_factor: config.risk_parameters.2, intercept: config.risk_parameters.3, }, + rate_limiting_strategy: RateLimitingStrategy::try_new( + config.back_off_growth_factor, + Duration::from_secs(config.min_back_off), + Duration::from_secs(config.max_back_off), + ) + .unwrap(), }; (config, dex) } diff --git a/crates/solvers/src/infra/config/dex/mod.rs b/crates/solvers/src/infra/config/dex/mod.rs index ac8cb2354d..400e92a6b9 100644 --- a/crates/solvers/src/infra/config/dex/mod.rs +++ b/crates/solvers/src/infra/config/dex/mod.rs @@ -5,15 +5,20 @@ pub mod paraswap; pub mod zeroex; use { - crate::domain::{dex::slippage, eth, Risk}, + crate::{ + boundary::rate_limiter::RateLimitingStrategy, + domain::{dex::slippage, eth, Risk}, + }, std::num::NonZeroUsize, }; +#[derive(Clone)] pub struct Contracts { pub settlement: eth::ContractAddress, pub authenticator: eth::ContractAddress, } +#[derive(Clone)] pub struct Config { pub node_url: reqwest::Url, pub contracts: Contracts, @@ -21,4 +26,5 @@ pub struct Config { pub concurrent_requests: NonZeroUsize, pub smallest_partial_fill: eth::Ether, pub risk: Risk, + pub rate_limiting_strategy: RateLimitingStrategy, } diff --git a/crates/solvers/src/infra/dex/balancer/mod.rs b/crates/solvers/src/infra/dex/balancer/mod.rs index 92962269f0..806495da1b 100644 --- a/crates/solvers/src/infra/dex/balancer/mod.rs +++ b/crates/solvers/src/infra/dex/balancer/mod.rs @@ -163,12 +163,21 @@ impl Sor { pub enum Error { #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error(transparent)] Http(util::http::Error), } impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { - Self::Http(err.into()) + match err { + util::http::RoundtripError::Http(util::http::Error::Status(status_code, _)) + if status_code.as_u16() == 429 => + { + Self::RateLimited + } + other_err => Self::Http(other_err.into()), + } } } diff --git a/crates/solvers/src/infra/dex/mod.rs b/crates/solvers/src/infra/dex/mod.rs index 32dacaad4e..4f2a35bb9e 100644 --- a/crates/solvers/src/infra/dex/mod.rs +++ b/crates/solvers/src/infra/dex/mod.rs @@ -46,6 +46,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error(transparent)] Other(Box), } @@ -64,6 +66,7 @@ impl From for Error { match err { oneinch::Error::OrderNotSupported => Self::OrderNotSupported, oneinch::Error::NotFound => Self::NotFound, + oneinch::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -73,6 +76,7 @@ impl From for Error { fn from(err: zeroex::Error) -> Self { match err { zeroex::Error::NotFound => Self::NotFound, + zeroex::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -82,6 +86,7 @@ impl From for Error { fn from(err: paraswap::Error) -> Self { match err { paraswap::Error::NotFound => Self::NotFound, + paraswap::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } diff --git a/crates/solvers/src/infra/dex/oneinch/mod.rs b/crates/solvers/src/infra/dex/oneinch/mod.rs index 041ac4b38f..6ef480a810 100644 --- a/crates/solvers/src/infra/dex/oneinch/mod.rs +++ b/crates/solvers/src/infra/dex/oneinch/mod.rs @@ -165,6 +165,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error("api error {code}: {description}")] Api { code: i32, description: String }, #[error(transparent)] @@ -174,7 +176,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => { // Unfortunately, AFAIK these codes aren't documented anywhere. These // based on empirical observations of what the API has returned in the diff --git a/crates/solvers/src/infra/dex/paraswap/mod.rs b/crates/solvers/src/infra/dex/paraswap/mod.rs index 621dafd984..d4e87310e6 100644 --- a/crates/solvers/src/infra/dex/paraswap/mod.rs +++ b/crates/solvers/src/infra/dex/paraswap/mod.rs @@ -113,6 +113,8 @@ pub enum Error { NotFound, #[error("decimals are missing for the swapped tokens")] MissingDecimals, + #[error("rate limited")] + RateLimited, #[error("api error {0}")] Api(String), #[error(transparent)] @@ -122,7 +124,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => match err.error.as_str() { "ESTIMATED_LOSS_GREATER_THAN_MAX_IMPACT" | "No routes found with enough liquidity" diff --git a/crates/solvers/src/infra/dex/zeroex/mod.rs b/crates/solvers/src/infra/dex/zeroex/mod.rs index ac7929bfe2..43834cc24a 100644 --- a/crates/solvers/src/infra/dex/zeroex/mod.rs +++ b/crates/solvers/src/infra/dex/zeroex/mod.rs @@ -150,6 +150,8 @@ pub enum Error { NotFound, #[error("quote does not specify an approval spender")] MissingSpender, + #[error("rate limited")] + RateLimited, #[error("api error code {code}: {reason}")] Api { code: i64, reason: String }, #[error(transparent)] @@ -166,6 +168,7 @@ impl From> for Error { // past. match err.code { 100 => Self::NotFound, + 429 => Self::RateLimited, _ => Self::Api { code: err.code, reason: err.reason, diff --git a/crates/solvers/src/run.rs b/crates/solvers/src/run.rs index 9eee0b2ac1..2fc2e0784d 100644 --- a/crates/solvers/src/run.rs +++ b/crates/solvers/src/run.rs @@ -47,28 +47,28 @@ async fn run_with(args: cli::Args, bind: Option>) { dex::Dex::ZeroEx( dex::zeroex::ZeroEx::new(config.zeroex).expect("invalid 0x configuration"), ), - config.base, + config.base.clone(), )) } cli::Command::Balancer { config } => { let config = config::dex::balancer::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::Balancer(dex::balancer::Sor::new(config.sor)), - config.base, + config.base.clone(), )) } cli::Command::OneInch { config } => { let config = config::dex::oneinch::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::OneInch(dex::oneinch::OneInch::new(config.oneinch).await.unwrap()), - config.base, + config.base.clone(), )) } cli::Command::ParaSwap { config } => { let config = config::dex::paraswap::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::ParaSwap(dex::paraswap::ParaSwap::new(config.paraswap)), - config.base, + config.base.clone(), )) } };