diff --git a/crates/rbuilder/src/backtest/fetch/datasource.rs b/crates/rbuilder/src/backtest/fetch/data_source.rs similarity index 100% rename from crates/rbuilder/src/backtest/fetch/datasource.rs rename to crates/rbuilder/src/backtest/fetch/data_source.rs diff --git a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs index 4a6a6373..0f0df655 100644 --- a/crates/rbuilder/src/backtest/fetch/flashbots_db.rs +++ b/crates/rbuilder/src/backtest/fetch/flashbots_db.rs @@ -1,6 +1,6 @@ use crate::{ backtest::{ - fetch::datasource::{BlockRef, DataSource}, + fetch::data_source::{BlockRef, DataSource}, OrdersWithTimestamp, }, primitives::{ diff --git a/crates/rbuilder/src/backtest/fetch/mempool.rs b/crates/rbuilder/src/backtest/fetch/mempool.rs index 39258e11..7ed39214 100644 --- a/crates/rbuilder/src/backtest/fetch/mempool.rs +++ b/crates/rbuilder/src/backtest/fetch/mempool.rs @@ -2,7 +2,7 @@ //! It downloads all the needed parquet files and keeps them cached for future use. use crate::{ backtest::{ - fetch::datasource::{BlockRef, DataSource}, + fetch::data_source::{BlockRef, DataSource}, OrdersWithTimestamp, }, primitives::{ @@ -14,7 +14,10 @@ use async_trait::async_trait; use eyre::WrapErr; use mempool_dumpster::TransactionRangeError; use sqlx::types::chrono::DateTime; -use std::path::{Path, PathBuf}; +use std::{ + fs::create_dir_all, + path::{Path, PathBuf}, +}; use time::{Duration, OffsetDateTime}; use tracing::{error, trace}; @@ -134,8 +137,14 @@ impl DataSource for MempoolDumpsterDatasource { } impl MempoolDumpsterDatasource { - pub fn new(path: impl Into) -> Self { - Self { path: path.into() } + pub fn new(path: impl Into) -> Result { + let path: PathBuf = path.into(); + + // create the directory if it doesn't exist + create_dir_all(&path)?; + create_dir_all(path.join("transactions"))?; + + Ok(Self { path }) } } @@ -146,17 +155,17 @@ mod test { use time::macros::datetime; #[ignore_if_env_not_set("MEMPOOL_DATADIR")] - #[test] - fn test_get_mempool_transactions() { + #[tokio::test] + async fn test_get_mempool_transactions() { let data_dir = std::env::var("MEMPOOL_DATADIR").expect("MEMPOOL_DATADIR not set"); - let from = datetime!(2023-09-04 23:59:00 UTC); - let to = datetime!(2023-09-05 00:01:00 UTC); + let source = MempoolDumpsterDatasource::new(data_dir).unwrap(); + let block = BlockRef { + block_number: 18048817, + block_timestamp: datetime!(2023-09-04 23:59:00 UTC).unix_timestamp() as u64, + }; - let txs = get_mempool_transactions(data_dir.as_ref(), from, to).unwrap(); - assert_eq!(txs.len(), 1938); - dbg!(txs.len()); - dbg!(&txs[0]); - dbg!(&txs[txs.len() - 1]); + let txs = source.get_orders(block).await.unwrap(); + assert_eq!(txs.len(), 1732); } } diff --git a/crates/rbuilder/src/backtest/fetch/mod.rs b/crates/rbuilder/src/backtest/fetch/mod.rs index 061c50f7..c72ddd0f 100644 --- a/crates/rbuilder/src/backtest/fetch/mod.rs +++ b/crates/rbuilder/src/backtest/fetch/mod.rs @@ -1,11 +1,11 @@ -pub mod datasource; +pub mod data_source; pub mod flashbots_db; pub mod mempool; pub mod mev_boost; use crate::{ backtest::{ - fetch::datasource::{BlockRef, DataSource}, + fetch::data_source::{BlockRef, DataSource}, BlockData, }, mev_boost::BuilderBlockReceived, @@ -52,21 +52,21 @@ impl HistoricalDataFetcher { eth_rpc_parallel: usize, mempool_datadir: PathBuf, flashbots_db: Option, - ) -> Self { + ) -> eyre::Result { let mut data_sources: Vec> = vec![Box::new( - mempool::MempoolDumpsterDatasource::new(mempool_datadir), + mempool::MempoolDumpsterDatasource::new(mempool_datadir)?, )]; if let Some(db_pool) = flashbots_db { data_sources.push(Box::new(RelayDB::new(db_pool))); } - Self { + Ok(Self { eth_provider, eth_rpc_parallel, data_sources, payload_delivered_fetcher: PayloadDeliveredFetcher::default(), - } + }) } pub fn add_datasource(&mut self, datasource: Box) { diff --git a/crates/rbuilder/src/bin/backtest-fetch.rs b/crates/rbuilder/src/bin/backtest-fetch.rs index 40421c19..e7077e14 100644 --- a/crates/rbuilder/src/bin/backtest-fetch.rs +++ b/crates/rbuilder/src/bin/backtest-fetch.rs @@ -61,11 +61,6 @@ async fn main() -> eyre::Result<()> { // create paths for backtest_fetch_mempool_data_dir (i.e "~/.rbuilder/mempool-data" and ".../transactions") let backtest_fetch_mempool_data_dir = config.base_config().backtest_fetch_mempool_data_dir()?; - fs::create_dir_all(&backtest_fetch_mempool_data_dir)?; - let mut backtest_fetch_mempool_data_dir_txs = - config.base_config().backtest_fetch_mempool_data_dir()?; - backtest_fetch_mempool_data_dir_txs.push("transactions"); - fs::create_dir_all(&backtest_fetch_mempool_data_dir_txs)?; let db = config.base_config().flashbots_db().await?; let provider = config.base_config().eth_rpc_provider()?; @@ -74,7 +69,7 @@ async fn main() -> eyre::Result<()> { config.base_config().backtest_fetch_eth_rpc_parallel, backtest_fetch_mempool_data_dir, db, - ); + )?; let blocks_to_fetch: Box> = if cli.range { let from_block = cli.blocks.first().copied().unwrap_or(0);