From 7f26c464d613a66883b1dce0e99e1e7cc2abc64e Mon Sep 17 00:00:00 2001 From: Kien Nguyen Date: Thu, 7 Nov 2024 15:45:35 +0700 Subject: [PATCH] fix: dirty workaround for known reth mempool issues --- crates/node/events/src/node.rs | 3 +- crates/transaction-pool/src/pool/best.rs | 2 ++ crates/transaction-pool/src/pool/pending.rs | 3 +- crates/transaction-pool/src/pool/txpool.rs | 34 ++++++++++++++++----- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/crates/node/events/src/node.rs b/crates/node/events/src/node.rs index c856c0ec9e18..b4ea73bd4385 100644 --- a/crates/node/events/src/node.rs +++ b/crates/node/events/src/node.rs @@ -504,7 +504,8 @@ where } else if let Some(latest_block) = this.state.latest_block { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); - if now - this.state.latest_block_time.unwrap_or(0) > 60 { + // https://github.com/paradigmxyz/reth/commit/523bfb9c81ad7c2493882d83d0c9a6d0bcb2ce52#diff-9b49eb32052d06f5011c679046579e019121ea44f698926d8c407a08c29fa570R507 + if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 { // Once we start receiving consensus nodes, don't emit status unless stalled for // 1 minute info!( diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs index 5880a73f5101..94a7d7535d56 100644 --- a/crates/transaction-pool/src/pool/best.rs +++ b/crates/transaction-pool/src/pool/best.rs @@ -169,6 +169,8 @@ impl Iterator for BestTransactions { self.add_new_transactions(); // Remove the next independent tx with the highest priority let best = self.independent.pop_last()?; + // https://github.com/paradigmxyz/reth/issues/12340 + self.all.remove(best.transaction.id()); let hash = best.transaction.hash(); // skip transactions that were marked as invalid diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index ff3ecf65a494..5eb1fce5c7ab 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -60,7 +60,8 @@ pub struct PendingPool { impl PendingPool { /// Create a new pool instance. pub fn new(ordering: T) -> Self { - let (new_transaction_notifier, _) = broadcast::channel(200); + // https://github.com/paradigmxyz/reth/issues/12336 + let (new_transaction_notifier, _) = broadcast::channel(100000); Self { ordering, submission_id: 0, diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 10605565c85d..680c6076b363 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -522,13 +522,33 @@ impl TxPool { match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => { - // replace the new tx and remove the replaced in the subpool(s) - self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to); - // Update inserted transactions metric - self.metrics.inserted_transactions.increment(1); - let UpdateOutcome { promoted, discarded } = self.process_updates(updates); - - let replaced = replaced_tx.map(|(tx, _)| tx); + // https://github.com/paradigmxyz/reth/issues/12286 + let replaced = replaced_tx.clone().map(|(tx, _)| tx); + + let (promoted, discarded) = if updates.is_empty() { + // 99% of the cases + self.add_new_transaction(transaction.clone(), replaced_tx, move_to); + self.metrics.inserted_transactions.increment(1); + (Vec::new(), Vec::new()) + } else { + let (prev_updates, next_updates): (Vec<_>, Vec<_>) = + updates.into_iter().partition(|update| update.id < *transaction.id()); + let prev_processed = self.process_updates(prev_updates); + self.add_new_transaction(transaction.clone(), replaced_tx, move_to); + self.metrics.inserted_transactions.increment(1); + let next_processed = self.process_updates(next_updates); + let promoted: Vec<_> = Iterator::chain( + prev_processed.promoted.into_iter(), + next_processed.promoted.into_iter(), + ) + .collect(); + let discarded: Vec<_> = Iterator::chain( + prev_processed.discarded.into_iter(), + next_processed.discarded.into_iter(), + ) + .collect(); + (promoted, discarded) + }; // This transaction was moved to the pending pool. let res = if move_to.is_pending() {