From 0352160950e194cfcd1ee5d0bf213f0074e02d6b Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 3 Jul 2024 14:07:47 +0100 Subject: [PATCH 1/2] Add a vec of trait datasources to historical data fetcher --- Cargo.lock | 5 +- crates/rbuilder/Cargo.toml | 1 + .../rbuilder/src/backtest/fetch/datasource.rs | 32 ++++++ .../src/backtest/fetch/flashbots_db.rs | 44 ++++++- crates/rbuilder/src/backtest/fetch/mempool.rs | 70 +++++++++--- crates/rbuilder/src/backtest/fetch/mod.rs | 108 +++++------------- 6 files changed, 167 insertions(+), 93 deletions(-) create mode 100644 crates/rbuilder/src/backtest/fetch/datasource.rs diff --git a/Cargo.lock b/Cargo.lock index b46bf12b..d38807c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,9 +1008,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.78" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", @@ -7619,6 +7619,7 @@ dependencies = [ "alloy-serde 0.1.2", "alloy-transport", "alloy-transport-http", + "async-trait", "atoi", "bigdecimal 0.4.3", "built", diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index 8be526ca..595a8324 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -100,6 +100,7 @@ governor = "0.6.3" derivative = "2.2.0" mockall = "0.12.1" shellexpand = "3.1.0" +async-trait = "0.1.80" [build-dependencies] built = { version = "0.7.1", features = ["git2", "chrono"] } diff --git a/crates/rbuilder/src/backtest/fetch/datasource.rs b/crates/rbuilder/src/backtest/fetch/datasource.rs new file mode 100644 index 00000000..605668d3 --- /dev/null +++ b/crates/rbuilder/src/backtest/fetch/datasource.rs @@ -0,0 +1,32 @@ +use crate::backtest::OrdersWithTimestamp; +use async_trait::async_trait; + +/// Datasource trait +/// +/// This trait is used to fetch data from a datasource +#[async_trait] +pub trait Datasource: std::fmt::Debug { + async fn get_data(&self, block: BlockRef) -> eyre::Result>; + fn clone_box(&self) -> Box; +} + +impl Clone for Box { + fn clone(&self) -> Self { + self.clone_box() + } +} + +#[derive(Debug, Copy, Clone)] +pub struct BlockRef { + pub block_number: u64, + pub block_timestamp: u64, +} + +impl BlockRef { + pub fn new(block_number: u64, block_timestamp: u64) -> Self { + Self { + block_number, + block_timestamp, + } + } +} diff --git a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs index 64dff95d..77d01e95 100644 --- a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs +++ b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs @@ -1,10 +1,14 @@ use crate::{ - backtest::OrdersWithTimestamp, + backtest::{ + fetch::datasource::{BlockRef, Datasource}, + OrdersWithTimestamp, + }, primitives::{ serialize::{RawBundle, RawOrder, RawShareBundle, TxEncoding}, Order, SimValue, }, }; +use async_trait::async_trait; use bigdecimal::{ num_bigint::{BigInt, Sign, ToBigInt}, BigDecimal, @@ -14,8 +18,10 @@ use reth::primitives::{Bytes, B256, U256, U64}; use sqlx::postgres::PgPool; use std::{ops::Mul, str::FromStr}; use time::OffsetDateTime; +use tracing::trace; use uuid::Uuid; +#[derive(Debug, Clone)] pub struct RelayDB { pool: PgPool, } @@ -241,6 +247,42 @@ impl RelayDB { } } +#[async_trait] +impl Datasource for RelayDB { + async fn get_data(&self, block: BlockRef) -> eyre::Result> { + let bundles = 
self + .get_simulated_bundles_for_block(block.block_number) + .await + .with_context(|| format!("Failed to fetch bundles for block {}", block.block_number))?; + + let block_timestamp = OffsetDateTime::from_unix_timestamp(block.block_timestamp as i64)?; + let share_bundles = self + .get_simulated_share_bundles_for_block(block.block_number, block_timestamp) + .await + .with_context(|| { + format!( + "Failed to fetch share bundles for block {}", + block.block_number + ) + })?; + + trace!( + "Fetched bundles from flashbots db, bundles: {}, sbundles: {}", + bundles.len(), + share_bundles.len() + ); + + Ok(bundles + .into_iter() + .chain(share_bundles.into_iter()) + .collect()) + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + fn sql_decimal_to_wei(val: sqlx::types::BigDecimal) -> Option { let (bi, exp) = val.into_bigint_and_exponent(); let (bi_sign, bi_bytes) = bi.to_bytes_be(); diff --git a/crates/rbuilder/src/backtest/fetch/mempool.rs b/crates/rbuilder/src/backtest/fetch/mempool.rs index ca14b35a..94820524 100644 --- a/crates/rbuilder/src/backtest/fetch/mempool.rs +++ b/crates/rbuilder/src/backtest/fetch/mempool.rs @@ -1,25 +1,30 @@ use crate::{ - backtest::OrdersWithTimestamp, + backtest::{ + fetch::datasource::{BlockRef, Datasource}, + OrdersWithTimestamp, + }, primitives::{ serialize::{RawOrder, RawTx, TxEncoding}, Order, }, }; +use async_trait::async_trait; +use eyre::WrapErr; use mempool_dumpster::TransactionRangeError; use sqlx::types::chrono::DateTime; use std::path::{Path, PathBuf}; -use time::OffsetDateTime; -use tracing::error; +use time::{Duration, OffsetDateTime}; +use tracing::{error, trace}; pub fn get_mempool_transactions( - data_dir: impl AsRef, + data_dir: &Path, from: OffsetDateTime, to: OffsetDateTime, ) -> eyre::Result> { let from_millis: i64 = (from.unix_timestamp_nanos() / 1_000_000).try_into()?; let to_millis: i64 = (to.unix_timestamp_nanos() / 1_000_000).try_into()?; - check_and_download_transaction_files(from_millis, to_millis, &data_dir)?; + check_and_download_transaction_files(from_millis, to_millis, data_dir)?; let txs = mempool_dumpster::get_raw_transactions(data_dir, from_millis, to_millis)?; Ok(txs @@ -46,16 +51,14 @@ pub fn get_mempool_transactions( .collect()) } -fn path_transactions(data_dir: impl AsRef, day: &str) -> PathBuf { - data_dir - .as_ref() - .join(format!("transactions/{}.parquet", day)) +fn path_transactions(data_dir: &Path, day: &str) -> PathBuf { + data_dir.join(format!("transactions/{}.parquet", day)) } fn check_and_download_transaction_files( from_millis: i64, to_millis: i64, - data_dir: impl AsRef, + data_dir: &Path, ) -> eyre::Result<()> { let from_time = DateTime::from_timestamp_millis(from_millis) .ok_or(TransactionRangeError::InvalidTimestamp)?; @@ -74,10 +77,10 @@ fn check_and_download_transaction_files( // check all day files for day in &days { - let path = path_transactions(&data_dir, day); + let path = path_transactions(data_dir, day); if !path.exists() { tracing::warn!("Missing file: {}", path.display()); - let config = mempool_dumpster::Config::new(&data_dir) + let config = mempool_dumpster::Config::new(data_dir) .with_progress(true) .with_overwrite(true); config.download_transaction_file(day)?; @@ -86,6 +89,47 @@ fn check_and_download_transaction_files( Ok(()) } +#[derive(Debug, Clone)] +pub struct MempoolDumpsterDatasource { + path: PathBuf, +} + +#[async_trait] +impl Datasource for MempoolDumpsterDatasource { + async fn get_data(&self, block: BlockRef) -> eyre::Result> { + let (from, to) = { + let block_time 
= OffsetDateTime::from_unix_timestamp(block.block_timestamp as i64)?; + ( + block_time - Duration::minutes(3), + // we look ahead by 5 seconds in case block bid was delayed relative to the timestamp + block_time + Duration::seconds(5), + ) + }; + let mempool_txs = get_mempool_transactions(&self.path, from, to).wrap_err_with(|| { + format!( + "Failed to fetch mempool transactions for block {}", + block.block_number, + ) + })?; + trace!( + "Fetched unfiltered mempool transactions, count: {}", + mempool_txs.len() + ); + // TODO: Filter to only include tnxs from block? + Ok(mempool_txs) + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl MempoolDumpsterDatasource { + pub fn new(path: impl Into) -> Self { + Self { path: path.into() } + } +} + #[cfg(test)] mod test { use super::*; @@ -99,7 +143,7 @@ mod test { let from = datetime!(2023-09-04 23:59:00 UTC); let to = datetime!(2023-09-05 00:01:00 UTC); - let txs = get_mempool_transactions(data_dir, from, to).unwrap(); + let txs = get_mempool_transactions(data_dir.as_ref(), from, to).unwrap(); assert_eq!(txs.len(), 1938); dbg!(txs.len()); dbg!(&txs[0]); diff --git a/crates/rbuilder/src/backtest/fetch/mod.rs b/crates/rbuilder/src/backtest/fetch/mod.rs index 610262a2..f14aa528 100644 --- a/crates/rbuilder/src/backtest/fetch/mod.rs +++ b/crates/rbuilder/src/backtest/fetch/mod.rs @@ -1,8 +1,16 @@ +pub mod datasource; pub mod flashbots_db; pub mod mempool; pub mod mev_boost; -use crate::{backtest::BlockData, mev_boost::BuilderBlockReceived, utils::timestamp_as_u64}; +use crate::{ + backtest::{ + fetch::datasource::{BlockRef, Datasource}, + BlockData, + }, + mev_boost::BuilderBlockReceived, + utils::timestamp_as_u64, +}; use alloy_provider::Provider; use alloy_rpc_types::{Block, BlockId, BlockNumberOrTag}; @@ -12,7 +20,6 @@ use flashbots_db::RelayDB; use futures::TryStreamExt; use sqlx::PgPool; use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use time::{Duration, OffsetDateTime}; use tokio::sync::Mutex; use tracing::{info, trace}; @@ -25,9 +32,7 @@ use crate::{ pub struct HistoricalDataFetcher { eth_provider: BoxedProvider, eth_rpc_parallel: usize, - mempool_datadir: PathBuf, - // If none, skip bundles - flashbots_db: Option, + data_sources: Vec>, payload_delivered_fetcher: PayloadDeliveredFetcher, } @@ -38,15 +43,26 @@ impl HistoricalDataFetcher { mempool_datadir: PathBuf, flashbots_db: Option, ) -> Self { + let mut data_sources: Vec> = vec![Box::new( + mempool::MempoolDumpsterDatasource::new(mempool_datadir), + )]; + + if let Some(db_pool) = flashbots_db { + data_sources.push(Box::new(RelayDB::new(db_pool))); + } + Self { eth_provider, eth_rpc_parallel, - mempool_datadir, - flashbots_db, + data_sources, payload_delivered_fetcher: PayloadDeliveredFetcher::default(), } } + pub fn add_datasource(&mut self, datasource: Box) { + self.data_sources.push(datasource); + } + async fn get_payload_delivered_bid_trace( &self, block_number: u64, @@ -76,68 +92,6 @@ impl HistoricalDataFetcher { Ok(block) } - async fn fetch_mempool_txs( - &self, - block_number: u64, - block_timestamp: u64, - ) -> eyre::Result> { - let (from, to) = { - let block_time = OffsetDateTime::from_unix_timestamp(block_timestamp as i64)?; - ( - block_time - Duration::minutes(3), - // we look ahead by 5 seconds in case block bid was delayed relative to the timestamp - block_time + Duration::seconds(5), - ) - }; - let mempool_txs = mempool::get_mempool_transactions(&self.mempool_datadir, from, to) - .wrap_err_with(|| { - format!( - "Failed to fetch mempool 
transactions for block {}", - block_number - ) - })?; - trace!( - "Fetched unfiltered mempool transactions, count: {}", - mempool_txs.len() - ); - Ok(mempool_txs) - } - - async fn fetch_bundles( - &self, - block_number: u64, - block_timestamp: u64, - ) -> eyre::Result> { - let db = if let Some(db) = &self.flashbots_db { - RelayDB::new(db.clone()) - } else { - info!("Flashbots db not set, skipping bundles"); - return Ok(Vec::new()); - }; - - let bundles = db - .get_simulated_bundles_for_block(block_number) - .await - .with_context(|| format!("Failed to fetch bundles for block {}", block_number))?; - - let block_timestamp = OffsetDateTime::from_unix_timestamp(block_timestamp as i64)?; - let share_bundles = db - .get_simulated_share_bundles_for_block(block_number, block_timestamp) - .await - .with_context(|| format!("Failed to fetch share bundles for block {}", block_number))?; - - trace!( - "Fetched bundles from flashbots db, bundles: {}, sbundles: {}", - bundles.len(), - share_bundles.len() - ); - - Ok(bundles - .into_iter() - .chain(share_bundles.into_iter()) - .collect()) - } - fn filter_orders_by_base_fee( &self, block_base_fee: u128, @@ -236,14 +190,14 @@ impl HistoricalDataFetcher { let onchain_block = self.get_onchain_block(block_number).await?; let block_timestamp: u64 = timestamp_as_u64(&onchain_block); - let mut orders = { - let mut orders = self - .fetch_mempool_txs(block_number, block_timestamp) - .await?; - let bundles = self.fetch_bundles(block_number, block_timestamp).await?; - orders.extend(bundles); - orders - }; + + let mut orders: Vec = vec![]; + let block_ref = BlockRef::new(block_number, block_timestamp); + + for datasource in &self.data_sources { + let mut datasource_orders = datasource.get_data(block_ref).await?; + orders.append(&mut datasource_orders); + } info!("Fetched orders, unfiltered: {}", orders.len()); From 5f37b721a20f19fd8a3768848a462cabdc536fb2 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 9 Jul 2024 06:28:03 +0200 Subject: [PATCH 2/2] Address comments --- crates/rbuilder/src/backtest/fetch/datasource.rs | 10 +++++----- crates/rbuilder/src/backtest/fetch/flashbots_db.rs | 8 ++++---- crates/rbuilder/src/backtest/fetch/mempool.rs | 8 ++++---- crates/rbuilder/src/backtest/fetch/mod.rs | 10 +++++----- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/crates/rbuilder/src/backtest/fetch/datasource.rs b/crates/rbuilder/src/backtest/fetch/datasource.rs index 605668d3..bdc9955a 100644 --- a/crates/rbuilder/src/backtest/fetch/datasource.rs +++ b/crates/rbuilder/src/backtest/fetch/datasource.rs @@ -1,16 +1,16 @@ use crate::backtest::OrdersWithTimestamp; use async_trait::async_trait; -/// Datasource trait +/// DataSource trait /// /// This trait is used to fetch data from a datasource #[async_trait] -pub trait Datasource: std::fmt::Debug { - async fn get_data(&self, block: BlockRef) -> eyre::Result>; - fn clone_box(&self) -> Box; +pub trait DataSource: std::fmt::Debug { + async fn get_orders(&self, block: BlockRef) -> eyre::Result>; + fn clone_box(&self) -> Box; } -impl Clone for Box { +impl Clone for Box { fn clone(&self) -> Self { self.clone_box() } diff --git a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs index 77d01e95..4a6a6373 100644 --- a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs +++ b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs @@ -1,6 +1,6 @@ use crate::{ backtest::{ - fetch::datasource::{BlockRef, Datasource}, + fetch::datasource::{BlockRef, DataSource}, 
OrdersWithTimestamp, }, primitives::{ @@ -248,8 +248,8 @@ impl RelayDB { } #[async_trait] -impl Datasource for RelayDB { - async fn get_data(&self, block: BlockRef) -> eyre::Result> { +impl DataSource for RelayDB { + async fn get_orders(&self, block: BlockRef) -> eyre::Result> { let bundles = self .get_simulated_bundles_for_block(block.block_number) .await @@ -278,7 +278,7 @@ impl Datasource for RelayDB { .collect()) } - fn clone_box(&self) -> Box { + fn clone_box(&self) -> Box { Box::new(self.clone()) } } diff --git a/crates/rbuilder/src/backtest/fetch/mempool.rs b/crates/rbuilder/src/backtest/fetch/mempool.rs index 94820524..56823bb2 100644 --- a/crates/rbuilder/src/backtest/fetch/mempool.rs +++ b/crates/rbuilder/src/backtest/fetch/mempool.rs @@ -1,6 +1,6 @@ use crate::{ backtest::{ - fetch::datasource::{BlockRef, Datasource}, + fetch::datasource::{BlockRef, DataSource}, OrdersWithTimestamp, }, primitives::{ @@ -95,8 +95,8 @@ pub struct MempoolDumpsterDatasource { } #[async_trait] -impl Datasource for MempoolDumpsterDatasource { - async fn get_data(&self, block: BlockRef) -> eyre::Result> { +impl DataSource for MempoolDumpsterDatasource { + async fn get_orders(&self, block: BlockRef) -> eyre::Result> { let (from, to) = { let block_time = OffsetDateTime::from_unix_timestamp(block.block_timestamp as i64)?; ( @@ -119,7 +119,7 @@ impl Datasource for MempoolDumpsterDatasource { Ok(mempool_txs) } - fn clone_box(&self) -> Box { + fn clone_box(&self) -> Box { Box::new(self.clone()) } } diff --git a/crates/rbuilder/src/backtest/fetch/mod.rs b/crates/rbuilder/src/backtest/fetch/mod.rs index f14aa528..3d3bdbf3 100644 --- a/crates/rbuilder/src/backtest/fetch/mod.rs +++ b/crates/rbuilder/src/backtest/fetch/mod.rs @@ -5,7 +5,7 @@ pub mod mev_boost; use crate::{ backtest::{ - fetch::datasource::{BlockRef, Datasource}, + fetch::datasource::{BlockRef, DataSource}, BlockData, }, mev_boost::BuilderBlockReceived, @@ -32,7 +32,7 @@ use crate::{ pub struct HistoricalDataFetcher { eth_provider: BoxedProvider, eth_rpc_parallel: usize, - data_sources: Vec>, + data_sources: Vec>, payload_delivered_fetcher: PayloadDeliveredFetcher, } @@ -43,7 +43,7 @@ impl HistoricalDataFetcher { mempool_datadir: PathBuf, flashbots_db: Option, ) -> Self { - let mut data_sources: Vec> = vec![Box::new( + let mut data_sources: Vec> = vec![Box::new( mempool::MempoolDumpsterDatasource::new(mempool_datadir), )]; @@ -59,7 +59,7 @@ impl HistoricalDataFetcher { } } - pub fn add_datasource(&mut self, datasource: Box) { + pub fn add_datasource(&mut self, datasource: Box) { self.data_sources.push(datasource); } @@ -195,7 +195,7 @@ impl HistoricalDataFetcher { let block_ref = BlockRef::new(block_number, block_timestamp); for datasource in &self.data_sources { - let mut datasource_orders = datasource.get_data(block_ref).await?; + let mut datasource_orders = datasource.get_orders(block_ref).await?; orders.append(&mut datasource_orders); }
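
Note (illustrative only, not part of the patch series above): the new `DataSource` trait and `HistoricalDataFetcher::add_datasource` make it possible to plug additional order sources into the backtest fetcher beyond the built-in mempool-dumpster and flashbots-db ones. The sketch below shows one minimal way a custom implementation could look. `StaticOrdersDatasource` and its field are hypothetical names invented for this example, it is written as if it lived inside the rbuilder crate so the `crate::backtest::...` imports resolve, and it assumes `OrdersWithTimestamp` implements `Clone`.

use async_trait::async_trait;

use crate::backtest::{
    fetch::datasource::{BlockRef, DataSource},
    OrdersWithTimestamp,
};

/// Hypothetical datasource that replays a fixed, pre-loaded set of orders
/// regardless of which block is being fetched.
#[derive(Debug, Clone, Default)]
pub struct StaticOrdersDatasource {
    orders: Vec<OrdersWithTimestamp>,
}

#[async_trait]
impl DataSource for StaticOrdersDatasource {
    async fn get_orders(&self, _block: BlockRef) -> eyre::Result<Vec<OrdersWithTimestamp>> {
        // A real implementation would query or filter its backing store by
        // `_block.block_number` / `_block.block_timestamp`; this sketch just
        // returns its preloaded orders unchanged.
        Ok(self.orders.clone())
    }

    fn clone_box(&self) -> Box<dyn DataSource> {
        Box::new(self.clone())
    }
}

A caller that builds the fetcher with `HistoricalDataFetcher::new(...)` could then register the extra source before fetching:

    fetcher.add_datasource(Box::new(StaticOrdersDatasource::default()));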