diff --git a/config-backtest-example.toml b/config-backtest-example.toml
index f789ce93..1fa7f6fd 100644
--- a/config-backtest-example.toml
+++ b/config-backtest-example.toml
@@ -28,3 +28,9 @@
 sorting = "max-profit"
 failed_order_retries = 1
 drop_failed_orders = true
+[[builders]]
+name = "merging"
+algo = "merging-builder"
+discard_txs = true
+num_threads = 5
+merge_wait_time_ms = 100
\ No newline at end of file
diff --git a/config-live-example.toml b/config-live-example.toml
index 020db91a..51779d18 100644
--- a/config-live-example.toml
+++ b/config-live-example.toml
@@ -58,3 +58,10 @@ discard_txs = true
 sorting = "max-profit"
 failed_order_retries = 1
 drop_failed_orders = true
+
+[[builders]]
+name = "merging"
+algo = "merging-builder"
+discard_txs = true
+num_threads = 5
+merge_wait_time_ms = 100
\ No newline at end of file
diff --git a/crates/rbuilder/src/building/builders/merging_builder/combinator.rs b/crates/rbuilder/src/building/builders/merging_builder/combinator.rs
new file mode 100644
index 00000000..1bb0562f
--- /dev/null
+++ b/crates/rbuilder/src/building/builders/merging_builder/combinator.rs
@@ -0,0 +1,170 @@
+use crate::{
+    building::{
+        builders::block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromDB},
+        BlockBuildingContext,
+    },
+    roothash::RootHashConfig,
+};
+
+use crate::utils::check_provider_factory_health;
+use reth::{providers::ProviderFactory, tasks::pool::BlockingTaskPool};
+use reth_db::database::Database;
+use reth_payload_builder::database::CachedReads;
+use std::time::Instant;
+use time::OffsetDateTime;
+use tokio_util::sync::CancellationToken;
+use tracing::{info_span, trace};
+
+use super::{GroupOrdering, OrderGroup};
+
+/// CombinatorContext is used for merging the best ordering from groups into the final block.
+#[derive(Debug)]
+pub struct CombinatorContext<DB> {
+    provider_factory: ProviderFactory<DB>,
+    root_hash_task_pool: BlockingTaskPool,
+    ctx: BlockBuildingContext,
+    groups: Vec<OrderGroup>,
+    cancellation_token: CancellationToken,
+    cached_reads: Option<CachedReads>,
+    discard_txs: bool,
+    coinbase_payment: bool,
+    root_hash_config: RootHashConfig,
+    builder_name: String,
+}
+
+impl<DB: Database + Clone + 'static> CombinatorContext<DB> {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        provider_factory: ProviderFactory<DB>,
+        root_hash_task_pool: BlockingTaskPool,
+        ctx: BlockBuildingContext,
+        groups: Vec<OrderGroup>,
+        cancellation_token: CancellationToken,
+        cached_reads: Option<CachedReads>,
+        discard_txs: bool,
+        coinbase_payment: bool,
+        root_hash_config: RootHashConfig,
+        builder_name: String,
+    ) -> Self {
+        CombinatorContext {
+            provider_factory,
+            root_hash_task_pool,
+            ctx,
+            groups,
+            cancellation_token,
+            cached_reads,
+            discard_txs,
+            coinbase_payment,
+            root_hash_config,
+            builder_name,
+        }
+    }
+
+    pub fn set_groups(&mut self, groups: Vec<OrderGroup>) {
+        self.groups = groups;
+    }
+
+    /// Checks for simulated bundles that generated kickbacks.
+ /// orderings MUST be the same size as self.groups + fn contains_kickbacks(&self, orderings: &[GroupOrdering]) -> bool { + orderings.iter().enumerate().any(|(group_idx, ordering)| { + ordering.orders.iter().any(|(order_idx, _)| { + !self.groups[group_idx].orders[*order_idx] + .sim_value + .paid_kickbacks + .is_empty() + }) + }) + } + + pub fn combine_best_groups_mergings( + &mut self, + orders_closed_at: OffsetDateTime, + can_use_suggested_fee_recipient_as_coinbase: bool, + ) -> eyre::Result> { + let build_attempt_id: u32 = rand::random(); + let span = info_span!("build_run", build_attempt_id); + let _guard = span.enter(); + check_provider_factory_health(self.ctx.block(), &self.provider_factory)?; + + let build_start = Instant::now(); + + let mut best_orderings: Vec = self + .groups + .iter() + .map(|g| g.best_ordering.lock().unwrap().clone()) + .collect(); + + let use_suggested_fee_recipient_as_coinbase = self.coinbase_payment + && !self.contains_kickbacks(&best_orderings) + && can_use_suggested_fee_recipient_as_coinbase; + // Create a new ctx to remove builder_signer if necessary + let mut ctx = self.ctx.clone(); + if use_suggested_fee_recipient_as_coinbase { + ctx.modify_use_suggested_fee_recipient_as_coinbase(); + } + + let mut block_building_helper = BlockBuildingHelperFromDB::new( + self.provider_factory.clone(), + self.root_hash_task_pool.clone(), + self.root_hash_config.clone(), + ctx, + // @Perf cached reads / cursor caches + None, + self.builder_name.clone(), + self.discard_txs, + None, + self.cancellation_token.clone(), + )?; + block_building_helper.set_trace_orders_closed_at(orders_closed_at); + // loop until we insert all orders into the block + loop { + if self.cancellation_token.is_cancelled() { + break; + } + + let sim_order = if let Some((group_idx, order_idx, _)) = best_orderings + .iter() + .enumerate() + .filter_map(|(group_idx, ordering)| { + ordering + .orders + .first() + .map(|(order_idx, order_profit)| (group_idx, *order_idx, *order_profit)) + }) + .max_by_key(|(_, _, p)| *p) + { + best_orderings[group_idx].orders.remove(0); + &self.groups[group_idx].orders[order_idx] + } else { + // no order left in the groups + break; + }; + + let start_time = Instant::now(); + let commit_result = block_building_helper.commit_order(sim_order)?; + let order_commit_time = start_time.elapsed(); + + let mut gas_used = 0; + let mut execution_error = None; + let success = commit_result.is_ok(); + match commit_result { + Ok(res) => { + gas_used = res.gas_used; + } + Err(err) => execution_error = Some(err), + } + trace!( + order_id = ?sim_order.id(), + success, + order_commit_time_mus = order_commit_time.as_micros(), + gas_used, + ?execution_error, + "Executed order" + ); + } + block_building_helper.set_trace_fill_time(build_start.elapsed()); + self.cached_reads = Some(block_building_helper.clone_cached_reads()); + Ok(Box::new(block_building_helper)) + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/groups.rs b/crates/rbuilder/src/building/builders/merging_builder/groups.rs new file mode 100644 index 00000000..43f7e0e3 --- /dev/null +++ b/crates/rbuilder/src/building/builders/merging_builder/groups.rs @@ -0,0 +1,364 @@ +use crate::{ + building::evm_inspector::SlotKey, + primitives::{OrderId, SimulatedOrder}, +}; +use ahash::{HashMap, HashSet}; +use alloy_primitives::U256; +use itertools::Itertools; + +use std::sync::{Arc, Mutex}; + +/// GroupOrdering describes order of certain groups of orders. 
+#[derive(Debug, Clone)] +pub struct GroupOrdering { + /// Total coinbase profit of the given ordering. + pub total_profit: U256, + /// Orders of the group (index of order in the group, profit ot the given order) + pub orders: Vec<(usize, U256)>, +} + +/// OrderGroups describes set of conflicting orders. +/// It's meant to be shared between thread who merges the group and who uses the best ordering to combine the result. +#[derive(Debug, Clone)] +pub struct OrderGroup { + pub orders: Arc>, + pub best_ordering: Arc>, // idx, profit +} + +#[derive(Debug, Default)] +struct GroupData { + orders: Vec, + reads: Vec, + writes: Vec, +} + +/// CachedGroups is used to quickly update set of groups when new orders arrive +#[derive(Debug)] +pub struct CachedGroups { + group_counter: usize, + group_reads: HashMap>, + group_writes: HashMap>, + groups: HashMap, + orders: HashSet, +} + +impl CachedGroups { + pub fn new() -> Self { + CachedGroups { + group_counter: 0, + group_reads: HashMap::default(), + group_writes: HashMap::default(), + groups: HashMap::default(), + orders: HashSet::default(), + } + } + + pub fn add_orders(&mut self, orders: Vec) { + for order in orders { + if self.orders.contains(&order.id()) { + continue; + } + self.orders.insert(order.id()); + + let used_state = if let Some(used_state) = &order.used_state_trace { + used_state.clone() + } else { + continue; + }; + + let mut all_groups_in_conflict = Vec::new(); + + for read_key in used_state.read_slot_values.keys() { + if let Some(group) = self.group_writes.get(read_key) { + all_groups_in_conflict.extend_from_slice(group); + } + } + for write_key in used_state.written_slot_values.keys() { + if let Some(group) = self.group_reads.get(write_key) { + all_groups_in_conflict.extend_from_slice(group); + } + } + all_groups_in_conflict.sort(); + all_groups_in_conflict.dedup(); + + match all_groups_in_conflict.len() { + 0 => { + // create new group with only one order in it + let group_id = self.group_counter; + self.group_counter += 1; + let group_data = GroupData { + orders: vec![order], + reads: used_state.read_slot_values.keys().cloned().collect(), + writes: used_state.written_slot_values.keys().cloned().collect(), + }; + for read in &group_data.reads { + self.group_reads + .entry(read.clone()) + .or_default() + .push(group_id); + } + for write in &group_data.writes { + self.group_writes + .entry(write.clone()) + .or_default() + .push(group_id); + } + self.groups.insert(group_id, group_data); + } + 1 => { + // merge order into the group + let group_id = all_groups_in_conflict[0]; + let group_data = self.groups.get_mut(&group_id).expect("group not found"); + group_data.orders.push(order); + for read in used_state.read_slot_values.keys() { + group_data.reads.push(read.clone()); + } + group_data.reads.sort(); + group_data.reads.dedup(); + for write in used_state.written_slot_values.keys() { + group_data.writes.push(write.clone()); + } + group_data.writes.sort(); + group_data.writes.dedup(); + for read in &group_data.reads { + let group_reads_slot = self.group_reads.entry(read.clone()).or_default(); + if !group_reads_slot.contains(&group_id) { + group_reads_slot.push(group_id); + } + } + for write in &group_data.writes { + let group_writes_slot = self.group_writes.entry(write.clone()).or_default(); + if !group_writes_slot.contains(&group_id) { + group_writes_slot.push(group_id); + } + } + } + _ => { + // merge multiple group together and add new order there + let conflicting_groups = all_groups_in_conflict + .into_iter() + .map(|group_id| 
(group_id, self.groups.remove(&group_id).unwrap())) + .collect::>(); + for (group_id, group_data) in &conflicting_groups { + for read in &group_data.reads { + let group_reads_slot = + self.group_reads.entry(read.clone()).or_default(); + if let Some(idx) = group_reads_slot.iter().position(|el| el == group_id) + { + group_reads_slot.swap_remove(idx); + } + } + for write in &group_data.writes { + let group_writes_slot = + self.group_writes.entry(write.clone()).or_default(); + if let Some(idx) = + group_writes_slot.iter().position(|el| el == group_id) + { + group_writes_slot.swap_remove(idx); + } + } + } + + let group_id = self.group_counter; + self.group_counter += 1; + let mut group_data = GroupData { + orders: vec![order], + reads: used_state.read_slot_values.keys().cloned().collect(), + writes: used_state.written_slot_values.keys().cloned().collect(), + }; + for (_, mut group) in conflicting_groups { + group_data.orders.append(&mut group.orders); + group_data.reads.append(&mut group.reads); + group_data.writes.append(&mut group.writes); + } + group_data.reads.sort(); + group_data.reads.dedup(); + group_data.writes.sort(); + group_data.writes.dedup(); + for read in &group_data.reads { + self.group_reads + .entry(read.clone()) + .or_default() + .push(group_id); + } + for write in &group_data.writes { + self.group_writes + .entry(write.clone()) + .or_default() + .push(group_id); + } + self.groups.insert(group_id, group_data); + } + } + } + } + + pub fn get_order_groups(&self) -> Vec { + let groups = self + .groups + .iter() + .sorted_by_key(|(idx, _)| *idx) + .map(|(_, group_data)| { + if group_data.orders.len() == 1 { + let order_profit = group_data.orders[0].sim_value.coinbase_profit; + OrderGroup { + orders: Arc::new(group_data.orders.clone()), + best_ordering: Arc::new(Mutex::new(GroupOrdering { + total_profit: order_profit, + orders: vec![(0, order_profit)], + })), + } + } else { + OrderGroup { + orders: Arc::new(group_data.orders.clone()), + best_ordering: Arc::new(Mutex::new(GroupOrdering { + total_profit: U256::ZERO, + orders: Vec::new(), + })), + } + } + }) + .collect::>(); + groups + } +} + +impl Default for CachedGroups { + fn default() -> Self { + Self::new() + } +} + +pub fn split_orders_into_groups(orders: Vec) -> Vec { + let mut cached_groups = CachedGroups::new(); + cached_groups.add_orders(orders); + cached_groups.get_order_groups() +} + +#[cfg(test)] +mod tests { + use alloy_primitives::{Address, TxHash, B256, U256}; + use reth::primitives::{ + Transaction, TransactionSigned, TransactionSignedEcRecovered, TxLegacy, + }; + + use crate::{ + building::evm_inspector::{SlotKey, UsedStateTrace}, + primitives::{ + MempoolTx, Order, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs, + }, + }; + + use super::CachedGroups; + + struct DataGenerator { + last_used_id: u64, + } + impl DataGenerator { + pub fn new() -> DataGenerator { + DataGenerator { last_used_id: 0 } + } + + pub fn create_u64(&mut self) -> u64 { + self.last_used_id += 1; + self.last_used_id + } + + pub fn create_u256(&mut self) -> U256 { + U256::from(self.create_u64()) + } + + pub fn create_b256(&mut self) -> B256 { + B256::from(self.create_u256()) + } + + pub fn create_hash(&mut self) -> TxHash { + TxHash::from(self.create_u256()) + } + + pub fn create_slot(&mut self) -> SlotKey { + SlotKey { + address: Address::ZERO, + key: self.create_b256(), + } + } + + pub fn create_tx(&mut self) -> TransactionSignedEcRecovered { + TransactionSignedEcRecovered::from_signed_transaction( + TransactionSigned { + hash: 
self.create_hash(), + transaction: Transaction::Legacy(TxLegacy::default()), + ..Default::default() + }, + Address::default(), + ) + } + + pub fn create_order( + &mut self, + read: Option<&SlotKey>, + write: Option<&SlotKey>, + ) -> SimulatedOrder { + let mut trace = UsedStateTrace::default(); + if let Some(read) = read { + trace + .read_slot_values + .insert(read.clone(), self.create_b256()); + } + if let Some(write) = write { + trace + .written_slot_values + .insert(write.clone(), self.create_b256()); + } + + SimulatedOrder { + order: Order::Tx(MempoolTx { + tx_with_blobs: TransactionSignedEcRecoveredWithBlobs::new_no_blobs( + self.create_tx(), + ) + .unwrap(), + }), + used_state_trace: Some(trace), + sim_value: SimValue::default(), + prev_order: None, + } + } + } + + #[test] + fn two_writes_single_read() { + let mut data_gen = DataGenerator::new(); + let slot = data_gen.create_slot(); + let oa = data_gen.create_order(None, Some(&slot)); + let ob = data_gen.create_order(None, Some(&slot)); + let oc = data_gen.create_order(Some(&slot), None); + let mut cached_groups = CachedGroups::new(); + cached_groups.add_orders(vec![oa, ob, oc]); + let groups = cached_groups.get_order_groups(); + assert_eq!(groups.len(), 1); + } + + #[test] + fn two_reads() { + let mut data_gen = DataGenerator::new(); + let slot = data_gen.create_slot(); + let oa = data_gen.create_order(Some(&slot), None); + let ob = data_gen.create_order(Some(&slot), None); + let mut cached_groups = CachedGroups::new(); + cached_groups.add_orders(vec![oa, ob]); + let groups = cached_groups.get_order_groups(); + assert_eq!(groups.len(), 2); + } + + #[test] + fn two_writes() { + let mut data_gen = DataGenerator::new(); + let slot = data_gen.create_slot(); + let oa = data_gen.create_order(None, Some(&slot)); + let ob = data_gen.create_order(None, Some(&slot)); + let mut cached_groups = CachedGroups::new(); + cached_groups.add_orders(vec![oa, ob]); + let groups = cached_groups.get_order_groups(); + assert_eq!(groups.len(), 2); + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/merger.rs b/crates/rbuilder/src/building/builders/merging_builder/merger.rs new file mode 100644 index 00000000..91189388 --- /dev/null +++ b/crates/rbuilder/src/building/builders/merging_builder/merger.rs @@ -0,0 +1,211 @@ +use std::sync::Arc; + +use crate::building::{BlockBuildingContext, BlockState, PartialBlock}; +use ahash::HashMap; +use alloy_primitives::{Address, U256}; +use itertools::Itertools; +use rand::{seq::SliceRandom, SeedableRng}; +use reth::providers::{ProviderFactory, StateProvider}; +use reth_db::database::Database; +use reth_payload_builder::database::CachedReads; + +use tokio_util::sync::CancellationToken; + +use super::OrderGroup; + +/// MergeTask describes some ordering that should be tried for the given group. +#[derive(Debug, Clone)] +pub struct MergeTask { + pub group_idx: usize, + pub command: MergeTaskCommand, +} + +#[derive(Debug, Clone)] +pub enum MergeTaskCommand { + /// `StaticOrdering` checks the following ordrerings: map profit first / last, mev gas price first / last + StaticOrdering { + /// if false reverse gas price and reverse profit orderings are skipped + extra_orderings: bool, + }, + /// `AllPermutations` checks all possible permutations of the group. + AllPermutations, + /// `RandomPermutations` checks random permutations of the group. 
+ RandomPermutations { seed: u64, count: usize }, +} + +#[derive(Debug)] +pub struct MergingContext { + pub provider_factory: ProviderFactory, + pub ctx: BlockBuildingContext, + pub groups: Vec, + pub cancellation_token: CancellationToken, + pub cache: Option, +} + +impl MergingContext { + pub fn new( + provider_factory: ProviderFactory, + ctx: BlockBuildingContext, + groups: Vec, + cancellation_token: CancellationToken, + cache: Option, + ) -> Self { + MergingContext { + provider_factory, + ctx, + groups, + cancellation_token, + cache, + } + } + + pub fn run_merging_task(&mut self, task: MergeTask) -> eyre::Result<()> { + let state_provider = self + .provider_factory + .history_by_block_hash(self.ctx.attributes.parent)?; + let state_provider: Arc = Arc::from(state_provider); + + let orderings_to_try = match task.command { + MergeTaskCommand::StaticOrdering { extra_orderings } => { + // order by / reverse gas price + // order by / reverse max profit + + let mut orderings = vec![]; + let orders = &self.groups[task.group_idx]; + { + let mut ids_and_value = orders + .orders + .iter() + .enumerate() + .map(|(idx, o)| (idx, o.sim_value.mev_gas_price)) + .collect::>(); + + if extra_orderings { + ids_and_value.sort_by(|a, b| a.1.cmp(&b.1)); + orderings.push( + ids_and_value + .iter() + .map(|(idx, _)| *idx) + .collect::>(), + ); + } + ids_and_value.sort_by(|a, b| a.1.cmp(&b.1).reverse()); + orderings.push( + ids_and_value + .iter() + .map(|(idx, _)| *idx) + .collect::>(), + ); + + let mut ids_and_value = orders + .orders + .iter() + .enumerate() + .map(|(idx, o)| (idx, o.sim_value.coinbase_profit)) + .collect::>(); + + if extra_orderings { + ids_and_value.sort_by(|a, b| a.1.cmp(&b.1)); + orderings.push( + ids_and_value + .iter() + .map(|(idx, _)| *idx) + .collect::>(), + ); + } + + ids_and_value.sort_by(|a, b| a.1.cmp(&b.1).reverse()); + orderings.push( + ids_and_value + .iter() + .map(|(idx, _)| *idx) + .collect::>(), + ); + } + orderings + } + MergeTaskCommand::AllPermutations => { + let orders = &self.groups[task.group_idx]; + let orderings = (0..orders.orders.len()).collect::>(); + orderings + .into_iter() + .permutations(orders.orders.len()) + .collect() + } + MergeTaskCommand::RandomPermutations { seed, count } => { + let mut orderings = vec![]; + + let orders = &self.groups[task.group_idx]; + let mut indexes = (0..orders.orders.len()).collect::>(); + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + for _ in 0..count { + indexes.shuffle(&mut rng); + orderings.push(indexes.clone()); + } + + orderings + } + }; + + for ordering in orderings_to_try { + let mut partial_block = PartialBlock::new(true, None); + let mut state = BlockState::new_arc(state_provider.clone()) + .with_cached_reads(self.cache.take().unwrap_or_default()); + partial_block.pre_block_call(&self.ctx, &mut state)?; + + let mut ordering = ordering; + ordering.reverse(); // we will use it as a queue: pop() and push() + let mut result_ordering = Vec::new(); + let mut total_profit = U256::ZERO; + let mut pending_orders: HashMap<(Address, u64), usize> = HashMap::default(); + loop { + if self.cancellation_token.is_cancelled() { + let (cached_reads, _, _) = state.into_parts(); + self.cache = Some(cached_reads); + return Ok(()); + } + + let order_idx = if let Some(order_idx) = ordering.pop() { + order_idx + } else { + break; + }; + let sim_order = &self.groups[task.group_idx].orders[order_idx]; + let commit_result = partial_block.commit_order(sim_order, &self.ctx, &mut state)?; + match commit_result { + Ok(res) => { + for 
(address, nonce) in res.nonces_updated { + if let Some(pending_order) = pending_orders.remove(&(address, nonce)) { + ordering.push(pending_order); + } + } + total_profit += res.coinbase_profit; + result_ordering.push((order_idx, res.coinbase_profit)); + } + Err(err) => { + if let Some((address, nonce)) = + err.try_get_tx_too_high_error(&sim_order.order) + { + pending_orders.insert((address, nonce), order_idx); + } + } + } + } + + let mut best_ordering = self.groups[task.group_idx].best_ordering.lock().unwrap(); + if best_ordering.total_profit < total_profit { + best_ordering.total_profit = total_profit; + best_ordering.orders = result_ordering; + } + + let (new_cached_reads, _, _) = state.into_parts(); + self.cache = Some(new_cached_reads); + } + + Ok(()) + } + + pub fn into_cached_reads(self) -> CachedReads { + self.cache.unwrap_or_default() + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs b/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs new file mode 100644 index 00000000..3e8ac9a7 --- /dev/null +++ b/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs @@ -0,0 +1,130 @@ +use crate::building::BlockBuildingContext; +use reth::providers::ProviderFactory; +use reth_db::database::Database; +use reth_payload_builder::database::CachedReads; +use std::{sync::Arc, time::Instant}; +use tokio_util::sync::CancellationToken; +use tracing::{trace, warn}; + +use super::{ + merger::{MergeTask, MergeTaskCommand, MergingContext}, + OrderGroup, +}; + +/// `MergingPool` is a set of threads that try ordering for the given groups of orders. +pub struct MergingPool { + provider_factory: ProviderFactory, + ctx: BlockBuildingContext, + num_threads: usize, + global_cancellation_token: CancellationToken, + cached_reads: Vec, + + current_task_cancellaton_token: CancellationToken, + thread_handles: Vec>, +} + +impl MergingPool { + pub fn new( + provider_factory: ProviderFactory, + ctx: BlockBuildingContext, + num_threads: usize, + global_cancellation_token: CancellationToken, + ) -> Self { + Self { + provider_factory, + ctx, + num_threads, + global_cancellation_token, + cached_reads: Vec::new(), + current_task_cancellaton_token: CancellationToken::new(), + thread_handles: Vec::new(), + } + } + + pub fn start_merging_tasks(&mut self, groups: Vec) -> eyre::Result<()> { + self.current_task_cancellaton_token = self.global_cancellation_token.child_token(); + + let queue = Arc::new(crossbeam_queue::ArrayQueue::new(10_000)); + for (group_idx, group) in groups.iter().enumerate() { + if group.orders.len() == 1 { + continue; + } else if group.orders.len() <= 3 { + let _ = queue.push(MergeTask { + group_idx, + command: MergeTaskCommand::AllPermutations, + }); + } else { + let _ = queue.push(MergeTask { + group_idx, + command: MergeTaskCommand::StaticOrdering { + extra_orderings: true, + }, + }); + } + } + // Here we fill queue of tasks with random ordering tasks for big groups. 
+ for i in 0..150 { + for (group_idx, group) in groups.iter().enumerate() { + if group.orders.len() > 3 { + let _ = queue.push(MergeTask { + group_idx, + command: MergeTaskCommand::RandomPermutations { + seed: group_idx as u64 + i, + count: 2, + }, + }); + } + } + } + + for idx in 0..self.num_threads { + let mut merging_context = MergingContext::new( + self.provider_factory.clone(), + self.ctx.clone(), + groups.clone(), + self.current_task_cancellaton_token.clone(), + self.cached_reads.pop(), + ); + + let cancellation = self.current_task_cancellaton_token.clone(); + let queue = queue.clone(); + let handle = std::thread::Builder::new() + .name(format!("merging-thread-{}", idx)) + .spawn(move || { + while let Some(task) = queue.pop() { + if cancellation.is_cancelled() { + break; + } + + let start = Instant::now(); + if let Err(err) = merging_context.run_merging_task(task.clone()) { + warn!("Error running merging task: {:?}", err); + } + trace!( + "Finished merging task: {:?}, elapsed: {:?}, len: {}", + task, + start.elapsed(), + merging_context.groups[task.group_idx].orders.len(), + ); + } + + merging_context.into_cached_reads() + })?; + + self.thread_handles.push(handle); + } + + Ok(()) + } + + pub fn stop_merging_threads(&mut self) -> eyre::Result<()> { + self.current_task_cancellaton_token.cancel(); + for join_handle in self.thread_handles.drain(..) { + let reads = join_handle + .join() + .map_err(|_err| eyre::eyre!("Error joining merging thread"))?; + self.cached_reads.push(reads); + } + Ok(()) + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/mod.rs b/crates/rbuilder/src/building/builders/merging_builder/mod.rs new file mode 100644 index 00000000..f0df75f2 --- /dev/null +++ b/crates/rbuilder/src/building/builders/merging_builder/mod.rs @@ -0,0 +1,284 @@ +pub mod combinator; +pub mod groups; +pub mod merger; +pub mod merging_pool; +pub mod order_intake_store; +use combinator::CombinatorContext; +pub use groups::*; +use merger::{MergeTask, MergeTaskCommand, MergingContext}; +use merging_pool::MergingPool; +use tracing::{error, trace}; + +use self::order_intake_store::OrderIntakeStore; + +use crate::{ + building::builders::{ + handle_building_error, BacktestSimulateBlockInput, Block, BlockBuildingAlgorithm, + BlockBuildingAlgorithmInput, LiveBuilderInput, + }, + roothash::RootHashConfig, +}; +use alloy_primitives::{utils::format_ether, Address}; +use reth_db::database::Database; + +use reth::tasks::pool::BlockingTaskPool; +use reth_payload_builder::database::CachedReads; +use serde::Deserialize; +use std::time::{Duration, Instant}; +use time::OffsetDateTime; +use tokio_util::sync::CancellationToken; + +/// MergingBuilderConfig configures merging builder. +/// * `num_threads` - number of threads to use for merging. +/// * `merge_wait_time_ms` - time to wait for merging to finish before consuming new orders. 
+#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct MergingBuilderConfig { + pub discard_txs: bool, + pub num_threads: usize, + pub merge_wait_time_ms: u64, + #[serde(default)] + pub coinbase_payment: bool, +} + +impl MergingBuilderConfig { + fn merge_wait_time(&self) -> Duration { + Duration::from_millis(self.merge_wait_time_ms) + } +} + +pub fn run_merging_builder( + input: LiveBuilderInput, + config: &MergingBuilderConfig, +) { + let block_number = input.ctx.block_env.number.to::(); + let mut order_intake_consumer = + OrderIntakeStore::new(input.input, &input.sbundle_mergeabe_signers); + + let mut merging_combinator = CombinatorContext::new( + input.provider_factory.clone(), + input.root_hash_task_pool, + input.ctx.clone(), + vec![], + input.cancel.clone(), + None, + config.discard_txs, + config.coinbase_payment, + input.root_hash_config, + input.builder_name, + ); + + let mut merging_pool = MergingPool::new( + input.provider_factory.clone(), + input.ctx.clone(), + config.num_threads, + input.cancel.clone(), + ); + + let mut cached_groups = CachedGroups::new(); + + 'building: loop { + if input.cancel.is_cancelled() { + break 'building; + } + + match order_intake_consumer.consume_next_batch() { + Ok(ok) => { + if !ok { + break 'building; + } + } + Err(err) => { + error!(?err, "Error consuming next order batch"); + continue; + } + } + + let orders_closed_at = OffsetDateTime::now_utc(); + + let new_orders = order_intake_consumer.drain_new_orders(); + // We can update cached_groups if we have ONLY adds + if let Some(new_orders) = new_orders { + cached_groups.add_orders(new_orders); + } else { + cached_groups = CachedGroups::new(); + cached_groups.add_orders(order_intake_consumer.get_orders()); + } + + let groups = cached_groups.get_order_groups(); + + let group_count = groups.len(); + let order_count = groups.iter().map(|g| g.orders.len()).sum::(); + merging_combinator.set_groups(groups.clone()); + + let start = Instant::now(); + if let Err(error) = merging_pool.stop_merging_threads() { + error!( + ?error, + block_number, "Error stopping merging threads, cancelling slot, building block", + ); + break 'building; + } + trace!( + time_ms = start.elapsed().as_millis(), + "Stopped merging threads" + ); + if let Err(error) = merging_pool.start_merging_tasks(groups) { + error!( + ?error, + block_number, "Error starting merging tasks, cancelling slot, building block", + ); + break 'building; + } + + let merging_start_time = Instant::now(); + trace!("Starting new merging batch"); + 'merging: loop { + if input.cancel.is_cancelled() { + break 'building; + } + if merging_start_time.elapsed() > config.merge_wait_time() { + break 'merging; + } + + match merging_combinator.combine_best_groups_mergings( + orders_closed_at, + input.sink.can_use_suggested_fee_recipient_as_coinbase(), + ) { + Ok(block) => { + trace!( + group_count, + order_count, + bid_value = format_ether(block.built_block_trace().bid_value), + "Merger combined best group orderings" + ); + input.sink.new_block(block); + } + Err(err) => { + if !handle_building_error(err) { + break 'building; + } + } + } + } + } +} + +pub fn merging_build_backtest( + input: BacktestSimulateBlockInput<'_, DB>, + config: MergingBuilderConfig, +) -> eyre::Result<(Block, CachedReads)> { + let sorted_orders = { + let mut orders = input.sim_orders.clone(); + orders.sort_by_key(|o| o.order.id()); + orders + }; + + let groups = split_orders_into_groups(sorted_orders); + + let mut merging_context = MergingContext::new( + 
+        input.provider_factory.clone(),
+        input.ctx.clone(),
+        groups.clone(),
+        CancellationToken::new(),
+        input.cached_reads,
+    );
+
+    for (group_idx, group) in groups.iter().enumerate() {
+        if group.orders.len() == 1 {
+            continue;
+        } else if group.orders.len() <= 3 {
+            merging_context.run_merging_task(MergeTask {
+                group_idx,
+                command: MergeTaskCommand::AllPermutations,
+            })?;
+        } else {
+            merging_context.run_merging_task(MergeTask {
+                group_idx,
+                command: MergeTaskCommand::StaticOrdering {
+                    extra_orderings: true,
+                },
+            })?;
+            merging_context.run_merging_task(MergeTask {
+                group_idx,
+                command: MergeTaskCommand::RandomPermutations { seed: 0, count: 10 },
+            })?;
+        }
+    }
+
+    let cache_reads = merging_context.into_cached_reads();
+
+    let mut combinator_context = CombinatorContext::new(
+        input.provider_factory,
+        BlockingTaskPool::build()?,
+        input.ctx.clone(),
+        groups.clone(),
+        CancellationToken::new(),
+        Some(cache_reads),
+        config.discard_txs,
+        config.coinbase_payment,
+        RootHashConfig::skip_root_hash(),
+        input.builder_name,
+    );
+
+    let block_builder = combinator_context
+        .combine_best_groups_mergings(OffsetDateTime::now_utc(), config.coinbase_payment)?;
+    let payout_tx_value = if config.coinbase_payment {
+        None
+    } else {
+        Some(block_builder.true_block_value()?)
+    };
+    let finalize_block_result = block_builder.finalize_block(payout_tx_value)?;
+    Ok((
+        finalize_block_result.block,
+        finalize_block_result.cached_reads,
+    ))
+}
+
+#[derive(Debug)]
+pub struct MergingBuildingAlgorithm {
+    root_hash_config: RootHashConfig,
+    root_hash_task_pool: BlockingTaskPool,
+    sbundle_mergeabe_signers: Vec<Address>,
+    config: MergingBuilderConfig,
+    name: String,
+}
+
+impl MergingBuildingAlgorithm {
+    pub fn new(
+        root_hash_config: RootHashConfig,
+        root_hash_task_pool: BlockingTaskPool,
+        sbundle_mergeabe_signers: Vec<Address>,
+        config: MergingBuilderConfig,
+        name: String,
+    ) -> Self {
+        Self {
+            root_hash_config,
+            root_hash_task_pool,
+            sbundle_mergeabe_signers,
+            config,
+            name,
+        }
+    }
+}
+
+impl<DB: Database + Clone + 'static> BlockBuildingAlgorithm<DB> for MergingBuildingAlgorithm {
+    fn name(&self) -> String {
+        self.name.clone()
+    }
+
+    fn build_blocks(&self, input: BlockBuildingAlgorithmInput<DB>) {
+        let live_input = LiveBuilderInput {
+            provider_factory: input.provider_factory,
+            root_hash_config: self.root_hash_config.clone(),
+            root_hash_task_pool: self.root_hash_task_pool.clone(),
+            ctx: input.ctx.clone(),
+            input: input.input,
+            sink: input.sink,
+            builder_name: self.name.clone(),
+            cancel: input.cancel,
+            sbundle_mergeabe_signers: self.sbundle_mergeabe_signers.clone(),
+        };
+        run_merging_builder(live_input, &self.config);
+    }
+}
diff --git a/crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs b/crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs
new file mode 100644
index 00000000..f0972eb9
--- /dev/null
+++ b/crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs
@@ -0,0 +1,61 @@
+use std::{cell::RefCell, rc::Rc};
+
+use alloy_primitives::Address;
+use tokio::sync::broadcast;
+
+use crate::{
+    building::{
+        builders::OrderConsumer, multi_share_bundle_merger::MultiShareBundleMerger,
+        SimulatedOrderStore,
+    },
+    live_builder::simulation::SimulatedOrderCommand,
+    primitives::SimulatedOrder,
+};
+
+/// Struct that allows getting the new orders from the order/cancellation stream in the way the merging builder likes it.
+/// Contains the current whole set of orders but can also be queried for deltas on the orders ONLY if the deltas are all additions.
+/// Chains MultiShareBundleMerger->SimulatedOrderStore
+/// Usage:
+/// call consume_next_batch to poll the source and internally store the new orders
+/// call drain_new_orders/get_orders
+pub struct OrderIntakeStore {
+    order_consumer: OrderConsumer,
+    share_bundle_merger: Box<MultiShareBundleMerger<SimulatedOrderStore>>,
+    order_sink: Rc<RefCell<SimulatedOrderStore>>,
+}
+
+impl OrderIntakeStore {
+    pub fn new(
+        orders_input_stream: broadcast::Receiver<SimulatedOrderCommand>,
+        sbundle_merger_selected_signers: &[Address],
+    ) -> Self {
+        let order_sink = Rc::new(RefCell::new(SimulatedOrderStore::new()));
+        let share_bundle_merger = Box::new(MultiShareBundleMerger::new(
+            sbundle_merger_selected_signers,
+            order_sink.clone(),
+        ));
+
+        Self {
+            order_consumer: OrderConsumer::new(orders_input_stream),
+            share_bundle_merger,
+            order_sink,
+        }
+    }
+
+    pub fn consume_next_batch(&mut self) -> eyre::Result<bool> {
+        self.order_consumer.consume_next_commands()?;
+        let input: &mut MultiShareBundleMerger<SimulatedOrderStore> = &mut self.share_bundle_merger;
+        self.order_consumer.apply_new_commands(input);
+        Ok(true)
+    }
+
+    /// Returns the new orders since the last call if we ONLY had new orders (no cancellations allowed)
+    pub fn drain_new_orders(&mut self) -> Option<Vec<SimulatedOrder>> {
+        (*self.order_sink).borrow_mut().drain_new_orders()
+    }
+
+    /// All the current orders
+    pub fn get_orders(&self) -> Vec<SimulatedOrder> {
+        self.order_sink.borrow().get_orders()
+    }
+}
diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs
index 9d5b1cfb..f81a7d1e 100644
--- a/crates/rbuilder/src/building/builders/mod.rs
+++ b/crates/rbuilder/src/building/builders/mod.rs
@@ -1,5 +1,6 @@
 //! builders is a subprocess that builds a block
 pub mod block_building_helper;
+pub mod merging_builder;
 pub mod mock_block_building_helper;
 pub mod ordering_builder;

diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs
index 36eac698..8c7ad59a 100644
--- a/crates/rbuilder/src/live_builder/config.rs
+++ b/crates/rbuilder/src/live_builder/config.rs
@@ -18,6 +18,9 @@ use crate::{
     beacon_api_client::Client,
     building::{
         builders::{
+            merging_builder::{
+                merging_build_backtest, MergingBuilderConfig, MergingBuildingAlgorithm,
+            },
             ordering_builder::{OrderingBuilderConfig, OrderingBuildingAlgorithm},
             BacktestSimulateBlockInput, Block, BlockBuildingAlgorithm,
         },
@@ -68,11 +71,11 @@ pub const WALLET_INIT_HISTORY_SIZE: Duration = Duration::from_secs(60 * 60 * 24)
 /// 1 is easier for debugging.
 pub const DEFAULT_MAX_CONCURRENT_SEALS: u64 = 1;

-/// This example has a single building algorithm cfg but the idea of this enum is to have several builders
 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
 #[serde(tag = "algo", rename_all = "kebab-case", deny_unknown_fields)]
 pub enum SpecificBuilderConfig {
     OrderingBuilder(OrderingBuilderConfig),
+    MergingBuilder(MergingBuilderConfig),
 }

 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
@@ -362,6 +365,7 @@ impl LiveBuilderConfig for Config {
             SpecificBuilderConfig::OrderingBuilder(config) => {
                 crate::building::builders::ordering_builder::backtest_simulate_block(config, input)
             }
+            SpecificBuilderConfig::MergingBuilder(config) => merging_build_backtest(input, config),
         }
     }
 }
@@ -466,7 +470,7 @@ pub fn coinbase_signer_from_secret_key(secret_key: &str) -> eyre::Result
     Ok(Signer::try_from_secret(secret_key)?)
 }

-fn create_builders(
+pub fn create_builders(
     configs: Vec<BuilderConfig>,
     root_hash_config: RootHashConfig,
     root_hash_task_pool: BlockingTaskPool,
@@ -501,6 +505,15 @@ fn create_builder(
                 cfg.name,
             ))
         }
+        SpecificBuilderConfig::MergingBuilder(merge_cfg) => {
+            Arc::new(MergingBuildingAlgorithm::new(
+                root_hash_config.clone(),
+                root_hash_task_pool.clone(),
+                sbundle_mergeabe_signers.to_vec(),
+                merge_cfg,
+                cfg.name,
+            ))
+        }
     }
 }
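For reference, the `[[builders]]` entries added to both example configs correspond to the `MergingBuilderConfig` values sketched below. This is an illustration, not part of the patch: `name` and `algo` are consumed by the `SpecificBuilderConfig` serde tag on the enclosing entry, `coinbase_payment` falls back to its serde default because the examples omit it, and `merge_wait_time_ms` is converted to a `Duration` by the module-private `merge_wait_time()` helper.

use crate::building::builders::merging_builder::MergingBuilderConfig;

// Rust-side equivalent of the example TOML entries above.
fn example_merging_config() -> MergingBuilderConfig {
    MergingBuilderConfig {
        discard_txs: true,
        num_threads: 5,
        merge_wait_time_ms: 100,
        coinbase_payment: false, // #[serde(default)], omitted in the TOML examples
    }
}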
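The conflict rule behind `CachedGroups` is purely read/write: an incoming order is merged into an existing group only if it reads a slot that group writes, or writes a slot that group reads, so write/write and read/read overlaps stay in separate groups. A sketch mirroring the `two_writes_single_read` and `two_writes` unit tests in groups.rs; `slot`, `write_order` and `read_order` are hypothetical stand-ins for the test-local `DataGenerator` helpers, not crate APIs.

use crate::building::builders::merging_builder::CachedGroups;

// Hypothetical helpers (standing in for the test-local DataGenerator):
//   write_order(&slot) -> SimulatedOrder that writes `slot`
//   read_order(&slot)  -> SimulatedOrder that reads `slot`
let mut groups = CachedGroups::new();
groups.add_orders(vec![write_order(&slot), write_order(&slot), read_order(&slot)]);
// The reader conflicts with both writers, so all three orders collapse into one group:
assert_eq!(groups.get_order_groups().len(), 1);

let mut writers_only = CachedGroups::new();
writers_only.add_orders(vec![write_order(&slot), write_order(&slot)]);
// Two writes to the same slot do not conflict under this rule, so they stay in two groups:
assert_eq!(writers_only.get_order_groups().len(), 2);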
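Both `MergingPool::start_merging_tasks` and `merging_build_backtest` decide what to try per group with the same size-based policy. A sketch of that policy as a standalone helper; the helper itself is illustrative, only the `MergeTask`/`MergeTaskCommand` types come from this patch.

use crate::building::builders::merging_builder::merger::{MergeTask, MergeTaskCommand};

// Illustrative summary of the branching in start_merging_tasks / merging_build_backtest.
fn initial_tasks_for_group(group_idx: usize, group_size: usize) -> Vec<MergeTask> {
    if group_size <= 1 {
        // single-order groups already carry their best (and only) ordering
        Vec::new()
    } else if group_size <= 3 {
        // at most 3! = 6 orderings, so brute force is cheap
        vec![MergeTask { group_idx, command: MergeTaskCommand::AllPermutations }]
    } else {
        // 4 heuristic orderings: coinbase profit and mev gas price, ascending and descending;
        // the live pool then keeps queueing RandomPermutations { seed, count: 2 } tasks,
        // while the backtest path runs RandomPermutations { seed: 0, count: 10 } once.
        vec![MergeTask {
            group_idx,
            command: MergeTaskCommand::StaticOrdering { extra_orderings: true },
        }]
    }
}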
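Once the per-group best orderings are available, `CombinatorContext::combine_best_groups_mergings` interleaves them greedily: at each step it looks at the head of every group's current best ordering and commits the single most profitable head. A compact restatement of that selection step, assuming `GroupOrdering` from groups.rs; the helper name is illustrative.

use crate::building::builders::merging_builder::GroupOrdering;

// Picks the next (group_idx, order_idx) to commit and pops it from that group's ordering.
fn pick_next(best_orderings: &mut [GroupOrdering]) -> Option<(usize, usize)> {
    let (group_idx, order_idx, _profit) = best_orderings
        .iter()
        .enumerate()
        .filter_map(|(g, ord)| ord.orders.first().map(|&(i, profit)| (g, i, profit)))
        .max_by_key(|&(_, _, profit)| profit)?;
    best_orderings[group_idx].orders.remove(0);
    Some((group_idx, order_idx))
}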