Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a vec of trait datasources to historical data fetcher #13

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ governor = "0.6.3"
derivative = "2.2.0"
mockall = "0.12.1"
shellexpand = "3.1.0"
async-trait = "0.1.80"

[build-dependencies]
built = { version = "0.7.1", features = ["git2", "chrono"] }
Expand Down
32 changes: 32 additions & 0 deletions crates/rbuilder/src/backtest/fetch/datasource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::backtest::OrdersWithTimestamp;
use async_trait::async_trait;

/// Datasource trait
///
/// This trait is used to fetch data from a datasource
#[async_trait]
pub trait Datasource: std::fmt::Debug {
async fn get_data(&self, block: BlockRef) -> eyre::Result<Vec<OrdersWithTimestamp>>;
fn clone_box(&self) -> Box<dyn Datasource>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this instead of making Datasource Clonable (the same way it has std::fmt::Debug)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it but it does not work. Do not have enough context in Rust to know. This was generated by Claude but also a common pattern I found for this use case of storing vec<box>>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dyn can't be used with Clonable since it's Sized :(.

}

impl Clone for Box<dyn Datasource> {
fn clone(&self) -> Self {
self.clone_box()
}
}

#[derive(Debug, Copy, Clone)]
pub struct BlockRef {
pub block_number: u64,
pub block_timestamp: u64,
}

impl BlockRef {
pub fn new(block_number: u64, block_timestamp: u64) -> Self {
Self {
block_number,
block_timestamp,
}
}
}
44 changes: 43 additions & 1 deletion crates/rbuilder/src/backtest/fetch/flashbots_db.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::{
backtest::OrdersWithTimestamp,
backtest::{
fetch::datasource::{BlockRef, Datasource},
OrdersWithTimestamp,
},
primitives::{
serialize::{RawBundle, RawOrder, RawShareBundle, TxEncoding},
Order, SimValue,
},
};
use async_trait::async_trait;
use bigdecimal::{
num_bigint::{BigInt, Sign, ToBigInt},
BigDecimal,
Expand All @@ -14,8 +18,10 @@ use reth::primitives::{Bytes, B256, U256, U64};
use sqlx::postgres::PgPool;
use std::{ops::Mul, str::FromStr};
use time::OffsetDateTime;
use tracing::trace;
use uuid::Uuid;

#[derive(Debug, Clone)]
pub struct RelayDB {
pool: PgPool,
}
Expand Down Expand Up @@ -241,6 +247,42 @@ impl RelayDB {
}
}

#[async_trait]
impl Datasource for RelayDB {
async fn get_data(&self, block: BlockRef) -> eyre::Result<Vec<OrdersWithTimestamp>> {
let bundles = self
.get_simulated_bundles_for_block(block.block_number)
.await
.with_context(|| format!("Failed to fetch bundles for block {}", block.block_number))?;

let block_timestamp = OffsetDateTime::from_unix_timestamp(block.block_timestamp as i64)?;
let share_bundles = self
.get_simulated_share_bundles_for_block(block.block_number, block_timestamp)
.await
.with_context(|| {
format!(
"Failed to fetch share bundles for block {}",
block.block_number
)
})?;

trace!(
"Fetched bundles from flashbots db, bundles: {}, sbundles: {}",
bundles.len(),
share_bundles.len()
);

Ok(bundles
.into_iter()
.chain(share_bundles.into_iter())
.collect())
}

fn clone_box(&self) -> Box<dyn Datasource> {
Box::new(self.clone())
}
}

fn sql_decimal_to_wei(val: sqlx::types::BigDecimal) -> Option<U256> {
let (bi, exp) = val.into_bigint_and_exponent();
let (bi_sign, bi_bytes) = bi.to_bytes_be();
Expand Down
70 changes: 57 additions & 13 deletions crates/rbuilder/src/backtest/fetch/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use crate::{
backtest::OrdersWithTimestamp,
backtest::{
fetch::datasource::{BlockRef, Datasource},
OrdersWithTimestamp,
},
primitives::{
serialize::{RawOrder, RawTx, TxEncoding},
Order,
},
};
use async_trait::async_trait;
use eyre::WrapErr;
use mempool_dumpster::TransactionRangeError;
use sqlx::types::chrono::DateTime;
use std::path::{Path, PathBuf};
use time::OffsetDateTime;
use tracing::error;
use time::{Duration, OffsetDateTime};
use tracing::{error, trace};

pub fn get_mempool_transactions(
data_dir: impl AsRef<Path>,
data_dir: &Path,
from: OffsetDateTime,
to: OffsetDateTime,
) -> eyre::Result<Vec<OrdersWithTimestamp>> {
let from_millis: i64 = (from.unix_timestamp_nanos() / 1_000_000).try_into()?;
let to_millis: i64 = (to.unix_timestamp_nanos() / 1_000_000).try_into()?;

check_and_download_transaction_files(from_millis, to_millis, &data_dir)?;
check_and_download_transaction_files(from_millis, to_millis, data_dir)?;

let txs = mempool_dumpster::get_raw_transactions(data_dir, from_millis, to_millis)?;
Ok(txs
Expand All @@ -46,16 +51,14 @@ pub fn get_mempool_transactions(
.collect())
}

fn path_transactions(data_dir: impl AsRef<Path>, day: &str) -> PathBuf {
data_dir
.as_ref()
.join(format!("transactions/{}.parquet", day))
fn path_transactions(data_dir: &Path, day: &str) -> PathBuf {
data_dir.join(format!("transactions/{}.parquet", day))
}

fn check_and_download_transaction_files(
from_millis: i64,
to_millis: i64,
data_dir: impl AsRef<Path>,
data_dir: &Path,
) -> eyre::Result<()> {
let from_time = DateTime::from_timestamp_millis(from_millis)
.ok_or(TransactionRangeError::InvalidTimestamp)?;
Expand All @@ -74,10 +77,10 @@ fn check_and_download_transaction_files(

// check all day files
for day in &days {
let path = path_transactions(&data_dir, day);
let path = path_transactions(data_dir, day);
if !path.exists() {
tracing::warn!("Missing file: {}", path.display());
let config = mempool_dumpster::Config::new(&data_dir)
let config = mempool_dumpster::Config::new(data_dir)
.with_progress(true)
.with_overwrite(true);
config.download_transaction_file(day)?;
Expand All @@ -86,6 +89,47 @@ fn check_and_download_transaction_files(
Ok(())
}

#[derive(Debug, Clone)]
pub struct MempoolDumpsterDatasource {
path: PathBuf,
}

#[async_trait]
impl Datasource for MempoolDumpsterDatasource {
async fn get_data(&self, block: BlockRef) -> eyre::Result<Vec<OrdersWithTimestamp>> {
let (from, to) = {
let block_time = OffsetDateTime::from_unix_timestamp(block.block_timestamp as i64)?;
(
block_time - Duration::minutes(3),
// we look ahead by 5 seconds in case block bid was delayed relative to the timestamp
block_time + Duration::seconds(5),
)
};
let mempool_txs = get_mempool_transactions(&self.path, from, to).wrap_err_with(|| {
format!(
"Failed to fetch mempool transactions for block {}",
block.block_number,
)
})?;
trace!(
"Fetched unfiltered mempool transactions, count: {}",
mempool_txs.len()
);
// TODO: Filter to only include tnxs from block?
Ok(mempool_txs)
}

fn clone_box(&self) -> Box<dyn Datasource> {
Box::new(self.clone())
}
}

impl MempoolDumpsterDatasource {
pub fn new(path: impl Into<PathBuf>) -> Self {
Self { path: path.into() }
}
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -99,7 +143,7 @@ mod test {
let from = datetime!(2023-09-04 23:59:00 UTC);
let to = datetime!(2023-09-05 00:01:00 UTC);

let txs = get_mempool_transactions(data_dir, from, to).unwrap();
let txs = get_mempool_transactions(data_dir.as_ref(), from, to).unwrap();
assert_eq!(txs.len(), 1938);
dbg!(txs.len());
dbg!(&txs[0]);
Expand Down
Loading
Loading