From 20b4b85659fec86062ae9195d3e44b62bed50df2 Mon Sep 17 00:00:00 2001 From: codchen Date: Tue, 2 Jan 2024 23:45:44 +0800 Subject: [PATCH] add TTL for pending txs (#174) --- internal/mempool/mempool.go | 6 ++++-- internal/mempool/tx.go | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 41f5b587c..56d0cb6eb 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -492,8 +492,8 @@ func (txmp *TxMempool) Update( } } - txmp.handlePendingTransactions() txmp.purgeExpiredTxs(blockHeight) + txmp.handlePendingTransactions() // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining @@ -844,7 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { // purgeExpiredTxs removes all transactions that have exceeded their respective // height- and/or time-based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool, but preserved in the cache. +// transaction will be removed from the mempool, but preserved in the cache (except for pending txs). // // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which // the caller has a write-lock on the mempool and so we can safely iterate over @@ -890,6 +890,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { for _, wtx := range expiredTxs { txmp.removeTx(wtx, false) } + + txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove) } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c74872054..2f3ec7b8f 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -372,3 +372,41 @@ func (p *PendingTxs) Size() int { defer p.mtx.RUnlock() return len(p.txs) } + +func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if len(p.txs) == 0 { + return + } + + // txs retains the ordering of insertion + if ttlNumBlock > 0 { + idxFirstNotExpiredTx := len(p.txs) + for i, ptx := range p.txs { + if (blockHeight - ptx.tx.height) <= ttlNumBlock { + idxFirstNotExpiredTx = i + } else { + cb(ptx.tx.tx) + } + } + p.txs = p.txs[idxFirstNotExpiredTx:] + } + + if len(p.txs) == 0 { + return + } + + if ttlDuration > 0 { + idxFirstNotExpiredTx := len(p.txs) + for i, ptx := range p.txs { + if now.Sub(ptx.tx.timestamp) <= ttlDuration { + idxFirstNotExpiredTx = i + } else { + cb(ptx.tx.tx) + } + } + p.txs = p.txs[idxFirstNotExpiredTx:] + } +}