Skip to content

Commit

Permalink
[Driver] Allow stale liquidity for quote requests (#1924)
Browse files Browse the repository at this point in the history
# Description
When giving production traffic to the colocated setup, we noticed a lot
of price estimators (even fast ones like baseline) getting rate limited
frequently. This is due to the driver requesting liquidity for the most
recent block and blocking on fetching it if it's not available. For the
legacy setup, there was a code path specifically for quotes, allowing to
use a "recent" instead of the latest block for fetching liquidity.

This PR recreates this path for the co-located setup

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- Adds a flag to the liquidity fetcher indicating whether stale
liquidity is allowed
- If this flag is set, quote for _recent_ instead of _latest_ block
(which should already be cached)

## How to test
Run this benchmark script against before and after:

```sh
SECONDS=0 
while (( SECONDS < 60 )); do
  time curl -H 'content-type: application/json' --data '{"from": "0xc3792470cee7e0d42c2be8e9552bd651766c5178","buyToken": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48","sellToken": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","kind": "sell","sellAmountAfterFee": "100000000000000000", "quality":"optimal"}' http://localhost:8080/api/v1/quote
done
```

observe http://localhost:11088/metrics in both cases and look for
liquidity cache hits:

Before:
```
driver_recent_block_cache_hits{cache_type="uniswapv2"} 34300
driver_recent_block_cache_misses{cache_type="uniswapv2"} 840
```

After
```
driver_recent_block_cache_hits{cache_type="uniswapv2"} 39200
driver_recent_block_cache_misses{cache_type="uniswapv2"} 140
```

Note that we now only have 1 cache miss (140 pools) on cold start vs 1
cache miss on each new block and higher overall throughput

## Related issues
#1672

---------

Co-authored-by: Nicholas Rodrigues Lordello <[email protected]>
  • Loading branch information
fleupold and Nicholas Rodrigues Lordello authored Oct 9, 2023
1 parent 9c21e7e commit 5be8e8f
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 10 deletions.
14 changes: 9 additions & 5 deletions crates/driver/src/boundary/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl Fetcher {
pub async fn fetch(
&self,
pairs: &HashSet<liquidity::TokenPair>,
block: infra::liquidity::AtBlock,
) -> Result<Vec<liquidity::Liquidity>> {
let pairs = pairs
.iter()
Expand All @@ -137,12 +138,15 @@ impl Fetcher {
TokenPair::new(a.into(), b.into()).expect("a != b")
})
.collect();
let block_number = self.blocks.borrow().number;

let liquidity = self
.inner
.get_liquidity(pairs, recent_block_cache::Block::Number(block_number))
.await?;
let block = match block {
infra::liquidity::AtBlock::Recent => recent_block_cache::Block::Recent,
infra::liquidity::AtBlock::Latest => {
let block_number = self.blocks.borrow().number;
recent_block_cache::Block::Number(block_number)
}
};
let liquidity = self.inner.get_liquidity(pairs, block).await?;

let liquidity = liquidity
.into_iter()
Expand Down
9 changes: 8 additions & 1 deletion crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ impl Competition {
/// Solve an auction as part of this competition.
pub async fn solve(&self, auction: &Auction) -> Result<Solved, Error> {
let liquidity = match self.solver.liquidity() {
solver::Liquidity::Fetch => self.liquidity.fetch(&auction.liquidity_pairs()).await,
solver::Liquidity::Fetch => {
self.liquidity
.fetch(
&auction.liquidity_pairs(),
infra::liquidity::AtBlock::Latest,
)
.await
}
solver::Liquidity::Skip => Default::default(),
};

Expand Down
6 changes: 5 additions & 1 deletion crates/driver/src/domain/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl Order {
tokens: &infra::tokens::Fetcher,
) -> Result<Quote, Error> {
let liquidity = match solver.liquidity() {
solver::Liquidity::Fetch => liquidity.fetch(&self.liquidity_pairs()).await,
solver::Liquidity::Fetch => {
liquidity
.fetch(&self.liquidity_pairs(), infra::liquidity::AtBlock::Recent)
.await
}
solver::Liquidity::Skip => Default::default(),
};
let timeout = self.deadline.timeout()?;
Expand Down
24 changes: 22 additions & 2 deletions crates/driver/src/infra/liquidity/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ pub struct Fetcher {
inner: Arc<boundary::liquidity::Fetcher>,
}

/// Specifies at which block liquidity should be fetched.
pub enum AtBlock {
/// Fetches liquidity at a recent block. This will prefer reusing cached
/// liquidity even if it is stale by a few blocks instead of fetching the
/// absolute latest state from the blockchain.
///
/// This is useful for quoting where we want an up-to-date, but not
/// necessarily exactly correct price. In the context of quote verification,
/// this is completely fine as the exactly input and output amounts will be
/// computed anyway. At worse, we might provide a slightly sub-optimal
/// route in some cases, but this is an acceptable trade-off.
Recent,
/// Fetches liquidity liquidity for the latest state of the blockchain.
Latest,
}

impl Fetcher {
/// Creates a new liquidity fetcher for the specified Ethereum instance and
/// configuration.
Expand All @@ -25,9 +41,13 @@ impl Fetcher {

/// Fetches all relevant liquidity for the specified token pairs. Handles
/// failures by logging and returning an empty vector.
pub async fn fetch(&self, pairs: &HashSet<liquidity::TokenPair>) -> Vec<liquidity::Liquidity> {
pub async fn fetch(
&self,
pairs: &HashSet<liquidity::TokenPair>,
block: AtBlock,
) -> Vec<liquidity::Liquidity> {
observe::fetching_liquidity();
match self.inner.fetch(pairs).await {
match self.inner.fetch(pairs, block).await {
Ok(liquidity) => {
observe::fetched_liquidity(&liquidity);
liquidity
Expand Down
5 changes: 4 additions & 1 deletion crates/driver/src/infra/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@
pub mod config;
pub mod fetcher;

pub use self::{config::Config, fetcher::Fetcher};
pub use self::{
config::Config,
fetcher::{AtBlock, Fetcher},
};
41 changes: 41 additions & 0 deletions crates/e2e/src/setup/onchain_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
order::Hook,
signature::{EcdsaSignature, EcdsaSigningScheme},
DomainSeparator,
TokenPair,
},
secp256k1::SecretKey,
shared::ethrpc::Web3,
Expand Down Expand Up @@ -505,6 +506,46 @@ impl OnchainComponents {
tokens
}

/// Mints `amount` tokens to its `token`-WETH Uniswap V2 pool.
///
/// This can be used to modify the pool reserves during a test.
pub async fn mint_token_to_weth_uni_v2_pool(&self, token: &MintableToken, amount: U256) {
let pair = contracts::IUniswapLikePair::at(
&self.web3,
self.contracts
.uniswap_v2_factory
.get_pair(self.contracts.weth.address(), token.address())
.call()
.await
.expect("failed to get Uniswap V2 pair"),
);
assert!(!pair.address().is_zero(), "Uniswap V2 pair is not deployed");

// Mint amount + 1 to the pool, and then swap out 1 of the minted token
// in order to force it to update its K-value.
token.mint(pair.address(), amount + 1).await;
let (out0, out1) = if TokenPair::new(self.contracts.weth.address(), token.address())
.unwrap()
.get()
.0
== token.address()
{
(1, 0)
} else {
(0, 1)
};
pair.swap(
out0.into(),
out1.into(),
token.minter.address(),
Default::default(),
)
.from(token.minter.clone())
.send()
.await
.expect("Uniswap V2 pair couldn't mint");
}

pub async fn deploy_cow_token(&self, holder: Account, supply: U256) -> CowToken {
let contract =
CowProtocolToken::builder(&self.web3, holder.address(), holder.address(), supply)
Expand Down
85 changes: 85 additions & 0 deletions crates/e2e/tests/e2e/colocation_quoting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use {
e2e::{setup::*, tx, tx_value},
ethcontract::U256,
model::quote::{OrderQuoteRequest, OrderQuoteSide, SellAmount},
number::nonzero::U256 as NonZeroU256,
shared::ethrpc::Web3,
};

#[tokio::test]
#[ignore]
async fn local_node_uses_stale_liquidity() {
run_test(uses_stale_liquidity).await;
}

async fn uses_stale_liquidity(web3: Web3) {
tracing::info!("Setting up chain state.");
let mut onchain = OnchainComponents::deploy(web3.clone()).await;

let [solver] = onchain.make_solvers(to_wei(10)).await;
let [trader] = onchain.make_accounts(to_wei(2)).await;
let [token] = onchain
.deploy_tokens_with_weth_uni_v2_pools(to_wei(1_000), to_wei(1_000))
.await;

tx!(
trader.account(),
onchain
.contracts()
.weth
.approve(onchain.contracts().allowance, to_wei(1))
);
tx_value!(
trader.account(),
to_wei(1),
onchain.contracts().weth.deposit()
);

tracing::info!("Starting services.");
let solver_endpoint = colocation::start_solver(onchain.contracts().weth.address()).await;
colocation::start_driver(onchain.contracts(), &solver_endpoint, &solver);

let services = Services::new(onchain.contracts()).await;
services.start_autopilot(vec![
"--enable-colocation=true".to_string(),
"--drivers=http://localhost:11088/test_solver".to_string(),
]);
services
.start_api(vec![
"--price-estimation-drivers=solver|http://localhost:11088/test_solver".to_string(),
])
.await;

let quote = OrderQuoteRequest {
from: trader.address(),
sell_token: onchain.contracts().weth.address(),
buy_token: token.address(),
side: OrderQuoteSide::Sell {
sell_amount: SellAmount::AfterFee {
value: NonZeroU256::new(to_wei(1)).unwrap(),
},
},
..Default::default()
};

tracing::info!("performining initial quote");
let first = services.submit_quote(&quote).await.unwrap();

// Now, we want to manually unbalance the pools and assert that the quote
// doesn't change (as the price estimation will use stale pricing data).
onchain
.mint_token_to_weth_uni_v2_pool(&token, to_wei(1_000))
.await;

tracing::info!("performining second quote, which should match first");
let second = services.submit_quote(&quote).await.unwrap();
assert_eq!(first.quote.buy_amount, second.quote.buy_amount);

tracing::info!("waiting for liquidity state to update");
wait_for_condition(TIMEOUT, || async {
let next = services.submit_quote(&quote).await.unwrap();
next.quote.buy_amount != first.quote.buy_amount
})
.await
.unwrap();
}
1 change: 1 addition & 0 deletions crates/e2e/tests/e2e/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod app_data;
mod colocation_ethflow;
mod colocation_hooks;
mod colocation_partial_fill;
mod colocation_quoting;
mod colocation_univ2;
mod database;
mod eth_integration;
Expand Down

0 comments on commit 5be8e8f

Please sign in to comment.