Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
m-lord-renkse committed Dec 12, 2024
1 parent 5e5241a commit d445982
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 59 deletions.
39 changes: 30 additions & 9 deletions crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
super::{bad_tokens, order, Order},
super::{order, Order},
crate::{
domain::{
competition::{self, auction, sorting},
Expand All @@ -14,12 +14,11 @@ use {
futures::future::{join_all, BoxFuture, FutureExt, Shared},
itertools::Itertools,
model::{order::OrderKind, signature::Signature},
shared::{
bad_token::trace_call::TraceCallDetectorRaw,
signature_validator::{Contracts, SignatureValidating},
},
shared::signature_validator::{Contracts, SignatureValidating},
std::{
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::{Arc, Mutex},
},
thiserror::Error,
Expand Down Expand Up @@ -76,6 +75,32 @@ impl Auction {
})
}

/// Filter the orders according to the funcion `filter_fn` provided.
/// The function `filter_fn` must return an `Option<Order>`, with `None`
/// indicating that the order has to be filtered.
/// This is needed due to the lack of `filter()` async closure support.
pub async fn filter_orders<F>(&mut self, filter_fn: F)
where
F: Fn(
competition::Order,
) -> Pin<Box<dyn Future<Output = Option<competition::Order>> + Send>>
+ Send,
{
let futures = self
.orders
.drain(..)
.map(|order| {
let filter_fn = &filter_fn;
async move { filter_fn(order).await }
})
.collect::<Vec<_>>();
self.orders = futures::future::join_all(futures)
.await
.into_iter()
.flatten()
.collect();
}

/// [`None`] if this auction applies to a quote. See
/// [`crate::domain::quote`].
pub fn id(&self) -> Option<Id> {
Expand Down Expand Up @@ -482,10 +507,6 @@ impl AuctionProcessor {
Self(Arc::new(Mutex::new(Inner {
auction: Id(0),
fut: futures::future::pending().boxed().shared(),
bad_token_detector: TraceCallDetectorRaw::new(
eth.web3().clone(),
eth.contracts().settlement().address(),
),
eth,
order_sorting_strategies,
signature_validator,
Expand Down
105 changes: 76 additions & 29 deletions crates/driver/src/domain/competition/bad_tokens.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
super::Order,
crate::{
domain::{self, eth},
infra,
domain::{competition::Auction, eth},
infra::{self, config::file::BadTokenDetectionCache},
},
anyhow::Result,
dashmap::{DashMap, Entry, OccupiedEntry, VacantEntry},
dashmap::{DashMap, Entry},
futures::FutureExt,
model::interaction::InteractionData,
shared::bad_token::{trace_call::TraceCallDetectorRaw, TokenQuality},
std::{
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct Detector {
hardcoded: HashMap<eth::TokenAddress, Quality>,
/// cache which is shared and updated by multiple bad token detection
/// mechanisms
cache: Cache,
cache: Arc<Cache>,
simulation_detector: Option<TraceCallDetectorRaw>,
metrics: Option<Metrics>,
}
Expand All @@ -65,22 +65,73 @@ impl Detector {
self
}

pub fn filter_unsupported_orders(&self, mut orders: Vec<Order>) -> Vec<Order> {
pub fn with_cache(mut self, cache: Arc<Cache>) -> Self {
self.cache = cache;
self
}

/// Filter all unsupported orders within an Auction
pub async fn filter_unsupported_orders_in_auction(
self: Arc<Self>,
mut auction: Auction,
) -> Auction {
let now = Instant::now();

// group by sell tokens?
// future calling `determine_sell_token_quality()` for all of orders
let self_clone = self.clone();

auction
.filter_orders(move |order| {
{
let self_clone = self_clone.clone();
async move {
// We first check the token quality:
                    // - If both tokens are supported, the order is not filtered
// - If any of the order tokens is unsupported, the order is filtered
// - If the token quality cannot be determined: call
// `determine_sell_token_quality()` to execute the simulation
// All of these operations are done within the same `.map()` in order to
// avoid iterating twice over the orders vector
let tokens_quality = [order.sell.token, order.buy.token]
.iter()
.map(|token| self_clone.get_token_quality(*token, now))
.collect::<Vec<_>>();
let both_tokens_supported = tokens_quality
.iter()
.all(|token_quality| *token_quality == Some(Quality::Supported));
let any_token_unsupported = tokens_quality
.iter()
.any(|token_quality| *token_quality == Some(Quality::Unsupported));

// @TODO: remove the bad tokens from the tokens field?

                    // If both tokens are supported, the order is not filtered
if both_tokens_supported {
return Some(order);
}

orders.retain(|o| {
[o.sell.token, o.buy.token].iter().all(|token| {
self.get_token_quality(*token, now)
.is_none_or(|q| q == Quality::Supported)
// If any of the order tokens is unsupported, the order is filtered
if any_token_unsupported {
return None;
}

// If the token quality cannot be determined: call
// `determine_sell_token_quality()` to execute the simulation
if self_clone.determine_sell_token_quality(&order, now).await
== Some(Quality::Supported)
{
return Some(order);
}

None
}
}
.boxed()
})
});
.await;

self.cache.evict_outdated_entries();

orders
auction
}

fn get_token_quality(&self, token: eth::TokenAddress, now: Instant) -> Option<Quality> {
Expand All @@ -99,12 +150,11 @@ impl Detector {
None
}

pub async fn determine_sell_token_quality(
&self,
detector: &TraceCallDetectorRaw,
order: &Order,
now: Instant,
) -> Option<Quality> {
async fn determine_sell_token_quality(&self, order: &Order, now: Instant) -> Option<Quality> {
let Some(detector) = self.simulation_detector.as_ref() else {
return None;
};

if let Some(quality) = self.cache.get_quality(order.sell.token, now) {
return Some(quality);
}
Expand All @@ -122,7 +172,7 @@ impl Detector {

match detector
.test_transfer(
order.trader().0 .0,
eth::Address::from(order.trader()).0,
token.0 .0,
order.sell.amount.0,
&pre_interactions,
Expand Down Expand Up @@ -164,8 +214,6 @@ pub struct Cache {
cache: DashMap<eth::TokenAddress, CacheEntry>,
/// entries older than this get ignored and evicted
max_age: Duration,
/// evicts entries when the cache grows beyond this size
max_size: usize,
}

struct CacheEntry {
Expand All @@ -177,18 +225,17 @@ struct CacheEntry {

impl Default for Cache {
    /// Uses the default `BadTokenDetectionCache` configuration
    /// (default max age and capacity).
    fn default() -> Self {
        Self::new(&BadTokenDetectionCache::default())
    }
}

impl Cache {
/// Creates a new instance which evicts cached values after a period of
/// time.
pub fn new(max_age: Duration, max_size: usize) -> Self {
pub fn new(bad_token_detection_cache: &BadTokenDetectionCache) -> Self {
Self {
max_age,
max_size,
cache: Default::default(),
max_age: bad_token_detection_cache.max_age,
cache: DashMap::with_capacity(bad_token_detection_cache.max_size),
}
}

Expand Down Expand Up @@ -243,7 +290,7 @@ impl Cache {
struct Metrics {}

impl Metrics {
fn get_quality(&self, token: eth::TokenAddress) -> Option<Quality> {
fn get_quality(&self, _token: eth::TokenAddress) -> Option<Quality> {
todo!()
}
}
19 changes: 13 additions & 6 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
std::{
cmp::Reverse,
collections::{HashMap, HashSet, VecDeque},
sync::Mutex,
sync::{Arc, Mutex},
},
tap::TapFallible,
};
Expand Down Expand Up @@ -53,15 +53,22 @@ pub struct Competition {
pub mempools: Mempools,
/// Cached solutions with the most recent solutions at the front.
pub settlements: Mutex<VecDeque<Settlement>>,
// TODO: single type should have the feature set to simulate
pub bad_tokens: bad_tokens::Detector,
pub bad_tokens: Option<Arc<bad_tokens::Detector>>,
}

impl Competition {
/// Solve an auction as part of this competition.
pub async fn solve(&self, auction: &Auction) -> Result<Option<Solved>, Error> {
// 1. simulate sell tokens
// 2. filter bad tokens
pub async fn solve(&self, mut auction: Auction) -> Result<Option<Solved>, Error> {
// filter orders in auction which contain a bad tokens if the bad token
// detection is configured
if let Some(bad_tokens) = self.bad_tokens.as_ref() {
auction = bad_tokens
.clone()
.filter_unsupported_orders_in_auction(auction)
.await;
}
// Enforces Auction not to be consumed by making it as a shared reference
let auction = &auction;

let liquidity = match self.solver.liquidity() {
solver::Liquidity::Fetch => {
Expand Down
4 changes: 2 additions & 2 deletions crates/driver/src/domain/competition/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ impl From<BuyTokenDestination> for BuyTokenBalance {
}

/// The address which placed the order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Into)]
pub struct Trader(pub eth::Address);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Into, From)]
pub struct Trader(eth::Address);

/// A just-in-time order. JIT orders are added at solving time by the solver to
/// generate a more optimal solution for the auction. Very similar to a regular
Expand Down
46 changes: 38 additions & 8 deletions crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use {
crate::{
domain::{self, competition::bad_tokens, Mempools},
domain::{
self,
competition::{
bad_tokens,
bad_tokens::{Cache, Quality},
},
eth,
Mempools,
},
infra::{
self,
config::file::OrderPriorityStrategy,
config::file::{BadTokenDetectionCache, OrderPriorityStrategy},
liquidity,
solver::{Solver, Timeouts},
tokens,
Expand All @@ -13,7 +21,7 @@ use {
},
error::Error,
futures::Future,
std::{net::SocketAddr, sync::Arc},
std::{collections::HashMap, net::SocketAddr, sync::Arc},
tokio::sync::oneshot,
};

Expand All @@ -32,6 +40,7 @@ pub struct Api {
/// If this channel is specified, the bound address will be sent to it. This
/// allows the driver to bind to 0.0.0.0:0 during testing.
pub addr_sender: Option<oneshot::Sender<SocketAddr>>,
pub bad_token_detection_cache: BadTokenDetectionCache,
}

impl Api {
Expand All @@ -52,8 +61,9 @@ impl Api {
let tokens = tokens::Fetcher::new(&self.eth);
let pre_processor =
domain::competition::AuctionProcessor::new(&self.eth, order_priority_strategies);
let trace_detector = bad_tokens::SimulationDetector::new(&self.eth);
let miep = bad_tokens::Detector::default().register_cache(trace_detector.cache().clone());

// TODO: create a struct wrapper to handle this under the hood
let trace_detector = Arc::new(Cache::new(&self.bad_token_detection_cache));

// Add the metrics and healthz endpoints.
app = routes::metrics(app);
Expand All @@ -72,8 +82,28 @@ impl Api {
let router = routes::reveal(router);
let router = routes::settle(router);

let miep =
bad_tokens::Detector::default().register_cache(trace_detector.cache().clone());
let bad_tokens = solver.bad_token_detector().and_then(|bad_token_detector| {
// maybe make this as part of the bad token builder?
let config = bad_token_detector
.unsupported_tokens
.iter()
.map(|token| (eth::TokenAddress::from(*token), Quality::Unsupported))
.chain(
bad_token_detector
.allowed_tokens
.iter()
.map(|token| (eth::TokenAddress::from(*token), Quality::Supported)),
)
.collect::<HashMap<_, _>>();

Some(Arc::new(
// maybe do proper builder pattern here?
bad_tokens::Detector::default()
.with_simulation_detector(&self.eth.clone())
.with_config(config)
.with_cache(trace_detector.clone()),
))
});

let router = router.with_state(State(Arc::new(Inner {
eth: self.eth.clone(),
Expand All @@ -85,7 +115,7 @@ impl Api {
simulator: self.simulator.clone(),
mempools: self.mempools.clone(),
settlements: Default::default(),
bad_tokens: miep,
bad_tokens,
},
liquidity: self.liquidity.clone(),
tokens: tokens.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/routes/solve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn route(
.pre_processor()
.prioritize(auction, &competition.solver.account().address())
.await;
let result = competition.solve(&auction).await;
let result = competition.solve(auction).await;
observe::solved(state.solver().name(), &result);
Ok(axum::Json(dto::Solved::new(result?, &competition.solver)))
};
Expand Down
Loading

0 comments on commit d445982

Please sign in to comment.