Skip to content

Commit

Permalink
Merge branch 'main' into add-block-no-mempool-executed-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz authored Jan 6, 2025
2 parents c68e67f + 4d82068 commit 5846dbb
Show file tree
Hide file tree
Showing 33 changed files with 322 additions and 221 deletions.
31 changes: 4 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ humantime-serde = "1.1.1"
hyper = "0.14.29"
indexmap = "2.2.6"
itertools = "0.12.1"
lazy_static = "1.4.0"
maplit = "1.0.2"
mockall = "0.12.1"
num = "0.4.3"
once_cell = "1.19.0"
primitive-types = "0.12"
prometheus = "0.13.4"
prometheus-metric-storage = "0.5.0"
Expand All @@ -46,7 +44,6 @@ serde_with = "3.8.1"
sqlx = { version = "0.7", default-features = false, features = ["runtime-tokio", "tls-native-tls", "bigdecimal", "chrono", "postgres", "macros"] }
strum = { version = "0.26.2", features = ["derive"] }
tempfile = "3.10.1"
time = { version = "0.3.36", features = ["macros"] }
thiserror = "1.0.61"
toml = "0.8.14"
tokio = { version = "1.38.0", features = ["tracing"] }
Expand Down
52 changes: 52 additions & 0 deletions crates/database/src/trades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ AND t.log_index BETWEEN (SELECT * from previous_settlement) AND $2
.await
}

/// Returns the earliest block in which the given token was ever traded,
/// looking at both regular orders and JIT orders. Yields `Ok(None)` when no
/// trade involving the token exists.
pub async fn token_first_trade_block(
    ex: &mut PgConnection,
    token: Address,
) -> Result<Option<i64>, sqlx::Error> {
    const QUERY: &str = r#"
SELECT MIN(sub.block_number) AS earliest_block
FROM (
    SELECT MIN(t.block_number) AS block_number
    FROM trades t
    JOIN orders o ON t.order_uid = o.uid
    WHERE o.sell_token = $1 OR o.buy_token = $1
    UNION ALL
    SELECT MIN(t.block_number) AS block_number
    FROM trades t
    JOIN jit_orders j ON t.order_uid = j.uid
    WHERE j.sell_token = $1 OR j.buy_token = $1
) AS sub
    "#;

    // `MIN` over an empty/NULL-only set yields NULL, which maps to `None`.
    let row: (Option<i64>,) = sqlx::query_as(QUERY).bind(token).fetch_one(ex).await?;
    Ok(row.0)
}

#[cfg(test)]
mod tests {
use {
Expand Down Expand Up @@ -579,4 +604,31 @@ mod tests {
}]
);
}

#[tokio::test]
#[ignore]
async fn postgres_token_first_trade_block() {
    let mut db = PgConnection::connect("postgresql://").await.unwrap();
    let mut db = db.begin().await.unwrap();
    crate::clear_DANGER_(&mut db).await.unwrap();

    // An empty database contains no trades at all for the token.
    let token = Default::default();
    assert_eq!(token_first_trade_block(&mut db, token).await.unwrap(), None);

    // Record two trades at consecutive blocks; the query must report the
    // earlier one.
    let (owners, order_ids) = generate_owners_and_order_ids(2, 2).await;
    let earlier = EventIndex {
        block_number: 123,
        log_index: 0,
    };
    let later = EventIndex {
        block_number: 124,
        log_index: 0,
    };
    add_order_and_trade(&mut db, owners[0], order_ids[0], earlier, None, None).await;
    add_order_and_trade(&mut db, owners[1], order_ids[1], later, None, None).await;
    assert_eq!(
        token_first_trade_block(&mut db, token).await.unwrap(),
        Some(123)
    );
}
}
1 change: 0 additions & 1 deletion crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ hex-literal = { workspace = true }
humantime = { workspace = true }
humantime-serde = { workspace = true }
hyper = { workspace = true }
lazy_static = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
itertools = { workspace = true }
mimalloc = { workspace = true }
Expand Down
28 changes: 16 additions & 12 deletions crates/driver/src/domain/competition/bad_tokens/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct CacheEntry {
/// when the decision on the token quality was made
last_updated: Instant,
/// whether the token is supported or not
quality: Quality,
is_supported: bool,
}

impl Cache {
Expand All @@ -39,23 +39,21 @@ impl Cache {
}

/// Updates whether or not a token should be considered supported.
pub fn update_quality(&self, token: eth::TokenAddress, quality: Quality, now: Instant) {
pub fn update_quality(&self, token: eth::TokenAddress, is_supported: bool, now: Instant) {
self.0
.cache
.entry(token)
.and_modify(|value| {
if quality == Quality::Unsupported
|| now.duration_since(value.last_updated) > self.0.max_age
{
.and_modify(|token| {
if !is_supported || now.duration_since(token.last_updated) > self.0.max_age {
// Only update the value if the cached value is outdated by now or
// if the new value is "Unsupported". This means on conflicting updates
// we err on the conservative side and assume a token is unsupported.
value.quality = quality;
token.is_supported = is_supported;
}
value.last_updated = now;
token.last_updated = now;
})
.or_insert_with(|| CacheEntry {
quality,
is_supported,
last_updated: now,
});
}
Expand All @@ -69,9 +67,15 @@ impl Cache {

/// Returns the quality of the token if the cached value has not expired
/// yet.
pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Option<Quality> {
let token = self.0.cache.get(token)?;
pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Quality {
let Some(token) = self.0.cache.get(token) else {
return Quality::Unknown;
};
let still_valid = now.duration_since(token.last_updated) > self.0.max_age;
still_valid.then_some(token.quality)
match (still_valid, token.is_supported) {
(false, _) => Quality::Unknown,
(true, true) => Quality::Supported,
(true, false) => Quality::Unsupported,
}
}
}
56 changes: 30 additions & 26 deletions crates/driver/src/domain/competition/bad_tokens/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,36 @@ impl Detector {
}
}

pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Option<Quality> {
let stats = self.counter.get(token)?;
pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Quality {
let Some(stats) = self.counter.get(token) else {
return Quality::Unknown;
};

if stats
.flagged_unsupported_at
.is_some_and(|t| now.duration_since(t) > self.token_freeze_time)
{
// Sometimes tokens only cause issues temporarily. If the token's freeze
// period expired we give it another chance to see if it still behaves badly.
return None;
// period expired we pretend we don't have enough information to give it
// another chance. If it still behaves badly it will get frozen immediately.
return Quality::Unknown;
}

let is_unsupported = self.stats_indicate_unsupported(&stats);
(!self.log_only && is_unsupported).then_some(Quality::Unsupported)
match self.log_only {
true => Quality::Supported,
false => self.quality_based_on_stats(&stats),
}
}

fn stats_indicate_unsupported(&self, stats: &TokenStatistics) -> bool {
let token_failure_ratio = match stats.attempts {
0 => return false,
attempts => f64::from(stats.fails) / f64::from(attempts),
};
stats.attempts >= self.required_measurements && token_failure_ratio >= self.failure_ratio
/// Derives a token's quality purely from its recorded attempt/failure
/// statistics.
fn quality_based_on_stats(&self, stats: &TokenStatistics) -> Quality {
    // Too few data points to make a reliable judgement yet.
    if stats.attempts < self.required_measurements {
        return Quality::Unknown;
    }
    let failure_ratio = f64::from(stats.fails) / f64::from(stats.attempts);
    if failure_ratio >= self.failure_ratio {
        Quality::Unsupported
    } else {
        Quality::Supported
    }
}

/// Updates the tokens that participated in settlements by
Expand Down Expand Up @@ -96,7 +105,7 @@ impl Detector {
});

// token needs to be frozen as unsupported for a while
if self.stats_indicate_unsupported(&stats)
if self.quality_based_on_stats(&stats) == Quality::Unsupported
&& stats
.flagged_unsupported_at
.is_none_or(|t| now.duration_since(t) > self.token_freeze_time)
Expand Down Expand Up @@ -128,28 +137,23 @@ mod tests {

let token_a = eth::TokenAddress(eth::ContractAddress(H160([1; 20])));
let token_b = eth::TokenAddress(eth::ContractAddress(H160([2; 20])));
let token_quality = || detector.get_quality(&token_a, Instant::now());

// token is reported as supported while we don't have enough measurements
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
// token is reported as unknown while we don't have enough measurements
assert_eq!(token_quality(), Quality::Unknown);
detector.update_tokens(&[(token_a, token_b)], true);
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
assert_eq!(token_quality(), Quality::Unknown);
detector.update_tokens(&[(token_a, token_b)], true);

// after we got enough measurements the token gets marked as bad
assert_eq!(
detector.get_quality(&token_a, Instant::now()),
Some(Quality::Unsupported)
);
assert_eq!(token_quality(), Quality::Unsupported);

// after the freeze period is over the token gets reported as good again
// after the freeze period is over the token gets reported as unknown again
tokio::time::sleep(FREEZE_DURATION).await;
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
assert_eq!(token_quality(), Quality::Unknown);

// after an unfreeze another bad measurement is enough to freeze it again
detector.update_tokens(&[(token_a, token_b)], true);
assert_eq!(
detector.get_quality(&token_a, Instant::now()),
Some(Quality::Unsupported)
);
assert_eq!(token_quality(), Quality::Unsupported);
}
}
Loading

0 comments on commit 5846dbb

Please sign in to comment.