From c48bfe01cd08d71f2c151b5ca26df8b338079d57 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 9 Aug 2024 11:53:51 +0200 Subject: [PATCH 1/6] Tokio console opt-in via env variable (#2878) # Description The `tokio-console` comes with some pretty hefty memory overhead that caused many pods to crashloop due to OOM issues which I didn't expect. Therefore we should not have it enabled by default. However, I still think it's valuable to have support for the feature baked in by default to reduce the friction of actually using the feature. # Changes Now you have to set `TOKIO_CONSOLE=true` to enable it. This flag does not show up in the `--help` text and only in the repo `README.md` because otherwise it would have to be added a bit awkwardly and passed around a lot which I didn't really like in the previous iteration to begin with. I think this is okay because it's just a debugging thing and not knowing about the flag doesn't prevent you from using the binaries correctly but LMK if you disagree and I should parse the arg via `clap` instead. ## How to test Ran a local test with and without `TOKIO_CONSOLE=true` --- README.md | 8 ++++---- crates/observe/src/tracing.rs | 9 ++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index a379927358..9ec3f546e1 100644 --- a/README.md +++ b/README.md @@ -119,11 +119,11 @@ ANVIL_IP_ADDR=0.0.0.0 anvil \ ### Profiling -The most important binaries support [tokio-console](https://github.com/tokio-rs/console) to allow you a could look inside the tokio runtime. +All binaries are compiled with support for [tokio-console](https://github.com/tokio-rs/console) by default to allow you to look inside the tokio runtime. +However, this feature is not enabled at runtime by default because it comes with a pretty significant memory overhead. To enable it you just have to set the environment variable `TOKIO_CONSOLE=true` and run the binary you want to instrument. 
-Simply enable the feature by passing `--enable-tokio-console true` when running a binary and then in another shell, run - -``` +You can install and run `tokio-console` with: +```bash cargo install --locked tokio-console tokio-console ``` diff --git a/crates/observe/src/tracing.rs b/crates/observe/src/tracing.rs index 6efe47f414..3e908c0676 100644 --- a/crates/observe/src/tracing.rs +++ b/crates/observe/src/tracing.rs @@ -70,7 +70,12 @@ fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { }}; } - if cfg!(tokio_unstable) { + let enable_tokio_console: bool = std::env::var("TOKIO_CONSOLE") + .unwrap_or("false".to_string()) + .parse() + .unwrap(); + + if cfg!(tokio_unstable) && enable_tokio_console { let (env_filter, reload_handle) = tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); @@ -78,6 +83,7 @@ fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { .with(console_subscriber::spawn()) .with(fmt_layer!(env_filter, stderr_threshold)) .init(); + tracing::info!("started programm with support for tokio-console"); if cfg!(unix) { spawn_reload_handler(initial_filter, reload_handle); @@ -92,6 +98,7 @@ fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { .with(tracing::level_filters::LevelFilter::TRACE) .with(fmt_layer!(env_filter, stderr_threshold)) .init(); + tracing::info!("started programm without support for tokio-console"); if cfg!(unix) { spawn_reload_handler(initial_filter, reload_handle); From ca9fc436997fa1d9dbe56e8f60ea45e3fa0f1c97 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Fri, 9 Aug 2024 15:09:36 +0200 Subject: [PATCH 2/6] Allow for no liquidity limit order placement (#2854) # Description We currently require all orders to have quote in order to accurately compute fee policies (price improvement policies requires quote data). 
In the absence of the quote we don't allow order placement and exclude the order from solvable orders (inconsistently on a one-off code path outside of the solvable orders cache where it doesn't get flagged as "filtered" or "invalid" thus being silently ignored). This seems unnecessarily restrictive. In the absence of a quote we could simply assume that the limit price itself was the quote (i.e. apply a pure surplus fee policy) to the order. This would allow placing limit orders for tokens which are not yet tradable but will become tradable soon. @sunce86 am I missing any other reason why we need a quote? # Changes - [x] Allow order placement if a quote wasn't found due to a pricing error - [x] Don't filter out orders from the auction for which we don't have a quote - [x] Assume the limit amounts are equal to the quote amounts for fee policy application in case we cannot find a quote ## How to test Introduced an e2e test showing that this works --- crates/autopilot/src/domain/fee/mod.rs | 17 ++- crates/autopilot/src/solvable_orders.rs | 17 ++- crates/e2e/src/setup/colocation.rs | 2 +- crates/e2e/src/setup/fee.rs | 75 +++++++++++++ crates/e2e/src/setup/mod.rs | 1 + crates/e2e/src/setup/onchain_components.rs | 20 +++- crates/e2e/tests/e2e/limit_orders.rs | 116 +++++++++++++++++++++ crates/e2e/tests/e2e/protocol_fee.rs | 83 ++------------- crates/shared/src/order_validation.rs | 53 ++++++---- 9 files changed, 270 insertions(+), 114 deletions(-) create mode 100644 crates/e2e/src/setup/fee.rs diff --git a/crates/autopilot/src/domain/fee/mod.rs b/crates/autopilot/src/domain/fee/mod.rs index 1d3644db72..409d56e374 100644 --- a/crates/autopilot/src/domain/fee/mod.rs +++ b/crates/autopilot/src/domain/fee/mod.rs @@ -20,6 +20,7 @@ use { std::{collections::HashSet, str::FromStr}, }; +#[derive(Debug)] enum OrderClass { Market, Limit, @@ -81,7 +82,7 @@ impl ProtocolFees { pub fn apply( &self, order: boundary::Order, - quote: &domain::Quote, + quote: Option,
surplus_capturing_jit_order_owners: &[eth::Address], ) -> domain::Order { let partner_fee = order @@ -114,6 +115,16 @@ impl ProtocolFees { buy: order.data.buy_amount, fee: order.data.fee_amount, }; + + // In case there is no quote, we assume 0 buy amount so that the order ends up + // being considered out of market price. + let quote = quote.unwrap_or(domain::Quote { + order_uid: order.metadata.uid.into(), + sell_amount: order.data.sell_amount.into(), + buy_amount: U256::zero().into(), + fee: order.data.fee_amount.into(), + }); + let quote_ = boundary::Amounts { sell: quote.sell_amount.into(), buy: quote.buy_amount.into(), @@ -121,9 +132,9 @@ impl ProtocolFees { }; if self.enable_protocol_fees { - self.apply_multiple_policies(order, quote, order_, quote_, partner_fee) + self.apply_multiple_policies(order, "e, order_, quote_, partner_fee) } else { - self.apply_single_policy(order, quote, order_, quote_, partner_fee) + self.apply_single_policy(order, "e, order_, quote_, partner_fee) } } diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index 1bca766748..0e0741114e 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -267,17 +267,16 @@ impl SolvableOrdersCache { latest_settlement_block: db_solvable_orders.latest_settlement_block, orders: orders .into_iter() - .filter_map(|order| { - if let Some(quote) = db_solvable_orders.quotes.get(&order.metadata.uid.into()) { - Some(self.protocol_fees.apply(order, quote, &surplus_capturing_jit_order_owners)) - } else { - tracing::warn!(order_uid = %order.metadata.uid, "order is skipped, quote is missing"); - None - } + .map(|order| { + let quote = db_solvable_orders + .quotes + .get(&order.metadata.uid.into()) + .cloned(); + self.protocol_fees + .apply(order, quote, &surplus_capturing_jit_order_owners) }) .collect(), - prices: - prices + prices: prices .into_iter() .map(|(key, value)| { Price::new(value.into()).map(|price| 
(eth::TokenAddress(key), price)) diff --git a/crates/e2e/src/setup/colocation.rs b/crates/e2e/src/setup/colocation.rs index abba3a108f..06814ae466 100644 --- a/crates/e2e/src/setup/colocation.rs +++ b/crates/e2e/src/setup/colocation.rs @@ -56,7 +56,7 @@ impl LiquidityProvider { [[liquidity.uniswap-v2]] router = "{:?}" pool-code = "{:?}" -missing-pool-cache-time = "1h" +missing-pool-cache-time = "0s" "#, contracts.uniswap_v2_router.address(), contracts.default_pool_code() diff --git a/crates/e2e/src/setup/fee.rs b/crates/e2e/src/setup/fee.rs new file mode 100644 index 0000000000..003b7c2038 --- /dev/null +++ b/crates/e2e/src/setup/fee.rs @@ -0,0 +1,75 @@ +pub struct ProtocolFeesConfig(pub Vec); + +#[derive(Clone)] +pub struct ProtocolFee { + pub policy: FeePolicyKind, + pub policy_order_class: FeePolicyOrderClass, +} + +#[derive(Clone)] +pub enum FeePolicyOrderClass { + Market, + Limit, + Any, +} + +impl std::fmt::Display for FeePolicyOrderClass { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FeePolicyOrderClass::Market => write!(f, "market"), + FeePolicyOrderClass::Limit => write!(f, "limit"), + FeePolicyOrderClass::Any => write!(f, "any"), + } + } +} + +#[derive(Clone)] +pub enum FeePolicyKind { + /// How much of the order's surplus should be taken as a protocol fee. + Surplus { factor: f64, max_volume_factor: f64 }, + /// How much of the order's volume should be taken as a protocol fee. + Volume { factor: f64 }, + /// How much of the order's price improvement should be taken as a protocol + /// fee where price improvement is a difference between the executed price + /// and the best quote. 
+ PriceImprovement { factor: f64, max_volume_factor: f64 }, +} + +impl std::fmt::Display for ProtocolFee { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let order_class_str = &self.policy_order_class.to_string(); + match &self.policy { + FeePolicyKind::Surplus { + factor, + max_volume_factor, + } => write!( + f, + "surplus:{}:{}:{}", + factor, max_volume_factor, order_class_str + ), + FeePolicyKind::Volume { factor } => { + write!(f, "volume:{}:{}", factor, order_class_str) + } + FeePolicyKind::PriceImprovement { + factor, + max_volume_factor, + } => write!( + f, + "priceImprovement:{}:{}:{}", + factor, max_volume_factor, order_class_str + ), + } + } +} + +impl std::fmt::Display for ProtocolFeesConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let fees_str = self + .0 + .iter() + .map(|fee| fee.to_string()) + .collect::>() + .join(","); + write!(f, "--fee-policies={}", fees_str) + } +} diff --git a/crates/e2e/src/setup/mod.rs b/crates/e2e/src/setup/mod.rs index 3636a371e3..70b7399397 100644 --- a/crates/e2e/src/setup/mod.rs +++ b/crates/e2e/src/setup/mod.rs @@ -2,6 +2,7 @@ pub mod colocation; mod deploy; #[macro_use] pub mod onchain_components; +pub mod fee; mod services; mod solver; diff --git a/crates/e2e/src/setup/onchain_components.rs b/crates/e2e/src/setup/onchain_components.rs index 8fa62f26d4..2255ed60c6 100644 --- a/crates/e2e/src/setup/onchain_components.rs +++ b/crates/e2e/src/setup/onchain_components.rs @@ -303,10 +303,12 @@ impl OnchainComponents { solvers } - async fn deploy_tokens(&self, minter: Account) -> [MintableToken; N] { + /// Deploy `N` tokens without any onchain liquidity + pub async fn deploy_tokens(&self, minter: &Account) -> [MintableToken; N] { let mut res = Vec::with_capacity(N); for _ in 0..N { let contract = ERC20Mintable::builder(&self.web3) + .from(minter.clone()) .deploy() .await .expect("MintableERC20 deployment failed"); @@ -333,9 +335,19 @@ impl OnchainComponents { 
.expect("getting accounts failed")[0], None, ); - let tokens = self.deploy_tokens::(minter).await; + let tokens = self.deploy_tokens::(&minter).await; + self.seed_weth_uni_v2_pools(tokens.iter(), token_amount, weth_amount) + .await; + tokens + } - for MintableToken { contract, minter } in &tokens { + pub async fn seed_weth_uni_v2_pools( + &self, + tokens: impl IntoIterator, + token_amount: U256, + weth_amount: U256, + ) { + for MintableToken { contract, minter } in tokens { tx!(minter, contract.mint(minter.address(), token_amount)); tx_value!(minter, weth_amount, self.contracts.weth.deposit()); @@ -369,8 +381,6 @@ impl OnchainComponents { ) ); } - - tokens } /// Mints `amount` tokens to its `token`-WETH Uniswap V2 pool. diff --git a/crates/e2e/tests/e2e/limit_orders.rs b/crates/e2e/tests/e2e/limit_orders.rs index 5112369cc2..f5cd73e693 100644 --- a/crates/e2e/tests/e2e/limit_orders.rs +++ b/crates/e2e/tests/e2e/limit_orders.rs @@ -3,6 +3,7 @@ use { driver::domain::eth::NonZeroU256, e2e::{nodes::forked_node::ForkedNodeApi, setup::*, tx}, ethcontract::{prelude::U256, H160}, + fee::{FeePolicyOrderClass, ProtocolFee, ProtocolFeesConfig}, model::{ order::{OrderClass, OrderCreation, OrderKind}, quote::{OrderQuoteRequest, OrderQuoteSide, SellAmount}, @@ -37,6 +38,12 @@ async fn local_node_limit_does_not_apply_to_in_market_orders_test() { run_test(limit_does_not_apply_to_in_market_orders_test).await; } +#[tokio::test] +#[ignore] +async fn local_node_no_liquidity_limit_order() { + run_test(no_liquidity_limit_order).await; +} + /// The block number from which we will fetch state for the forked tests. const FORK_BLOCK_MAINNET: u64 = 18477910; /// USDC whale address as per [FORK_BLOCK_MAINNET]. 
@@ -695,3 +702,112 @@ async fn forked_gnosis_single_limit_order_test(web3: Web3) { assert!(sell_token_balance_before > sell_token_balance_after); assert!(buy_token_balance_after >= buy_token_balance_before + to_wei(500)); } + +async fn no_liquidity_limit_order(web3: Web3) { + let mut onchain = OnchainComponents::deploy(web3.clone()).await; + + let [solver] = onchain.make_solvers(to_wei(10_000)).await; + let [trader_a] = onchain.make_accounts(to_wei(1)).await; + let [token_a] = onchain.deploy_tokens(solver.account()).await; + + // Fund trader accounts + token_a.mint(trader_a.address(), to_wei(10)).await; + + // Approve GPv2 for trading + tx!( + trader_a.account(), + token_a.approve(onchain.contracts().allowance, to_wei(10)) + ); + + // Setup services + let protocol_fees_config = ProtocolFeesConfig(vec![ + ProtocolFee { + policy: fee::FeePolicyKind::Surplus { + factor: 0.5, + max_volume_factor: 0.01, + }, + policy_order_class: FeePolicyOrderClass::Limit, + }, + ProtocolFee { + policy: fee::FeePolicyKind::PriceImprovement { + factor: 0.5, + max_volume_factor: 0.01, + }, + policy_order_class: FeePolicyOrderClass::Market, + }, + ]) + .to_string(); + + let services = Services::new(onchain.contracts()).await; + services + .start_protocol_with_args( + ExtraServiceArgs { + autopilot: vec![ + protocol_fees_config, + "--enable-multiple-fees=true".to_string(), + ], + ..Default::default() + }, + solver, + ) + .await; + + // Place order + let order = OrderCreation { + sell_token: token_a.address(), + sell_amount: to_wei(10), + buy_token: onchain.contracts().weth.address(), + buy_amount: to_wei(1), + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Sell, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + SecretKeyRef::from(&SecretKey::from_slice(trader_a.private_key()).unwrap()), + ); + let order_id = services.create_order(&order).await.unwrap(); + let limit_order = 
services.get_order(&order_id).await.unwrap(); + assert_eq!(limit_order.metadata.class, OrderClass::Limit); + + // Create liquidity + onchain + .seed_weth_uni_v2_pools([&token_a].iter().copied(), to_wei(1000), to_wei(1000)) + .await; + + // Drive solution + tracing::info!("Waiting for trade."); + let balance_before = onchain + .contracts() + .weth + .balance_of(trader_a.address()) + .call() + .await + .unwrap(); + wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 1 }) + .await + .unwrap(); + + wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 0 }) + .await + .unwrap(); + + let balance_after = onchain + .contracts() + .weth + .balance_of(trader_a.address()) + .call() + .await + .unwrap(); + assert!(balance_after.checked_sub(balance_before).unwrap() >= to_wei(5)); + + let trades = services.get_trades(&order_id).await.unwrap(); + assert_eq!( + trades.first().unwrap().fee_policies, + vec![model::fee_policy::FeePolicy::Surplus { + factor: 0.5, + max_volume_factor: 0.01 + }], + ); +} diff --git a/crates/e2e/tests/e2e/protocol_fee.rs b/crates/e2e/tests/e2e/protocol_fee.rs index 0a68767c74..8dc5f2095a 100644 --- a/crates/e2e/tests/e2e/protocol_fee.rs +++ b/crates/e2e/tests/e2e/protocol_fee.rs @@ -1,6 +1,11 @@ use { driver::domain::eth::NonZeroU256, - e2e::{assert_approximately_eq, setup::*, tx, tx_value}, + e2e::{ + assert_approximately_eq, + setup::{fee::*, *}, + tx, + tx_value, + }, ethcontract::{prelude::U256, Address}, model::{ order::{Order, OrderCreation, OrderCreationAppData, OrderKind}, @@ -535,79 +540,3 @@ async fn volume_fee_buy_order_test(web3: Web3) { .unwrap(); assert_eq!(order.metadata.executed_surplus_fee, balance_after); } - -struct ProtocolFeesConfig(Vec); - -#[derive(Clone)] -struct ProtocolFee { - policy: FeePolicyKind, - policy_order_class: FeePolicyOrderClass, -} - -#[derive(Clone)] -enum FeePolicyOrderClass { - Market, - Limit, - Any, -} - -impl std::fmt::Display for FeePolicyOrderClass { - fn 
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - FeePolicyOrderClass::Market => write!(f, "market"), - FeePolicyOrderClass::Limit => write!(f, "limit"), - FeePolicyOrderClass::Any => write!(f, "any"), - } - } -} - -#[derive(Clone)] -enum FeePolicyKind { - /// How much of the order's surplus should be taken as a protocol fee. - Surplus { factor: f64, max_volume_factor: f64 }, - /// How much of the order's volume should be taken as a protocol fee. - Volume { factor: f64 }, - /// How much of the order's price improvement should be taken as a protocol - /// fee where price improvement is a difference between the executed price - /// and the best quote. - PriceImprovement { factor: f64, max_volume_factor: f64 }, -} - -impl std::fmt::Display for ProtocolFee { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let order_class_str = &self.policy_order_class.to_string(); - match &self.policy { - FeePolicyKind::Surplus { - factor, - max_volume_factor, - } => write!( - f, - "surplus:{}:{}:{}", - factor, max_volume_factor, order_class_str - ), - FeePolicyKind::Volume { factor } => { - write!(f, "volume:{}:{}", factor, order_class_str) - } - FeePolicyKind::PriceImprovement { - factor, - max_volume_factor, - } => write!( - f, - "priceImprovement:{}:{}:{}", - factor, max_volume_factor, order_class_str - ), - } - } -} - -impl std::fmt::Display for ProtocolFeesConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let fees_str = self - .0 - .iter() - .map(|fee| fee.to_string()) - .collect::>() - .join(","); - write!(f, "--fee-policies={}", fees_str) - } -} diff --git a/crates/shared/src/order_validation.rs b/crates/shared/src/order_validation.rs index 886ecbc1e2..0ea45eb55b 100644 --- a/crates/shared/src/order_validation.rs +++ b/crates/shared/src/order_validation.rs @@ -665,26 +665,41 @@ impl OrderValidating for OrderValidator { } } OrderClass::Limit => { - let quote = - 
get_quote_and_check_fee(&*self.quoter, "e_parameters, order.quote_id, None) - .await?; - // If the order is not "In-Market", check for the limit orders - if is_order_outside_market_price( - &Amounts { - sell: data.sell_amount, - buy: data.buy_amount, - fee: data.fee_amount, - }, - &Amounts { - sell: quote.sell_amount, - buy: quote.buy_amount, - fee: quote.fee_amount, - }, - data.kind, - ) { - self.check_max_limit_orders(owner).await?; + match get_quote_and_check_fee( + &*self.quoter, + "e_parameters, + order.quote_id, + None, + ) + .await + { + Ok(quote) => { + // If the order is not "In-Market", check for the limit orders + if is_order_outside_market_price( + &Amounts { + sell: data.sell_amount, + buy: data.buy_amount, + fee: data.fee_amount, + }, + &Amounts { + sell: quote.sell_amount, + buy: quote.buy_amount, + fee: quote.fee_amount, + }, + data.kind, + ) { + self.check_max_limit_orders(owner).await?; + } + (class, Some(quote)) + } + // If the quote cannot be computed, it's still possible to place this order (as + // an implicit out of market order) + Err(ValidationError::PriceForQuote(err)) => { + tracing::debug!(?err, "placing order without quote"); + (class, None) + } + Err(other) => return Err(other), } - (class, Some(quote)) } OrderClass::Liquidity => { let quote = From 48de0ca3596eb26d8737a7393568316a6197ec61 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Mon, 12 Aug 2024 16:03:19 +0200 Subject: [PATCH 3/6] Few improvements for request sharing (#2880) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description This PR ended up being a small collection of improvements all related to request sharing in the hopes of improving the memory leak situation. 
# Changes - use `shared_or_else()` instead of `shared()` to delay the construction of shared futures as much as possible (might result in a reduction of the leaked memory 🤞) - finally the unit test is able to assert that the original future and shared cached future indeed point to the same underlying future - added a metric to count all cached items across all individual request sharing structs Either this shows a constant increase which suggests that we leak memory by not cleaning up the map (and keeping strong references to the shared futures) Or we do everything right and the memory somehow gets leaked in the shared future destructor ## How to test functionality didn't really change so existing tests should still pass --- .../src/price_estimation/trade_finder.rs | 15 ++- crates/shared/src/request_sharing.rs | 39 +++---- crates/shared/src/trade_finding/external.rs | 108 +++++++++--------- 3 files changed, 81 insertions(+), 81 deletions(-) diff --git a/crates/shared/src/price_estimation/trade_finder.rs b/crates/shared/src/price_estimation/trade_finder.rs index eb47c31d5b..c4efe13e52 100644 --- a/crates/shared/src/price_estimation/trade_finder.rs +++ b/crates/shared/src/price_estimation/trade_finder.rs @@ -61,11 +61,14 @@ impl TradeEstimator { } async fn estimate(&self, query: Arc) -> Result { - let estimate = rate_limited( - self.rate_limiter.clone(), - self.inner.clone().estimate(query.clone()), - ); - self.sharing.shared(query, estimate.boxed()).await + let fut = move |query: &Arc| { + rate_limited( + self.rate_limiter.clone(), + self.inner.clone().estimate(query.clone()), + ) + .boxed() + }; + self.sharing.shared_or_else(query, fut).await } } @@ -84,7 +87,7 @@ impl Inner { }; return verifier - .verify(&price_query, &query.verification, trade.clone()) + .verify(&price_query, &query.verification, trade) .await .map_err(PriceEstimationError::EstimatorInternal); } diff --git a/crates/shared/src/request_sharing.rs b/crates/shared/src/request_sharing.rs index 
7c0c161ed4..d36dc7f4ce 100644 --- a/crates/shared/src/request_sharing.rs +++ b/crates/shared/src/request_sharing.rs @@ -3,7 +3,10 @@ use { future::{BoxFuture, Shared, WeakShared}, FutureExt, }, - prometheus::IntCounterVec, + prometheus::{ + core::{AtomicU64, GenericGauge}, + IntCounterVec, + }, std::{ collections::HashMap, future::Future, @@ -53,7 +56,11 @@ where fn collect_garbage(cache: &Cache) { let mut cache = cache.lock().unwrap(); + let len_before = cache.len() as u64; cache.retain(|_request, weak| weak.upgrade().is_some()); + Metrics::get() + .request_sharing_cached_items + .sub(len_before - cache.len() as u64); } fn spawn_gc(cache: Cache) { @@ -82,32 +89,14 @@ where Fut: Future, Fut::Output: Clone, { - // Intentionally returns Shared instead of an opaque `impl Future` (or - // being an async fn) because this has some useful properties to the caller - // like being unpin and fused. - - /// Returns an existing in flight future for this request or uses the passed - /// in future as a new in flight future. - /// - /// Note that futures do nothing util polled so merely creating the response - /// future is not expensive. - pub fn shared(&self, request: Request, future: Fut) -> Shared { - self.shared_or_else(request, move |_| future) - } - /// Returns an existing in flight future or creates and uses a new future /// from the specified closure. - /// - /// This is similar to [`RequestSharing::shared`] but lazily creates the - /// future. This can be helpful when creating futures is non trivial - /// (such as cloning a large vector). 
pub fn shared_or_else(&self, request: Request, future: F) -> Shared where F: FnOnce(&Request) -> Fut, { let mut in_flight = self.in_flight.lock().unwrap(); - // collect garbage and find copy of existing request let existing = in_flight.get(&request).and_then(WeakShared::upgrade); if let Some(existing) = existing { @@ -127,6 +116,7 @@ where // unwrap because downgrade only returns None if the Shared has already // completed which cannot be the case because we haven't polled it yet. in_flight.insert(request, shared.downgrade().unwrap()); + Metrics::get().request_sharing_cached_items.inc(); shared } } @@ -136,6 +126,9 @@ struct Metrics { /// Request sharing hits & misses #[metric(labels("request_label", "result"))] request_sharing_access: IntCounterVec, + + /// Number of all currently cached requests + request_sharing_cached_items: GenericGauge, } impl Metrics { @@ -158,12 +151,14 @@ mod tests { request_label: Default::default(), }; - let shared0 = sharing.shared(0, futures::future::ready(0).boxed()); - let shared1 = sharing.shared(0, async { panic!() }.boxed()); - // Would use Arc::ptr_eq but Shared doesn't implement it. 
+ let shared0 = sharing.shared_or_else(0, |_| futures::future::ready(0).boxed()); + let shared1 = sharing.shared_or_else(0, |_| async { panic!() }.boxed()); + + assert!(shared0.ptr_eq(&shared1)); assert_eq!(shared0.strong_count().unwrap(), 2); assert_eq!(shared1.strong_count().unwrap(), 2); assert_eq!(shared0.weak_count().unwrap(), 1); + // complete first shared assert_eq!(shared0.now_or_never().unwrap(), 0); assert_eq!(shared1.strong_count().unwrap(), 1); diff --git a/crates/shared/src/trade_finding/external.rs b/crates/shared/src/trade_finding/external.rs index 4694095def..e285b7cb7f 100644 --- a/crates/shared/src/trade_finding/external.rs +++ b/crates/shared/src/trade_finding/external.rs @@ -10,7 +10,6 @@ use { ethrpc::current_block::CurrentBlockStream, futures::{future::BoxFuture, FutureExt}, reqwest::{header, Client}, - std::sync::Arc, url::Url, }; @@ -30,7 +29,7 @@ pub struct ExternalTradeFinder { block_stream: CurrentBlockStream, /// Timeout of the quote request to the driver. - timeout: Arc, + timeout: std::time::Duration, } impl ExternalTradeFinder { @@ -45,67 +44,70 @@ impl ExternalTradeFinder { sharing: RequestSharing::labelled(format!("tradefinder_{}", driver)), client, block_stream, - timeout: Arc::new(timeout), + timeout, } } /// Queries the `/quote` endpoint of the configured driver and deserializes /// the result into a Quote or Trade. 
async fn shared_query(&self, query: &Query) -> Result { - let deadline = chrono::Utc::now() + *self.timeout; - let order = dto::Order { - sell_token: query.sell_token, - buy_token: query.buy_token, - amount: query.in_amount.get(), - kind: query.kind, - deadline, - }; - - let mut request = self - .client - .get(self.quote_endpoint.clone()) - .query(&order) - .header(header::CONTENT_TYPE, "application/json") - .header(header::ACCEPT, "application/json"); - - if query.block_dependent { - request = request.header( - "X-Current-Block-Hash", - self.block_stream.borrow().hash.to_string(), - ) - } - - if let Some(id) = observe::request_id::get_task_local_storage() { - request = request.header("X-REQUEST-ID", id); - } - - let timeout = self.timeout.clone(); - let future = async move { - let response = request - .timeout(*timeout) - .send() - .await - .map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?; - if response.status() == 429 { - return Err(PriceEstimationError::RateLimited); + let fut = move |query: &Query| { + let order = dto::Order { + sell_token: query.sell_token, + buy_token: query.buy_token, + amount: query.in_amount.get(), + kind: query.kind, + deadline: chrono::Utc::now() + self.timeout, + }; + let block_dependent = query.block_dependent; + let id = observe::request_id::get_task_local_storage(); + let timeout = self.timeout; + let client = self.client.clone(); + let quote_endpoint = self.quote_endpoint.clone(); + let block_hash = self.block_stream.borrow().hash; + + async move { + let mut request = client + .get(quote_endpoint) + .query(&order) + .header(header::CONTENT_TYPE, "application/json") + .header(header::ACCEPT, "application/json"); + + if block_dependent { + request = request.header("X-Current-Block-Hash", block_hash.to_string()) + } + + if let Some(id) = id { + request = request.header("X-REQUEST-ID", id); + } + + let response = request + .timeout(timeout) + .send() + .await + .map_err(|err| 
PriceEstimationError::EstimatorInternal(anyhow!(err)))?; + if response.status() == 429 { + return Err(PriceEstimationError::RateLimited); + } + let text = response + .text() + .await + .map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?; + serde_json::from_str::(&text) + .map(Trade::from) + .map_err(|err| { + if let Ok(err) = serde_json::from_str::(&text) { + PriceEstimationError::from(err) + } else { + PriceEstimationError::EstimatorInternal(anyhow!(err)) + } + }) } - let text = response - .text() - .await - .map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?; - serde_json::from_str::(&text) - .map(Trade::from) - .map_err(|err| { - if let Ok(err) = serde_json::from_str::(&text) { - PriceEstimationError::from(err) - } else { - PriceEstimationError::EstimatorInternal(anyhow!(err)) - } - }) + .boxed() }; self.sharing - .shared(query.clone(), future.boxed()) + .shared_or_else(query.clone(), fut) .await .map_err(TradeError::from) } From 7cc377815402aa9a0458272a363d91e50c1b54b5 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Mon, 12 Aug 2024 18:30:54 +0200 Subject: [PATCH 4/6] Decrease count on drop (#2882) # Description Although the `RequestSharing` structs currently don't get deleted we should still update the caching metrics on `Drop` just to be safe. 
--- crates/shared/src/request_sharing.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/shared/src/request_sharing.rs b/crates/shared/src/request_sharing.rs index d36dc7f4ce..c2d1b38f60 100644 --- a/crates/shared/src/request_sharing.rs +++ b/crates/shared/src/request_sharing.rs @@ -73,6 +73,15 @@ where } } +impl Drop for RequestSharing { + fn drop(&mut self) { + let cache = self.in_flight.lock().unwrap(); + Metrics::get() + .request_sharing_cached_items + .sub(cache.len() as u64); + } +} + /// Returns a shallow copy (without any pending requests) impl Clone for RequestSharing { fn clone(&self) -> Self { From b156d2995b6ae30ba26998e237f93fd747ef1990 Mon Sep 17 00:00:00 2001 From: Mateo-mro <160488334+Mateo-mro@users.noreply.github.com> Date: Tue, 13 Aug 2024 08:49:35 +0200 Subject: [PATCH 5/6] Native prices: Implement generic buffered batching system 1/3 (#2869) # Description Implements a generic batching system for the native prices. The implementation has a blocking function `blocking_buffered_estimate_prices()` that belongs to a struct which runs a thread. The thread gathers the requests either by time (it waits X time to gather requests and then fires them) or by number of requests (as soon as it reaches a number of requests, fires them). This generic implementation allows for any native price fetcher to batch buffer the request as long as they implement the trait `NativePriceBatchFetcher`. This code is heavily based on the `ethrpc` crate code. In the upcoming PR: - the configuration will be propagated to the services configuration - implement the trait `NativePriceBatchFetcher` for CoinGecko # Changes - Generic buffered batching system for native prices ## How to test 1. 
Unit test --------- Co-authored-by: MartinquaXD --- crates/shared/src/price_estimation.rs | 1 + .../shared/src/price_estimation/buffered.rs | 563 ++++++++++++++++++ .../price_estimation/competition/native.rs | 12 +- 3 files changed, 570 insertions(+), 6 deletions(-) create mode 100644 crates/shared/src/price_estimation/buffered.rs diff --git a/crates/shared/src/price_estimation.rs b/crates/shared/src/price_estimation.rs index 650b4c1d7a..44b776e816 100644 --- a/crates/shared/src/price_estimation.rs +++ b/crates/shared/src/price_estimation.rs @@ -26,6 +26,7 @@ use { thiserror::Error, }; +mod buffered; pub mod competition; pub mod external; pub mod factory; diff --git a/crates/shared/src/price_estimation/buffered.rs b/crates/shared/src/price_estimation/buffered.rs new file mode 100644 index 0000000000..2cfbd30181 --- /dev/null +++ b/crates/shared/src/price_estimation/buffered.rs @@ -0,0 +1,563 @@ +//! A buffered implementation that automatically groups native prices API +//! requests into batches. + +use { + crate::price_estimation::{ + native::{NativePriceEstimateResult, NativePriceEstimating}, + PriceEstimationError, + }, + anyhow::anyhow, + futures::{ + channel::mpsc, + future::FutureExt as _, + stream::{self, FusedStream, Stream, StreamExt as _}, + }, + primitive_types::H160, + std::{ + collections::{HashMap, HashSet}, + future::Future, + num::NonZeroUsize, + sync::Arc, + time::Duration, + }, + tokio::{sync::broadcast, task::JoinHandle}, +}; + +/// Buffered configuration. +#[derive(Clone)] +#[allow(dead_code)] +pub struct Configuration { + /// The maximum amount of concurrent batches to request. + /// + /// Specifying `None` means no limit on concurrency. + pub max_concurrent_requests: Option, + /// The maximum batch size. + pub max_batch_len: usize, + /// An additional minimum delay to wait for collecting requests. + /// + /// The delay to start counting after receiving the first request. 
+ pub debouncing_time: Duration, + /// The timeout to wait for the result to be ready + pub result_ready_timeout: Duration, + /// Maximum capacity of the broadcast channel to store the native prices + /// results + pub broadcast_channel_capacity: usize, +} + +/// Trait for fetching a batch of native price estimates. +#[allow(dead_code)] +#[cfg_attr(test, mockall::automock)] +pub trait NativePriceBatchFetching: Sync + Send + NativePriceEstimating { + /// Fetches a batch of native price estimates. + /// + /// It returns a HashMap which maps the token with its native price + /// estimator result + fn fetch_native_prices( + &self, + tokens: &HashSet, + ) -> futures::future::BoxFuture< + '_, + Result, PriceEstimationError>, + >; +} + +/// Buffered implementation that implements automatic batching of +/// native prices requests. +#[allow(dead_code)] +#[derive(Clone)] +pub struct BufferedRequest { + config: Configuration, + inner: Arc, + requests: mpsc::UnboundedSender, + results: broadcast::Sender, +} + +/// Object to map the token with its native price estimator result +#[allow(dead_code)] +#[derive(Clone)] +struct NativePriceResult { + token: H160, + result: Result, +} + +impl NativePriceEstimating for BufferedRequest +where + Inner: NativePriceBatchFetching + NativePriceEstimating + 'static, +{ + /// Request to get estimate prices in a batch + fn estimate_native_price( + &self, + token: H160, + ) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> { + async move { + // Sends the token for requesting price + self.requests.unbounded_send(token).map_err(|e| { + PriceEstimationError::ProtocolInternal(anyhow!( + "failed to append a new token to the queue: {e:?}" + )) + })?; + + let mut rx = self.results.subscribe(); + + tokio::time::timeout(self.config.result_ready_timeout, async { + loop { + match rx.recv().await { + Ok(value) => { + if value.token == token { + return value.result; + } + } + // Receiver lagged behind the result stream but the + // necessary 
response might still be in the stream. + Err(_) => continue, + } + } + }) + .await + .map_err(|_| { + PriceEstimationError::ProtocolInternal(anyhow!( + "blocking buffered estimate prices timeout elapsed" + )) + })? + } + .boxed() + } +} + +#[allow(dead_code)] +impl BufferedRequest +where + Inner: NativePriceBatchFetching + Send + Sync + NativePriceEstimating + 'static, +{ + /// Creates a new buffered transport with the specified configuration. + pub fn with_config(inner: Inner, config: Configuration) -> Self { + let inner = Arc::new(inner); + let (requests_sender, requests_receiver) = mpsc::unbounded(); + + let (results_sender, _) = broadcast::channel(config.broadcast_channel_capacity); + + Self::background_worker( + inner.clone(), + config.clone(), + requests_receiver, + results_sender.clone(), + ); + + Self { + inner, + requests: requests_sender, + results: results_sender, + config, + } + } + + /// Start a background worker for handling batched requests. + fn background_worker( + inner: Arc, + config: Configuration, + requests: mpsc::UnboundedReceiver, + results_sender: broadcast::Sender, + ) -> JoinHandle<()> { + tokio::task::spawn(batched_for_each(config, requests, move |batch| { + let inner = inner.clone(); + let results_sender = results_sender.clone(); + async move { + if batch.is_empty() { + return; + } + let batch = batch.into_iter().collect::>(); + let results: Vec<_> = match inner.fetch_native_prices(&batch).await { + Ok(results) => results + .into_iter() + .map(|(token, price)| NativePriceResult { + token, + result: price, + }) + .collect(), + Err(err) => { + tracing::error!(?err, "failed to send native price batch request"); + batch + .into_iter() + .map(|token| NativePriceResult { + token, + result: Err(err.clone()), + }) + .collect() + } + }; + for result in results { + let _ = results_sender.send(result); + } + } + })) + } +} + +/// Batches a stream into chunks. 
+/// +/// This is very similar to `futures::stream::StreamExt::ready_chunks` with the +/// difference that it allows configuring a minimum delay for a batch, so +/// waiting for a small amount of time to allow the stream to produce additional +/// items, thus decreasing the chance of batches of size 1. +fn batched_for_each( + config: Configuration, + items: St, + work: F, +) -> impl Future +where + St: Stream + FusedStream + Unpin, + F: Fn(Vec) -> Fut, + Fut: Future, +{ + let concurrency_limit = config.max_concurrent_requests.map(NonZeroUsize::get); + + let batches = stream::unfold(items, move |mut items| async move { + let mut chunk = vec![items.next().await?]; + + let delay = tokio::time::sleep(config.debouncing_time).fuse(); + futures::pin_mut!(delay); + + // Append new elements to the bulk until reaching either of the scenarios: + // - reach maximum number of elements per batch (`max_batch_len) + // - we reach the `debouncing_time` + while chunk.len() < config.max_batch_len { + futures::select_biased! { + item = items.next() => match item { + Some(item) => chunk.push(item), + None => break, + }, + _ = delay => break, + } + } + + Some((chunk, items)) + }); + + batches.for_each_concurrent(concurrency_limit, work) +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::price_estimation::native::MockNativePriceEstimating, + futures::future::try_join_all, + num::ToPrimitive, + tokio::time::sleep, + }; + + impl NativePriceEstimating for MockNativePriceBatchFetching { + fn estimate_native_price( + &self, + token: H160, + ) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> { + async move { + let prices = self.fetch_native_prices(&HashSet::from([token])).await?; + prices + .get(&token) + .cloned() + .ok_or(PriceEstimationError::NoLiquidity)? 
+ } + .boxed() + } + } + + fn token(u: u64) -> H160 { + H160::from_low_u64_be(u) + } + + #[tokio::test] + async fn single_batch_request_successful_estimates() { + let mut inner = MockNativePriceEstimating::new(); + inner + .expect_estimate_native_price() + // Because it gets the value from the batch estimator, it does not need to do this call at all + .never(); + + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested just one, because for the second call it fetches the cached one + .times(1) + .returning(|input| { + let input_cloned = input.clone(); + async move { + Ok(input_cloned + .iter() + .map(|token| (*token, Ok::<_, PriceEstimationError>(1.0))) + .collect::>()) + }.boxed() + }); + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(1), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = BufferedRequest::with_config(native_price_batch_fetcher, config); + let result = buffered.estimate_native_price(token(0)).await; + assert_eq!(result.as_ref().unwrap().to_i64().unwrap(), 1); + } + + #[tokio::test] + async fn batching_successful_estimates() { + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested just one, because for the second call it fetches the cached one + .times(1) + .returning(|input| { + let input_cloned = input.clone(); + async move { Ok(input_cloned + .iter() + .map(|token| (*token, Ok::<_, PriceEstimationError>(1.0))) + .collect::>()) }.boxed() + }); + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(1), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 
50, + }; + + let buffered = BufferedRequest::with_config(native_price_batch_fetcher, config); + + let result = buffered.estimate_native_price(token(0)).await; + + assert_eq!(result.as_ref().unwrap().to_i64().unwrap(), 1); + } + + #[tokio::test] + async fn batching_unsuccessful_estimates() { + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested just one + .times(1) + .returning(|_| { + async { Err(PriceEstimationError::NoLiquidity) }.boxed() + }); + + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(1), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = BufferedRequest::with_config(native_price_batch_fetcher, config); + + let result = buffered.estimate_native_price(token(0)).await; + + assert_eq!(result, Err(PriceEstimationError::NoLiquidity)); + } + + // Function to check batching of many tokens + async fn check_batching_many( + buffered: Arc>, + tokens_requested: usize, + ) { + let mut futures = Vec::with_capacity(tokens_requested); + for i in 0..tokens_requested { + let buffered = buffered.clone(); + futures.push(tokio::spawn(async move { + buffered + .estimate_native_price(token(i.try_into().unwrap())) + .await + })); + } + + let mut results = try_join_all(futures).await.expect( + "valid + futures", + ); + + while let Some(result) = results.pop() { + let result = result.unwrap(); + assert_eq!(result.to_i64().unwrap(), 1); + } + } + + #[tokio::test] + async fn batching_many_in_one_batch_successful_estimates() { + let tokens_requested = 20; + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested exactly one time because the max batch is 20, so all petitions fit into one batch request 
+ .times(1) + .returning(|input| { + let input_cloned = input.clone(); + async move { Ok(input_cloned + .iter() + .map(|token| (*token, Ok::<_, PriceEstimationError>(1.0))) + .collect::>()) }.boxed() + }); + + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(1), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = Arc::new(BufferedRequest::with_config( + native_price_batch_fetcher, + config, + )); + + check_batching_many(buffered, tokens_requested).await; + } + + #[tokio::test] + async fn batching_many_in_one_batch_with_mixed_results_estimates() { + let tokens_requested = 2; + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested exactly one time because the max batch is 20, so all petitions fit into one batch request + .times(1) + .returning(|input| { + let input_cloned = input.clone(); + async move { Ok(input_cloned + .iter() + .enumerate() + .map(|(i, token)| + if i % 2 == 0 { + (*token, Ok::<_, PriceEstimationError>(1.0)) + } else { + (*token, Err(PriceEstimationError::NoLiquidity)) + } + ).collect::>()) }.boxed() + }); + + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(1), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = Arc::new(BufferedRequest::with_config( + native_price_batch_fetcher, + config, + )); + + let mut futures = Vec::with_capacity(tokens_requested); + for i in 0..tokens_requested { + let buffered = buffered.clone(); + futures.push(tokio::spawn(async move { + buffered + .estimate_native_price(token(i.try_into().unwrap())) + .await + })); + } + + let results = try_join_all(futures).await.expect( + "valid + futures", + ); + + // We got 
two results, one must be correct and the other with an error + assert_eq!(results.len(), 2); + assert!(results.contains(&Ok(1.0))); + assert!(results.contains(&Err(PriceEstimationError::NoLiquidity))); + } + + #[tokio::test] + async fn batching_many_in_two_batch_successful_estimates() { + let tokens_requested = 21; + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested exactly two times because the max batch is 20, so all petitions fit into one batch request + .times(2) + .returning(|input| { + let input_cloned = input.clone(); + async move { Ok(input_cloned + .iter() + .map(|token| (*token, Ok::<_, PriceEstimationError>(1.0))) + .collect::>()) }.boxed() + }); + + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(2), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = Arc::new(BufferedRequest::with_config( + native_price_batch_fetcher, + config, + )); + + check_batching_many(buffered, tokens_requested).await; + } + + #[tokio::test] + async fn batching_no_calls() { + let mut native_price_batch_fetcher = MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We are testing the native prices are never called + .never(); + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(2), + max_batch_len: 20, + debouncing_time: Duration::from_millis(50), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let _buffered = Arc::new(BufferedRequest::with_config( + native_price_batch_fetcher, + config, + )); + + sleep(Duration::from_millis(250)).await; + } + + #[tokio::test] + async fn batching_many_in_multiple_times_successful_estimates() { + let tokens_requested = 20; + let mut native_price_batch_fetcher = 
MockNativePriceBatchFetching::new(); + native_price_batch_fetcher + .expect_fetch_native_prices() + // We expect this to be requested exactly two times because there are two batches petitions separated by 250 ms + .times(2) + .returning(|input| { + let input_cloned = input.clone(); + async move { Ok(input_cloned + .iter() + .map(|token| (*token, Ok::<_, PriceEstimationError>(1.0))) + .collect::>()) }.boxed() + }); + + let config = Configuration { + max_concurrent_requests: NonZeroUsize::new(2), + max_batch_len: 20, + debouncing_time: Duration::from_millis(10), + result_ready_timeout: Duration::from_millis(500), + broadcast_channel_capacity: 50, + }; + + let buffered = Arc::new(BufferedRequest::with_config( + native_price_batch_fetcher, + config, + )); + + check_batching_many(buffered.clone(), tokens_requested).await; + + sleep(Duration::from_millis(20)).await; + + check_batching_many(buffered, tokens_requested).await; + } +} diff --git a/crates/shared/src/price_estimation/competition/native.rs b/crates/shared/src/price_estimation/competition/native.rs index b9baab978b..0600500fa1 100644 --- a/crates/shared/src/price_estimation/competition/native.rs +++ b/crates/shared/src/price_estimation/competition/native.rs @@ -1,17 +1,17 @@ use { super::{compare_error, CompetitionEstimator}, - crate::price_estimation::{native::NativePriceEstimating, PriceEstimationError}, - futures::future::{BoxFuture, FutureExt}, + crate::price_estimation::{ + native::{NativePriceEstimateResult, NativePriceEstimating}, + PriceEstimationError, + }, + futures::{future::BoxFuture, FutureExt}, model::order::OrderKind, primitive_types::H160, std::{cmp::Ordering, sync::Arc}, }; impl NativePriceEstimating for CompetitionEstimator> { - fn estimate_native_price( - &self, - token: H160, - ) -> BoxFuture<'_, Result> { + fn estimate_native_price(&self, token: H160) -> BoxFuture<'_, NativePriceEstimateResult> { async move { let results = self .produce_results(token, Result::is_ok, |e, q| 
e.estimate_native_price(q)) From 0412ffa6a9f4ece620b9b83b138058ae2053a658 Mon Sep 17 00:00:00 2001 From: Dusan Stanivukovic Date: Tue, 13 Aug 2024 11:51:44 +0200 Subject: [PATCH 6/6] Fix failing e2e test for cow amms (#2883) @MartinquaXD I believe you wrote the test, do you remember why this specific pool was added to the list? --- crates/autopilot/src/solvable_orders.rs | 15 +++++++++++---- crates/e2e/tests/e2e/cow_amm.rs | 3 ++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index 0e0741114e..949c42cbe7 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -253,10 +253,17 @@ impl SolvableOrdersCache { let surplus_capturing_jit_order_owners = cow_amms .iter() .filter(|cow_amm| { - cow_amm - .traded_tokens() - .iter() - .all(|token| prices.contains_key(token)) + cow_amm.traded_tokens().iter().all(|token| { + let price_exist = prices.contains_key(token); + if !price_exist { + tracing::debug!( + cow_amm = ?cow_amm.address(), + ?token, + "omitted from auction due to missing prices" + ); + } + price_exist + }) }) .map(|cow_amm| cow_amm.address()) .cloned() diff --git a/crates/e2e/tests/e2e/cow_amm.rs b/crates/e2e/tests/e2e/cow_amm.rs index a900454cef..b0f8078a5b 100644 --- a/crates/e2e/tests/e2e/cow_amm.rs +++ b/crates/e2e/tests/e2e/cow_amm.rs @@ -540,7 +540,8 @@ async fn cow_amm_driver_support(web3: Web3) { addr!("301076c36e034948a747bb61bab9cd03f62672e3"), addr!("d7cb8cc1b56356bb7b78d02e785ead28e2158660"), addr!("9941fd7db2003308e7ee17b04400012278f12ac6"), - addr!("beef5afe88ef73337e5070ab2855d37dbf5493a4"), + // no native prices for the tokens traded by this AMM (COW token price) + // addr!("beef5afe88ef73337e5070ab2855d37dbf5493a4"), addr!("c6b13d5e662fa0458f03995bcb824a1934aa895f"), ];