From 55d4ef77ff908ee92a1c2d322c02ffcf2a623797 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Thu, 22 Aug 2024 13:59:00 +0100 Subject: [PATCH 1/7] Make the block builder pool a block building algorithm --- crates/rbuilder/src/bin/dummy-builder.rs | 2 +- .../rbuilder/src/live_builder/base_config.rs | 2 +- .../rbuilder/src/live_builder/building/mod.rs | 132 ++++-------------- crates/rbuilder/src/live_builder/config.rs | 4 +- crates/rbuilder/src/live_builder/mod.rs | 90 ++++++++++-- 5 files changed, 106 insertions(+), 124 deletions(-) diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 71cd5f6c..1079dda7 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -98,7 +98,7 @@ async fn main() -> eyre::Result<()> { global_cancellation: cancel.clone(), extra_rpc: RpcModule::new(()), sink_factory: Box::new(TraceBlockSinkFactory {}), - builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], + builder: Some(Arc::new(DummyBuildingAlgorithm::new(10))), }; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 52711831..024abe2d 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -193,7 +193,7 @@ impl BaseConfig { extra_rpc: RpcModule::new(()), sink_factory, - builders: Vec::new(), + builder: None, }) } diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index 222e589e..7ead0138 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -1,139 +1,57 @@ -use std::{sync::Arc, time::Duration}; - -use crate::{ - building::{ - builders::{ - BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory, - }, - BlockBuildingContext, - }, - live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults}, - utils::ProviderFactoryReopener, -}; +use crate::building::builders::{BlockBuildingAlgorithm, BlockBuildingAlgorithmInput}; use reth_db::database::Database; -use tokio::sync::{broadcast, mpsc}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, error, trace}; - -use super::{ - order_input::{ - self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, - }, - payload_events, - simulation::OrderSimulationPool, -}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tracing::{debug, trace}; #[derive(Debug)] pub struct BlockBuildingPool { - provider_factory: ProviderFactoryReopener, builders: Vec>>, - sink_factory: Box, - orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, } impl BlockBuildingPool { - pub fn new( - provider_factory: ProviderFactoryReopener, - builders: Vec>>, - sink_factory: Box, - orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, - ) -> Self { - BlockBuildingPool { - provider_factory, - builders, - sink_factory, - orderpool_subscriber, - order_simulation_pool, - } + pub fn new(builders: Vec>>) -> Self { + BlockBuildingPool { builders } } +} - /// Connects OrdersForBlock->OrderReplacementManager->Simulations and calls start_building_job - pub fn start_block_building( - &mut self, - payload: payload_events::MevBoostSlotData, - block_ctx: BlockBuildingContext, - global_cancellation: CancellationToken, - max_time_to_build: Duration, - ) { - let block_cancellation = global_cancellation.child_token(); - - let cancel = block_cancellation.clone(); - tokio::spawn(async move { - tokio::time::sleep(max_time_to_build).await; - cancel.cancel(); - }); - - let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); - // add OrderReplacementManager to manage replacements and cancellations - let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); - // sink removal is automatic via OrderSink::is_alive false - let _block_sub = self.orderpool_subscriber.add_sink( - block_ctx.block_env.number.to(), - Box::new(order_replacement_manager), - ); - - let simulations_for_block = self.order_simulation_pool.spawn_simulation_job( - block_ctx.clone(), - orders_for_block, - block_cancellation.clone(), - ); - self.start_building_job( - block_ctx, - payload, - simulations_for_block, - block_cancellation, - ); +impl BlockBuildingAlgorithm + for BlockBuildingPool +{ + fn name(&self) -> String { + "BlockBuildingPool".to_string() } - /// Per each BlockBuildingAlgorithm creates BlockBuildingAlgorithmInput and Sinks and spawn a task to run it - fn start_building_job( - &mut self, - ctx: BlockBuildingContext, - slot_data: MevBoostSlotData, - input: SlotOrderSimResults, - cancel: CancellationToken, - ) { - let builder_sink = self.sink_factory.create_sink(slot_data, cancel.clone()); + fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { let (broadcast_input, _) = broadcast::channel(10_000); - - let block_number = ctx.block_env.number.to::(); - let provider_factory = match self - .provider_factory - .check_consistency_and_reopen_if_needed(block_number) - { - Ok(provider_factory) => provider_factory, - Err(err) => { - error!(?err, "Error while reopening provider factory"); - return; - } - }; + let block_number = input.ctx.block_env.number.to::(); for builder in self.builders.iter() { let builder_name = builder.name(); debug!(block = block_number, builder_name, "Spawning builder job"); - let input = BlockBuildingAlgorithmInput:: { - provider_factory: provider_factory.clone(), - ctx: ctx.clone(), + + let builder = builder.clone(); + let input = BlockBuildingAlgorithmInput { + provider_factory: input.provider_factory.clone(), + ctx: input.ctx.clone(), input: broadcast_input.subscribe(), - sink: builder_sink.clone(), - cancel: cancel.clone(), + sink: input.sink.clone(), + cancel: input.cancel.clone(), }; - let builder = builder.clone(); + tokio::task::spawn_blocking(move || { builder.build_blocks(input); debug!(block = block_number, builder_name, "Stopped builder job"); }); } - tokio::spawn(multiplex_job(input.orders, broadcast_input)); + tokio::spawn(multiplex_job(input.input, broadcast_input)); } } -async fn multiplex_job(mut input: mpsc::Receiver, sender: broadcast::Sender) { +async fn multiplex_job(mut input: broadcast::Receiver, sender: broadcast::Sender) { // we don't worry about waiting for input forever because it will be closed by producer job - while let Some(input) = input.recv().await { + while let Ok(input) = input.recv().await { // we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers if sender.send(input).is_err() { return; diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index c053c71e..0c9b985d 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -27,6 +27,7 @@ use crate::{ }, cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, + BlockBuildingPool, }, mev_boost::BLSBlockSigner, primitives::mev_boost::{MevBoostRelay, RelayConfig}, @@ -306,7 +307,8 @@ impl LiveBuilderConfig for Config { root_hash_task_pool, self.base_config.sbundle_mergeabe_signers(), ); - Ok(live_builder.with_builders(builders)) + let builder = BlockBuildingPool::new(builders); + Ok(live_builder.with_builder(Arc::new(builder))) } fn version_for_telemetry(&self) -> crate::utils::build_info::Version { diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 2f3fae4c..4ffc3cfa 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -10,11 +10,17 @@ mod watchdog; use crate::{ building::{ - builders::{BlockBuildingAlgorithm, UnfinishedBlockBuildingSinkFactory}, + builders::{ + BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory, + }, BlockBuildingContext, }, live_builder::{ - order_input::{start_orderpool_jobs, OrderInputConfig}, + building::BlockBuildingPool, + order_input::{ + order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, + start_orderpool_jobs, OrderInputConfig, + }, simulation::OrderSimulationPool, watchdog::spawn_watchdog_thread, }, @@ -23,7 +29,6 @@ use crate::{ }; use ahash::HashSet; use alloy_primitives::{Address, B256}; -use building::BlockBuildingPool; use eyre::Context; use jsonrpsee::RpcModule; use payload_events::MevBoostSlotData; @@ -35,9 +40,12 @@ use reth_chainspec::ChainSpec; use reth_db::database::Database; use std::{cmp::min, path::PathBuf, sync::Arc, time::Duration}; use time::OffsetDateTime; -use tokio::{sync::mpsc, task::spawn_blocking}; +use tokio::{ + sync::{broadcast, mpsc}, + task::spawn_blocking, +}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; /// Time the proposer have to propose a block from the beginning of the slot (https://www.paradigm.xyz/2023/04/mev-boost-ethereum-consensus Slot anatomy) const SLOT_PROPOSAL_DURATION: std::time::Duration = Duration::from_secs(4); @@ -74,7 +82,7 @@ pub struct LiveBuilder { pub global_cancellation: CancellationToken, pub sink_factory: Box, - pub builders: Vec>>, + pub builder: Option>>, // doing the Option because there is a fuunction that creates the live_builder without the builder. pub extra_rpc: RpcModule<()>, } @@ -85,8 +93,11 @@ impl Self { extra_rpc, ..self } } - pub fn with_builders(self, builders: Vec>>) -> Self { - Self { builders, ..self } + pub fn with_builder(self, builder: Arc>) -> Self { + Self { + builder: Some(builder), + ..self + } } pub async fn run(self) -> eyre::Result<()> { @@ -123,6 +134,7 @@ impl ) }; + /* let mut builder_pool = BlockBuildingPool::new( self.provider_factory.clone(), self.builders, @@ -130,8 +142,10 @@ impl orderpool_subscriber, order_simulation_pool, ); + */ let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?; + let mut sink_factory = self.sink_factory; while let Some(payload) = payload_events_channel.recv().await { if self.blocklist.contains(&payload.fee_recipient()) { @@ -215,12 +229,49 @@ impl None, ); - builder_pool.start_block_building( - payload, - block_ctx, - self.global_cancellation.clone(), - time_until_slot_end.try_into().unwrap_or_default(), - ); + // This was done before in block building pool + { + let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); + let block_cancellation = self.global_cancellation.clone().child_token(); + + let cancel = block_cancellation.clone(); + tokio::spawn(async move { + tokio::time::sleep(max_time_to_build).await; + cancel.cancel(); + }); + + let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); + // add OrderReplacementManager to manage replacements and cancellations + let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); + // sink removal is automatic via OrderSink::is_alive false + let _block_sub = orderpool_subscriber.add_sink( + block_ctx.block_env.number.to(), + Box::new(order_replacement_manager), + ); + + let simulations_for_block = order_simulation_pool.spawn_simulation_job( + block_ctx.clone(), + orders_for_block, + block_cancellation.clone(), + ); + + let (broadcast_input, _) = broadcast::channel(10_000); + let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone()); + + let input = BlockBuildingAlgorithmInput:: { + provider_factory: self.provider_factory.provider_factory_unchecked(), + ctx: block_ctx, + sink: builder_sink, + input: broadcast_input.subscribe(), + cancel: block_cancellation, + }; + + tokio::spawn(multiplex_job(simulations_for_block.orders, broadcast_input)); + + if let Some(builder) = self.builder.as_ref() { + builder.build_blocks(input); + } + } watchdog_sender.try_send(()).unwrap_or_default(); } @@ -237,6 +288,17 @@ impl } } +async fn multiplex_job(mut input: mpsc::Receiver, sender: broadcast::Sender) { + // we don't worry about waiting for input forever because it will be closed by producer job + while let Some(input) = input.recv().await { + // we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers + if sender.send(input).is_err() { + return; + } + } + trace!("Cancelling multiplex job"); +} + /// May fail if we wait too much (see [BLOCK_HEADER_DEAD_LINE_DELTA]) async fn wait_for_block_header( block: B256, From a4b2f46fe8f52702ef430fbd2ef89020e0e979fb Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Thu, 22 Aug 2024 15:55:27 +0100 Subject: [PATCH 2/7] Fix comment --- crates/rbuilder/src/live_builder/mod.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 4ffc3cfa..15307ac2 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -134,16 +134,6 @@ impl ) }; - /* - let mut builder_pool = BlockBuildingPool::new( - self.provider_factory.clone(), - self.builders, - self.sink_factory, - orderpool_subscriber, - order_simulation_pool, - ); - */ - let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?; let mut sink_factory = self.sink_factory; From 93566f85dbfc7f59eff6387e45ea6f165bd883a2 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 09:28:11 +0100 Subject: [PATCH 3/7] Fix Option comment --- crates/rbuilder/src/bin/dummy-builder.rs | 2 +- .../rbuilder/src/live_builder/base_config.rs | 4 ++-- crates/rbuilder/src/live_builder/mod.rs | 22 ++++++++++++++----- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 1079dda7..0964b466 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -98,7 +98,7 @@ async fn main() -> eyre::Result<()> { global_cancellation: cancel.clone(), extra_rpc: RpcModule::new(()), sink_factory: Box::new(TraceBlockSinkFactory {}), - builder: Some(Arc::new(DummyBuildingAlgorithm::new(10))), + builder: Arc::new(DummyBuildingAlgorithm::new(10)), }; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 024abe2d..7437fe44 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -2,7 +2,7 @@ //! use crate::{ building::builders::UnfinishedBlockBuildingSinkFactory, - live_builder::{order_input::OrderInputConfig, LiveBuilder}, + live_builder::{order_input::OrderInputConfig, LiveBuilder, NullBlockBuildingAlgorithm}, telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig}, utils::{http_provider, BoxedProvider, ProviderFactoryReopener, Signer}, }; @@ -193,7 +193,7 @@ impl BaseConfig { extra_rpc: RpcModule::new(()), sink_factory, - builder: None, + builder: Arc::new(NullBlockBuildingAlgorithm {}), }) } diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 15307ac2..5fbba6d4 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -82,7 +82,7 @@ pub struct LiveBuilder { pub global_cancellation: CancellationToken, pub sink_factory: Box, - pub builder: Option>>, // doing the Option because there is a fuunction that creates the live_builder without the builder. + pub builder: Arc>, // doing the Option because there is a fuunction that creates the live_builder without the builder. pub extra_rpc: RpcModule<()>, } @@ -95,7 +95,7 @@ impl pub fn with_builder(self, builder: Arc>) -> Self { Self { - builder: Some(builder), + builder: builder, ..self } } @@ -257,10 +257,7 @@ impl }; tokio::spawn(multiplex_job(simulations_for_block.orders, broadcast_input)); - - if let Some(builder) = self.builder.as_ref() { - builder.build_blocks(input); - } + self.builder.build_blocks(input); } watchdog_sender.try_send(()).unwrap_or_default(); @@ -312,3 +309,16 @@ async fn wait_for_block_header( } Err(eyre::eyre!("Block header not found")) } + +#[derive(Debug)] +pub struct NullBlockBuildingAlgorithm {} + +impl BlockBuildingAlgorithm + for NullBlockBuildingAlgorithm +{ + fn name(&self) -> String { + "NullBlockBuildingAlgorithm".to_string() + } + + fn build_blocks(&self, _input: BlockBuildingAlgorithmInput) {} +} From d9c8e08c391d8352ecf60b38c157738b3486d9c9 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 09:32:28 +0100 Subject: [PATCH 4/7] Fix lint --- crates/rbuilder/src/live_builder/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 5fbba6d4..48986b7f 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -94,10 +94,7 @@ impl } pub fn with_builder(self, builder: Arc>) -> Self { - Self { - builder: builder, - ..self - } + Self { builder, ..self } } pub async fn run(self) -> eyre::Result<()> { From c14c9b44a518e7f522c65734821c8e68431e6a8e Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 09:54:07 +0100 Subject: [PATCH 5/7] Use broadcast --- crates/rbuilder/src/bin/debug-order-sim.rs | 9 +++---- crates/rbuilder/src/live_builder/mod.rs | 22 +++-------------- .../src/live_builder/simulation/mod.rs | 24 +++++++++++++------ .../live_builder/simulation/simulation_job.rs | 8 +++---- 4 files changed, 28 insertions(+), 35 deletions(-) diff --git a/crates/rbuilder/src/bin/debug-order-sim.rs b/crates/rbuilder/src/bin/debug-order-sim.rs index 812e6bf5..ecfe6405 100644 --- a/crates/rbuilder/src/bin/debug-order-sim.rs +++ b/crates/rbuilder/src/bin/debug-order-sim.rs @@ -133,8 +133,9 @@ pub async fn main() -> eyre::Result<()> { ); let block_cancel = CancellationToken::new(); - let mut sim_results = - sim_pool.spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone()); + let mut sim_results = sim_pool + .spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone()) + .subscribe(); loop { tokio::select! { new_slot = slots.recv() => { @@ -147,8 +148,8 @@ pub async fn main() -> eyre::Result<()> { break 'slots; } }, - sim_command = sim_results.orders.recv() => { - if let Some(sim_command) = sim_command { + sim_command = sim_results.recv() => { + if let Ok(sim_command) = sim_command { match sim_command{ SimulatedOrderCommand::Simulation(sim) => { orders_last_slot += 1; diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 48986b7f..59349037 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -40,12 +40,9 @@ use reth_chainspec::ChainSpec; use reth_db::database::Database; use std::{cmp::min, path::PathBuf, sync::Arc, time::Duration}; use time::OffsetDateTime; -use tokio::{ - sync::{broadcast, mpsc}, - task::spawn_blocking, -}; +use tokio::{sync::mpsc, task::spawn_blocking}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; /// Time the proposer have to propose a block from the beginning of the slot (https://www.paradigm.xyz/2023/04/mev-boost-ethereum-consensus Slot anatomy) const SLOT_PROPOSAL_DURATION: std::time::Duration = Duration::from_secs(4); @@ -242,18 +239,16 @@ impl block_cancellation.clone(), ); - let (broadcast_input, _) = broadcast::channel(10_000); let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone()); let input = BlockBuildingAlgorithmInput:: { provider_factory: self.provider_factory.provider_factory_unchecked(), ctx: block_ctx, sink: builder_sink, - input: broadcast_input.subscribe(), + input: simulations_for_block.subscribe(), cancel: block_cancellation, }; - tokio::spawn(multiplex_job(simulations_for_block.orders, broadcast_input)); self.builder.build_blocks(input); } @@ -272,17 +267,6 @@ impl } } -async fn multiplex_job(mut input: mpsc::Receiver, sender: broadcast::Sender) { - // we don't worry about waiting for input forever because it will be closed by producer job - while let Some(input) = input.recv().await { - // we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers - if sender.send(input).is_err() { - return; - } - } - trace!("Cancelling multiplex job"); -} - /// May fail if we wait too much (see [BLOCK_HEADER_DEAD_LINE_DELTA]) async fn wait_for_block_header( block: B256, diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 70a22904..823cc012 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -14,13 +14,22 @@ use ahash::HashMap; use reth_db::database::Database; use simulation_job::SimulationJob; use std::sync::{Arc, Mutex}; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, +}; use tokio_util::sync::CancellationToken; use tracing::{info_span, Instrument}; #[derive(Debug)] pub struct SlotOrderSimResults { - pub orders: mpsc::Receiver, + orders: broadcast::Sender, +} + +impl SlotOrderSimResults { + pub fn subscribe(&self) -> broadcast::Receiver { + self.orders.subscribe() + } } type BlockContextId = u64; @@ -105,7 +114,7 @@ impl OrderSimulationPool { input: OrdersForBlock, block_cancellation: CancellationToken, ) -> SlotOrderSimResults { - let (slot_sim_results_sender, slot_sim_results_receiver) = mpsc::channel(10_000); + let (slot_sim_results_sender, _) = broadcast::channel(10_000); let provider = self.provider_factory.provider_factory_unchecked(); @@ -113,6 +122,7 @@ impl OrderSimulationPool { let block_context: BlockContextId = gen_uid(); let span = info_span!("sim_ctx", block = ctx.block_env.number.to::(), parent = ?ctx.attributes.parent); + let task_slot_sim_sender = slot_sim_results_sender.clone(); let handle = tokio::spawn( async move { let sim_tree = SimTree::new(provider, ctx.attributes.parent); @@ -133,7 +143,7 @@ impl OrderSimulationPool { new_order_sub, sim_req_sender, sim_results_receiver, - slot_sim_results_sender, + task_slot_sim_sender, sim_tree, ); @@ -155,7 +165,7 @@ impl OrderSimulationPool { } SlotOrderSimResults { - orders: slot_sim_results_receiver, + orders: slot_sim_results_sender, } } } @@ -187,7 +197,7 @@ mod tests { new_order_sub: order_receiver, }; - let mut sim_results = sim_pool.spawn_simulation_job( + let sim_results = sim_pool.spawn_simulation_job( test_context.block_building_context().clone(), orders_for_block, cancel.clone(), @@ -205,7 +215,7 @@ mod tests { // We expect to receive the simulation giving a profit of coinbase_profit since that's what we sent directly to coinbase. // and we are not paying any priority fee - if let Some(command) = sim_results.orders.recv().await { + if let Ok(command) = sim_results.subscribe().recv().await { match command { SimulatedOrderCommand::Simulation(sim_order) => { assert_eq!( diff --git a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs index c9e6884a..81643933 100644 --- a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs +++ b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs @@ -8,7 +8,7 @@ use crate::{ use ahash::HashSet; use alloy_primitives::utils::format_ether; use reth_db::database::Database; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; @@ -34,7 +34,7 @@ pub struct SimulationJob { /// Here we receive the results we asked to sim_req_sender sim_results_receiver: mpsc::Receiver, /// Output of the simulations - slot_sim_results_sender: mpsc::Sender, + slot_sim_results_sender: broadcast::Sender, sim_tree: SimTree, orders_received: OrderCounter, @@ -51,7 +51,7 @@ impl SimulationJob { new_order_sub: mpsc::UnboundedReceiver, sim_req_sender: flume::Sender, sim_results_receiver: mpsc::Receiver, - slot_sim_results_sender: mpsc::Sender, + slot_sim_results_sender: broadcast::Sender, sim_tree: SimTree, ) -> Self { Self { @@ -176,7 +176,6 @@ impl SimulationJob { .send(SimulatedOrderCommand::Simulation( sim_result.simulated_order.clone(), )) - .await .is_err() { return false; //receiver closed :( @@ -199,7 +198,6 @@ impl SimulationJob { async fn send_cancel(&mut self, id: &OrderId) -> bool { self.slot_sim_results_sender .send(SimulatedOrderCommand::Cancellation(*id)) - .await .is_ok() } From 33307d82f2a82e3344470668d0532e58bf126f76 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 10:06:29 +0100 Subject: [PATCH 6/7] Partial --- crates/rbuilder/src/live_builder/mod.rs | 60 ++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 59349037..e19a4ac6 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -19,7 +19,7 @@ use crate::{ building::BlockBuildingPool, order_input::{ order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, - start_orderpool_jobs, OrderInputConfig, + start_orderpool_jobs, OrderInputConfig, OrderPoolSubscriber, }, simulation::OrderSimulationPool, watchdog::spawn_watchdog_thread, @@ -129,7 +129,7 @@ impl }; let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?; - let mut sink_factory = self.sink_factory; + // let mut sink_factory = self.sink_factory; while let Some(payload) = payload_events_channel.recv().await { if self.blocklist.contains(&payload.fee_recipient()) { @@ -213,6 +213,7 @@ impl None, ); + /* // This was done before in block building pool { let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); @@ -251,6 +252,16 @@ impl self.builder.build_blocks(input); } + */ + let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); + + self.build_block( + max_time_to_build, + block_ctx, + payload, + &orderpool_subscriber, + &order_simulation_pool, + ); watchdog_sender.try_send(()).unwrap_or_default(); } @@ -265,6 +276,51 @@ impl } Ok(()) } + + fn build_block( + &self, + max_time_to_build: Duration, + block_ctx: BlockBuildingContext, + payload: MevBoostSlotData, + orderpool_subscriber: &OrderPoolSubscriber, + order_simulation_pool: &OrderSimulationPool, + ) { + // let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); + let block_cancellation = self.global_cancellation.clone().child_token(); + + let cancel = block_cancellation.clone(); + tokio::spawn(async move { + tokio::time::sleep(max_time_to_build).await; + cancel.cancel(); + }); + + let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); + // add OrderReplacementManager to manage replacements and cancellations + let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); + // sink removal is automatic via OrderSink::is_alive false + let _block_sub = orderpool_subscriber.add_sink( + block_ctx.block_env.number.to(), + Box::new(order_replacement_manager), + ); + + let simulations_for_block = order_simulation_pool.spawn_simulation_job( + block_ctx.clone(), + orders_for_block, + block_cancellation.clone(), + ); + + let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone()); + + let input = BlockBuildingAlgorithmInput:: { + provider_factory: self.provider_factory.provider_factory_unchecked(), + ctx: block_ctx, + sink: builder_sink, + input: simulations_for_block.subscribe(), + cancel: block_cancellation, + }; + + self.builder.build_blocks(input); + } } /// May fail if we wait too much (see [BLOCK_HEADER_DEAD_LINE_DELTA]) From e64e6829eeb92363fae5dc5ea277a5ceed473894 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 10:45:38 +0100 Subject: [PATCH 7/7] Fix Rust stuff --- crates/rbuilder/src/live_builder/mod.rs | 63 +++++-------------- .../src/live_builder/payload_events/mod.rs | 20 +++--- 2 files changed, 25 insertions(+), 58 deletions(-) diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index e19a4ac6..aabfc3f1 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -54,7 +54,7 @@ const GET_BLOCK_HEADER_PERIOD: time::Duration = time::Duration::milliseconds(250 /// Trait used to trigger a new block building process in the slot. pub trait SlotSource { - fn recv_slot_channel(self) -> mpsc::UnboundedReceiver; + fn recv_slot_channel(&self) -> mpsc::UnboundedReceiver; } /// Main builder struct. @@ -94,25 +94,28 @@ impl Self { builder, ..self } } - pub async fn run(self) -> eyre::Result<()> { + pub async fn run(mut self) -> eyre::Result<()> { info!("Builder block list size: {}", self.blocklist.len(),); info!( "Builder coinbase address: {:?}", self.coinbase_signer.address ); - spawn_error_storage_writer(self.error_storage_path, self.global_cancellation.clone()) - .await - .with_context(|| "Error spawning error storage writer")?; + spawn_error_storage_writer( + self.error_storage_path.clone(), + self.global_cancellation.clone(), + ) + .await + .with_context(|| "Error spawning error storage writer")?; let mut inner_jobs_handles = Vec::new(); let mut payload_events_channel = self.blocks_source.recv_slot_channel(); let orderpool_subscriber = { let (handle, sub) = start_orderpool_jobs( - self.order_input_config, + self.order_input_config.clone(), self.provider_factory.clone(), - self.extra_rpc, + self.extra_rpc.clone(), self.global_cancellation.clone(), ) .await?; @@ -213,46 +216,6 @@ impl None, ); - /* - // This was done before in block building pool - { - let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); - let block_cancellation = self.global_cancellation.clone().child_token(); - - let cancel = block_cancellation.clone(); - tokio::spawn(async move { - tokio::time::sleep(max_time_to_build).await; - cancel.cancel(); - }); - - let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); - // add OrderReplacementManager to manage replacements and cancellations - let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); - // sink removal is automatic via OrderSink::is_alive false - let _block_sub = orderpool_subscriber.add_sink( - block_ctx.block_env.number.to(), - Box::new(order_replacement_manager), - ); - - let simulations_for_block = order_simulation_pool.spawn_simulation_job( - block_ctx.clone(), - orders_for_block, - block_cancellation.clone(), - ); - - let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone()); - - let input = BlockBuildingAlgorithmInput:: { - provider_factory: self.provider_factory.provider_factory_unchecked(), - ctx: block_ctx, - sink: builder_sink, - input: simulations_for_block.subscribe(), - cancel: block_cancellation, - }; - - self.builder.build_blocks(input); - } - */ let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); self.build_block( @@ -278,7 +241,7 @@ impl } fn build_block( - &self, + &mut self, max_time_to_build: Duration, block_ctx: BlockBuildingContext, payload: MevBoostSlotData, @@ -309,7 +272,9 @@ impl block_cancellation.clone(), ); - let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone()); + let builder_sink = self + .sink_factory + .create_sink(payload, block_cancellation.clone()); let input = BlockBuildingAlgorithmInput:: { provider_factory: self.provider_factory.provider_factory_unchecked(), diff --git a/crates/rbuilder/src/live_builder/payload_events/mod.rs b/crates/rbuilder/src/live_builder/payload_events/mod.rs index 2369be45..0701a9fb 100644 --- a/crates/rbuilder/src/live_builder/payload_events/mod.rs +++ b/crates/rbuilder/src/live_builder/payload_events/mod.rs @@ -110,16 +110,20 @@ impl MevBoostSlotDataGenerator { /// When MEV-boost is used, we tell the CL “--always-build-payload” (we are building blocks for ANY validator now!). The CL does /// it, but even with the event being created for every slot, the fee_recipient we get from MEV-Boost might be different so we should always replace it. /// Note that with MEV-boost the validator may change the fee_recipient when registering to the Relays. - pub fn spawn(self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { + pub fn spawn(&self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { let relays = RelaysForSlotData::new(&self.relays); let (send, receive) = mpsc::unbounded_channel(); + + let cls = self.cls.clone(); + let global_cancellation = self.global_cancellation.clone(); + let blocklist = self.blocklist.clone(); let handle = tokio::spawn(async move { let mut source = PayloadSourceMuxer::new( - &self.cls, + &cls, NEW_PAYLOAD_RECV_TIMEOUT, CONSENSUS_CLIENT_RECONNECT_WAIT, - self.global_cancellation.clone(), + global_cancellation.clone(), ); info!("MevBoostSlotDataGenerator: started"); @@ -127,7 +131,7 @@ impl MevBoostSlotDataGenerator { let mut recently_sent_data = VecDeque::with_capacity(RECENTLY_SENT_EVENTS_BUFF); while let Some(event) = source.recv().await { - if self.global_cancellation.is_cancelled() { + if global_cancellation.is_cancelled() { return; } @@ -151,9 +155,7 @@ impl MevBoostSlotDataGenerator { slot_data, }; - if let Err(err) = - check_slot_data_for_blocklist(&mev_boost_slot_data, &self.blocklist) - { + if let Err(err) = check_slot_data_for_blocklist(&mev_boost_slot_data, &blocklist) { warn!("Slot data failed blocklist check: {:?}", err); continue; } @@ -174,7 +176,7 @@ impl MevBoostSlotDataGenerator { } } // cancelling here because its a critical job - self.global_cancellation.cancel(); + global_cancellation.cancel(); source.join().await; info!("MevBoostSlotDataGenerator: finished"); @@ -185,7 +187,7 @@ impl MevBoostSlotDataGenerator { } impl SlotSource for MevBoostSlotDataGenerator { - fn recv_slot_channel(self) -> mpsc::UnboundedReceiver { + fn recv_slot_channel(&self) -> mpsc::UnboundedReceiver { let (_handle, chan) = self.spawn(); chan }