Merge branch 'develop' into less-warninig
ZanCorDX committed Dec 13, 2024
2 parents 5d4c067 + b605076 commit 0138673
Showing 4 changed files with 44 additions and 25 deletions.
11 changes: 9 additions & 2 deletions crates/rbuilder/src/building/builders/mod.rs
@@ -19,6 +19,7 @@ use reth::{
revm::cached::CachedReads,
};
use reth_db::Database;
use reth_errors::ProviderError;
use reth_provider::{DatabaseProviderFactory, StateProviderFactory};
use std::{fmt::Debug, marker::PhantomData, sync::Arc};
use tokio::sync::{broadcast, broadcast::error::TryRecvError};
@@ -144,7 +145,9 @@ where
if !self.order_consumer.consume_next_commands()? {
return Ok(false);
}
self.update_onchain_nonces()?;
if !self.update_onchain_nonces()? {
return Ok(false);
}

self.order_consumer
.apply_new_commands(&mut self.block_orders);
Expand All @@ -161,7 +164,11 @@ where
SimulatedOrderCommand::Simulation(sim_order) => Some(sim_order),
SimulatedOrderCommand::Cancellation(_) => None,
});
let nonce_db_ref = self.nonce_cache.get_ref()?;
let nonce_db_ref = match self.nonce_cache.get_ref() {
Ok(nonce_db_ref) => nonce_db_ref,
Err(ProviderError::BlockHashNotFound(_)) => return Ok(false), // This can happen on reorgs since the block is removed
Err(err) => return Err(err.into()),
};
let mut nonces = Vec::new();
for new_order in new_orders {
for nonce in new_order.order.nonces() {
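The hunk above makes update_onchain_nonces signal, through its bool return value, whether building should continue: a ProviderError::BlockHashNotFound from the nonce cache is treated as a reorg (the block the cache was built against was removed) rather than as a hard failure. Below is a minimal, self-contained sketch of that pattern; the ProviderError and NonceCache types are simplified stand-ins, not the reth/rbuilder ones.

// Sketch only: simplified stand-ins so this compiles on its own.
#[allow(dead_code)]
#[derive(Debug)]
enum ProviderError {
    BlockHashNotFound(u64),
    Other(String),
}

struct NonceCache;

impl NonceCache {
    // Stand-in for nonce_cache.get_ref(): fails with BlockHashNotFound when the
    // block the cache was built against has been reorged out.
    fn get_ref(&self) -> Result<Vec<(String, u64)>, ProviderError> {
        Err(ProviderError::BlockHashNotFound(21_000_000))
    }
}

struct OrderStream {
    nonce_cache: NonceCache,
}

impl OrderStream {
    /// Ok(true): nonces refreshed, keep building.
    /// Ok(false): the block we were building on disappeared (reorg); stop this
    /// iteration without reporting an error.
    fn update_onchain_nonces(&mut self) -> Result<bool, ProviderError> {
        let _nonce_db_ref = match self.nonce_cache.get_ref() {
            Ok(db_ref) => db_ref,
            Err(ProviderError::BlockHashNotFound(_)) => return Ok(false),
            Err(err) => return Err(err),
        };
        // ... read the latest on-chain nonces via _nonce_db_ref here ...
        Ok(true)
    }
}

fn main() -> Result<(), ProviderError> {
    let mut orders = OrderStream { nonce_cache: NonceCache };
    if !orders.update_onchain_nonces()? {
        println!("parent block reorged away; stopping this building iteration");
        return Ok(());
    }
    println!("nonces refreshed; keep building");
    Ok(())
}

This mirrors the call site at the top of the diff, where a false return makes the surrounding update return Ok(false) instead of propagating an error.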
@@ -10,13 +10,13 @@ use reth_provider::{BlockReader, DatabaseProviderFactory, StateProviderFactory};
use std::{marker::PhantomData, sync::Arc, time::Instant};
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{trace, warn};
use tracing::{info_span, trace};

use crate::{
building::{
builders::{
block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider},
UnfinishedBlockBuildingSink,
handle_building_error, UnfinishedBlockBuildingSink,
},
BlockBuildingContext,
},
@@ -104,7 +104,9 @@ where
}
if self.best_results.get_number_of_orders() > 0 {
let orders_closed_at = OffsetDateTime::now_utc();
self.try_build_block(orders_closed_at);
if !self.try_build_block(orders_closed_at) {
break;
}
}
}
trace!(
@@ -113,12 +115,12 @@
);
}

/// Attempts to build a new block if not already building.
/// Attempts to build a new block if not already building. Returns whether block building should continue.
///
/// # Arguments
///
/// * `orders_closed_at` - The timestamp when orders were closed.
fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) {
fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) -> bool {
let time_start = Instant::now();

let current_best_results = self.best_results.clone();
@@ -128,7 +130,7 @@
// Check if version has incremented
if let Some(last_version) = self.last_version {
if version == last_version {
return;
return true;
}
}
self.last_version = Some(version);
@@ -140,18 +142,18 @@
);

if best_orderings_per_group.is_empty() {
return;
return true;
}

match self.build_new_block(&mut best_orderings_per_group, orders_closed_at) {
Ok(new_block) => {
if let Ok(value) = new_block.true_block_value() {
trace!(
"Parallel builder run id {}: Built new block with results version {:?} and profit: {:?} in {:?} ms",
self.run_id,
version,
format_ether(value),
time_start.elapsed().as_millis()
run_id = self.run_id,
version = version,
time_ms = time_start.elapsed().as_millis(),
profit = format_ether(value),
"Parallel builder built new block",
);

if new_block.built_block_trace().got_no_signer_error {
@@ -163,11 +165,15 @@
}
}
}
Err(e) => {
warn!("Parallel builder run id {}: Failed to build new block with results version {:?}: {:?}", self.run_id, version, e);
Err(err) => {
let _span = info_span!("Parallel builder failed to build new block", run_id = self.run_id, version = version, err = ?err).entered();
if !handle_building_error(err) {
return false;
}
}
}
self.run_id += 1;
true
}

/// Builds a new block using the best results from each group.
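handle_building_error is imported from the builders module in this diff, but its body is not shown here. What follows is a hypothetical sketch of the contract the call site relies on: the helper classifies the error and returns true if building can continue with the next results version, false if building for this block should stop (a reorged-out parent, as in the first file, is the obvious stop case). The BuildError type and the classification below are assumptions for illustration, not the rbuilder implementation.

// Hypothetical sketch: the real handle_building_error lives in
// crates/rbuilder/src/building/builders and is not shown in this commit.
// BuildError is a stand-in; the real code works with provider/eyre errors.
#[derive(Debug)]
enum BuildError {
    // Assumed non-recoverable for this block: the parent was reorged out.
    BlockHashNotFound,
    // Assumed recoverable: log it and try again on the next results version.
    Transient(String),
}

/// Returns whether block building should continue (mirrors the doc comment on
/// try_build_block above).
fn handle_building_error(err: BuildError) -> bool {
    match err {
        BuildError::BlockHashNotFound => false,
        BuildError::Transient(msg) => {
            eprintln!("block building failed, will retry: {msg}");
            true
        }
    }
}

fn main() {
    assert!(!handle_building_error(BuildError::BlockHashNotFound));
    assert!(handle_building_error(BuildError::Transient("nonce gap".into())));
}

Because the call site enters an info_span! first, anything handle_building_error logs is tagged with the run_id, version, and err fields from that span.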
@@ -69,6 +69,9 @@
self.thread_pool.spawn(move || {
while !cancellation_token.is_cancelled() {
if let Some(task) = task_queue.pop() {
if cancellation_token.is_cancelled() {
return;
}
let task_start = Instant::now();
if let Ok((task_id, result)) = Self::process_task(
task,
@@ -92,6 +95,7 @@
time_taken_ms = %task_start.elapsed().as_millis(),
"Conflict resolving: failed to send group result"
);
return;
}
}
}
@@ -100,7 +104,7 @@
});
}

pub fn process_task(
fn process_task(
task: ConflictTask,
ctx: &BlockBuildingContext,
provider: &P,
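The first hunk in this file re-checks the cancellation token immediately after a task is popped, so a task dequeued while cancellation is being requested is dropped instead of processed; the later hunk returns from the loop once the result channel is gone. Below is a std-only sketch of the same loop shape, with an AtomicBool and a Mutex<VecDeque> standing in for the CancellationToken and the conflict-task queue used in the real worker.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn spawn_worker(
    queue: Arc<Mutex<VecDeque<u64>>>,
    cancelled: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !cancelled.load(Ordering::Relaxed) {
            let task = queue.lock().unwrap().pop_front();
            if let Some(task) = task {
                // Re-check after popping: cancellation may have been requested
                // while this worker was waiting for or dequeuing a task.
                if cancelled.load(Ordering::Relaxed) {
                    return;
                }
                println!("processing task {task}");
            } else {
                // Nothing queued; back off briefly instead of spinning.
                thread::sleep(Duration::from_millis(1));
            }
        }
    })
}

fn main() {
    let queue = Arc::new(Mutex::new(VecDeque::from([1, 2, 3])));
    let cancelled = Arc::new(AtomicBool::new(false));
    let worker = spawn_worker(queue.clone(), cancelled.clone());
    // Let the worker drain the queue, then signal cancellation.
    thread::sleep(Duration::from_millis(20));
    cancelled.store(true, Ordering::Relaxed);
    worker.join().unwrap();
}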
@@ -106,7 +106,7 @@ impl ConflictTaskGenerator {
.cloned()
.collect();

trace!("Removing subset groups: {:?}", subset_ids);
trace!(groups = ?subset_ids,"Removing subset groups");
for id in subset_ids {
self.existing_groups.remove(&id);
self.cancel_tasks_for_group(id);
@@ -158,10 +158,12 @@
TaskPriority::High
};
trace!(
"Processing multi order group {group_id} with {} orders, {} profit with priority {:?}",
new_group.orders.len(),
format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())),
priority.display()
group = group_id,
order_count = new_group.orders.len(),
profit =
format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())),
priority = priority.display(),
"Processing multi order group"
);
if self.existing_groups.contains_key(&group_id) {
self.update_tasks(group_id, new_group, priority);
@@ -289,9 +291,9 @@ impl ConflictTaskGenerator {
priority: TaskPriority,
) {
trace!(
"Updating tasks for group {} with priority {:?}",
group_id,
priority.display()
group = group_id,
priority = priority.display(),
"Updating tasks",
);
// Cancel existing tasks for this group
self.cancel_tasks_for_group(group_id);
