From 5ba3eff9611589f84eb97f06fba47091baab790b Mon Sep 17 00:00:00 2001 From: ZanCorDX <126988525+ZanCorDX@users.noreply.github.com> Date: Fri, 13 Dec 2024 11:28:17 -0300 Subject: [PATCH 1/2] OrderIntakeConsumer fixes for less warninig. (#281) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📝 Summary - Stop building on reorg - update_onchain_nonces result propagated ## 💡 Motivation and Context Warninig annoy me --- ## ✅ I have completed the following steps: * [x] Run `make lint` * [x] Run `make test` * [ ] Added tests (if applicable) --- crates/rbuilder/src/building/builders/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index c71f53e5..2297a4c2 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -19,6 +19,7 @@ use reth::{ revm::cached::CachedReads, }; use reth_db::Database; +use reth_errors::ProviderError; use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; use std::{fmt::Debug, marker::PhantomData, sync::Arc}; use tokio::sync::{broadcast, broadcast::error::TryRecvError}; @@ -144,7 +145,9 @@ where if !self.order_consumer.consume_next_commands()? { return Ok(false); } - self.update_onchain_nonces()?; + if !self.update_onchain_nonces()? { + return Ok(false); + } self.order_consumer .apply_new_commands(&mut self.block_orders); @@ -161,7 +164,11 @@ where SimulatedOrderCommand::Simulation(sim_order) => Some(sim_order), SimulatedOrderCommand::Cancellation(_) => None, }); - let nonce_db_ref = self.nonce_cache.get_ref()?; + let nonce_db_ref = match self.nonce_cache.get_ref() { + Ok(nonce_db_ref) => nonce_db_ref, + Err(ProviderError::BlockHashNotFound(_)) => return Ok(false), // This can happen on reorgs since the block is removed + Err(err) => return Err(err.into()), + }; let mut nonces = Vec::new(); for new_order in new_orders { for nonce in new_order.order.nonces() { From b60507660e4e07dded7394baebf4fe1f4abe13e1 Mon Sep 17 00:00:00 2001 From: ZanCorDX <126988525+ZanCorDX@users.noreply.github.com> Date: Fri, 13 Dec 2024 11:30:47 -0300 Subject: [PATCH 2/2] Parallel builder polish (#282) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📝 Summary Less warning/errors and error handling improved a little. ## ✅ I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable) --- .../block_building_result_assembler.rs | 34 +++++++++++-------- .../conflict_resolving_pool.rs | 11 ++++-- .../conflict_task_generator.rs | 23 +++++++------ 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs index 37b95347..7e1c02bc 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs @@ -10,13 +10,13 @@ use reth_provider::{BlockReader, DatabaseProviderFactory, StateProviderFactory}; use std::{marker::PhantomData, sync::Arc, time::Instant}; use time::OffsetDateTime; use tokio_util::sync::CancellationToken; -use tracing::{trace, warn}; +use tracing::{info_span, trace}; use crate::{ building::{ builders::{ block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider}, - UnfinishedBlockBuildingSink, + handle_building_error, UnfinishedBlockBuildingSink, }, BlockBuildingContext, }, @@ -104,7 +104,9 @@ where } if self.best_results.get_number_of_orders() > 0 { let orders_closed_at = OffsetDateTime::now_utc(); - self.try_build_block(orders_closed_at); + if !self.try_build_block(orders_closed_at) { + break; + } } } trace!( @@ -113,12 +115,12 @@ where ); } - /// Attempts to build a new block if not already building. + /// Attempts to build a new block if not already building. Returns if block building should continue. /// /// # Arguments /// /// * `orders_closed_at` - The timestamp when orders were closed. - fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) { + fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) -> bool { let time_start = Instant::now(); let current_best_results = self.best_results.clone(); @@ -128,7 +130,7 @@ where // Check if version has incremented if let Some(last_version) = self.last_version { if version == last_version { - return; + return true; } } self.last_version = Some(version); @@ -140,18 +142,18 @@ where ); if best_orderings_per_group.is_empty() { - return; + return true; } match self.build_new_block(&mut best_orderings_per_group, orders_closed_at) { Ok(new_block) => { if let Ok(value) = new_block.true_block_value() { trace!( - "Parallel builder run id {}: Built new block with results version {:?} and profit: {:?} in {:?} ms", - self.run_id, - version, - format_ether(value), - time_start.elapsed().as_millis() + run_id = self.run_id, + version = version, + time_ms = time_start.elapsed().as_millis(), + profit = format_ether(value), + "Parallel builder built new block", ); if new_block.built_block_trace().got_no_signer_error { @@ -163,11 +165,15 @@ where } } } - Err(e) => { - warn!("Parallel builder run id {}: Failed to build new block with results version {:?}: {:?}", self.run_id, version, e); + Err(err) => { + let _span = info_span!("Parallel builder failed to build new block",run_id = self.run_id,version = version,err=?err).entered(); + if !handle_building_error(err) { + return false; + } } } self.run_id += 1; + true } /// Builds a new block using the best results from each group. diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs index a6ac2b72..e9571eff 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs @@ -69,6 +69,9 @@ where self.thread_pool.spawn(move || { while !cancellation_token.is_cancelled() { if let Some(task) = task_queue.pop() { + if cancellation_token.is_cancelled() { + return; + } let task_start = Instant::now(); if let Ok((task_id, result)) = Self::process_task( task, @@ -92,6 +95,7 @@ where time_taken_ms = %task_start.elapsed().as_millis(), "Conflict resolving: failed to send group result" ); + return; } } } @@ -100,7 +104,7 @@ where }); } - pub fn process_task( + fn process_task( task: ConflictTask, ctx: &BlockBuildingContext, provider: &P, @@ -131,8 +135,9 @@ where } Err(err) => { warn!( - "Error running conflict task for group_idx {}: {:?}", - task_id, err + group_id = task_id, + err = ?err, + "Error running conflict task for group_idx", ); Err(err) } diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs index 9750dc80..55b2f6dd 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs @@ -106,7 +106,7 @@ impl ConflictTaskGenerator { .cloned() .collect(); - trace!("Removing subset groups: {:?}", subset_ids); + trace!(groups = ?subset_ids,"Removing subset groups"); for id in subset_ids { self.existing_groups.remove(&id); self.cancel_tasks_for_group(id); @@ -158,10 +158,12 @@ impl ConflictTaskGenerator { TaskPriority::High }; trace!( - "Processing multi order group {group_id} with {} orders, {} profit with priority {:?}", - new_group.orders.len(), - format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())), - priority.display() + group = group_id, + order_count = new_group.orders.len(), + profit = + format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())), + priority = priority.display(), + "Processing multi order group" ); if self.existing_groups.contains_key(&group_id) { self.update_tasks(group_id, new_group, priority); @@ -186,8 +188,9 @@ impl ConflictTaskGenerator { .send((group_id, (sequence_of_orders, group.clone()))) { warn!( - "Failed to send single order result for group {}: {:?}", - group_id, e + error = ?e, + group_id, + "Failed to send single order result for group", ); } } @@ -294,9 +297,9 @@ impl ConflictTaskGenerator { priority: TaskPriority, ) { trace!( - "Updating tasks for group {} with priority {:?}", - group_id, - priority.display() + group = group_id, + priority = priority.display(), + "Updating tasks", ); // Cancel existing tasks for this grou self.cancel_tasks_for_group(group_id);