Skip to content

Commit

Permalink
fix: dirty workaround for known reth mempool issues
Browse files Browse the repository at this point in the history
  • Loading branch information
kien-rise committed Nov 7, 2024
1 parent 1ba631b commit 7f26c46
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
3 changes: 2 additions & 1 deletion crates/node/events/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ where
} else if let Some(latest_block) = this.state.latest_block {
let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now - this.state.latest_block_time.unwrap_or(0) > 60 {
// https://github.com/paradigmxyz/reth/commit/523bfb9c81ad7c2493882d83d0c9a6d0bcb2ce52#diff-9b49eb32052d06f5011c679046579e019121ea44f698926d8c407a08c29fa570R507
if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
// Once we start receiving consensus nodes, don't emit status unless stalled for
// 1 minute
info!(
Expand Down
2 changes: 2 additions & 0 deletions crates/transaction-pool/src/pool/best.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
self.add_new_transactions();
// Remove the next independent tx with the highest priority
let best = self.independent.pop_last()?;
// https://github.com/paradigmxyz/reth/issues/12340
self.all.remove(best.transaction.id());
let hash = best.transaction.hash();

// skip transactions that were marked as invalid
Expand Down
3 changes: 2 additions & 1 deletion crates/transaction-pool/src/pool/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pub struct PendingPool<T: TransactionOrdering> {
impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance.
pub fn new(ordering: T) -> Self {
let (new_transaction_notifier, _) = broadcast::channel(200);
// https://github.com/paradigmxyz/reth/issues/12336
let (new_transaction_notifier, _) = broadcast::channel(100000);
Self {
ordering,
submission_id: 0,
Expand Down
34 changes: 27 additions & 7 deletions crates/transaction-pool/src/pool/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,33 @@ impl<T: TransactionOrdering> TxPool<T> {

match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => {
// replace the new tx and remove the replaced in the subpool(s)
self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to);
// Update inserted transactions metric
self.metrics.inserted_transactions.increment(1);
let UpdateOutcome { promoted, discarded } = self.process_updates(updates);

let replaced = replaced_tx.map(|(tx, _)| tx);
// https://github.com/paradigmxyz/reth/issues/12286
let replaced = replaced_tx.clone().map(|(tx, _)| tx);

let (promoted, discarded) = if updates.is_empty() {
// 99% of the cases
self.add_new_transaction(transaction.clone(), replaced_tx, move_to);
self.metrics.inserted_transactions.increment(1);
(Vec::new(), Vec::new())
} else {
let (prev_updates, next_updates): (Vec<_>, Vec<_>) =
updates.into_iter().partition(|update| update.id < *transaction.id());
let prev_processed = self.process_updates(prev_updates);
self.add_new_transaction(transaction.clone(), replaced_tx, move_to);
self.metrics.inserted_transactions.increment(1);
let next_processed = self.process_updates(next_updates);
let promoted: Vec<_> = Iterator::chain(
prev_processed.promoted.into_iter(),
next_processed.promoted.into_iter(),
)
.collect();
let discarded: Vec<_> = Iterator::chain(
prev_processed.discarded.into_iter(),
next_processed.discarded.into_iter(),
)
.collect();
(promoted, discarded)
};

// This transaction was moved to the pending pool.
let res = if move_to.is_pending() {
Expand Down

0 comments on commit 7f26c46

Please sign in to comment.