Skip to content

Commit

Permalink
Add EVM txs eviction logic
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Feb 21, 2024
1 parent 530feb8 commit f5be25b
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 41 deletions.
58 changes: 48 additions & 10 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
pendingTxs: NewPendingTxs(cfg),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -340,7 +340,9 @@ func (txmp *TxMempool) CheckTx(
return err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
txmp.pendingTxs.Insert(wtx, res, txInfo)
if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
return err
}
}
}

Expand All @@ -362,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, true)
return nil
}

Expand Down Expand Up @@ -401,7 +403,7 @@ func (txmp *TxMempool) Flush() {
txmp.timestampIndex.Reset()

for _, wtx := range txmp.txStore.GetAllTxs() {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, false)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
Expand Down Expand Up @@ -474,6 +476,28 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

func (txmp *TxMempool) ReapMaxTxsLog(max int) types.Txs {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

wTxs := txmp.priorityIndex.PeekTxs(max)
txs := make([]types.Tx, 0, len(wTxs))
for _, wtx := range wTxs {
fmt.Println(wtx.priority)
txs = append(txs, wtx.tx)
}
fmt.Println("=====")
if len(txs) < max {
// retrieve more from pending txs
pending := txmp.pendingTxs.Peek(max - len(txs))
for _, ptx := range pending {
txs = append(txs, ptx.tx.tx)
fmt.Println(ptx.tx.priority)
}
}
return txs
}

// Update iterates over all the transactions provided by the block producer,
// removes them from the cache (if applicable), and removes
// the transactions from the main transaction store and associated indexes.
Expand Down Expand Up @@ -513,7 +537,7 @@ func (txmp *TxMempool) Update(

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, false)
}
}

Expand Down Expand Up @@ -634,7 +658,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
txmp.removeTx(toEvict, true)
txmp.removeTx(toEvict, true, true)
txmp.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
Expand Down Expand Up @@ -745,7 +769,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
}
}

Expand Down Expand Up @@ -871,13 +895,13 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
return true
}

func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
if txmp.txStore.IsTxRemoved(wtx.hash) {
return
}

txmp.txStore.RemoveTx(wtx)
txmp.priorityIndex.RemoveTx(wtx)
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
txmp.heightIndex.Remove(wtx)
txmp.timestampIndex.Remove(wtx)

Expand All @@ -889,6 +913,20 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

wtx.removeHandler(removeFromCache)

if shouldReenqueue {
for _, reenqueue := range toBeReenqueued {
txmp.removeTx(reenqueue, removeFromCache, false)
}
for _, reenqueue := range toBeReenqueued {
rtx := reenqueue.tx
go func() {
if err := txmp.CheckTx(context.Background(), rtx, nil, TxInfo{}); err != nil {
txmp.logger.Error(fmt.Sprintf("failed to reenqueue transaction %X due to %s", rtx.Hash(), err))
}
}()
}
}
}

func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) {
Expand Down Expand Up @@ -967,7 +1005,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
txmp.expire(blockHeight, wtx)
})
Expand Down
112 changes: 107 additions & 5 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
// transaction priority based on the value in the key/value pair.
type application struct {
*kvstore.Application

occupiedNonces map[string][]uint64
}

type testTx struct {
Expand Down Expand Up @@ -58,7 +60,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
nonce, err := strconv.ParseUint(string(parts[3]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Expand All @@ -67,15 +69,50 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
if app.occupiedNonces == nil {
app.occupiedNonces = make(map[string][]uint64)
}
if _, exists := app.occupiedNonces[account]; !exists {
app.occupiedNonces[account] = []uint64{}
}
active := true
for i := uint64(0); i < nonce; i++ {
found := false
for _, occ := range app.occupiedNonces[account] {
if occ == i {
found = true
break
}
}
if !found {
active = false
break
}
}
app.occupiedNonces[account] = append(app.occupiedNonces[account], nonce)
return &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
Priority: v,
Code: code.CodeTypeOK,
GasWanted: 1,
},
EVMNonce: uint64(nonce),
EVMSenderAddress: account,
IsEVM: true,
EVMNonce: nonce,
EVMSenderAddress: account,
IsEVM: true,
IsPendingTransaction: !active,
Checker: func() abci.PendingTxCheckerResponse { return abci.Pending },
ExpireTxHandler: func() {
idx := -1
for i, n := range app.occupiedNonces[account] {
if n == nonce {
idx = i
break
}
}
if idx >= 0 {
app.occupiedNonces[account] = append(app.occupiedNonces[account][:idx], app.occupiedNonces[account][idx+1:]...)
}
},
}, nil
}

Expand Down Expand Up @@ -490,7 +527,7 @@ func TestTxMempool_Prioritization(t *testing.T) {
}

// Reap the transactions
reapedTxs := txmp.ReapMaxTxs(len(txs))
reapedTxs := txmp.ReapMaxTxsLog(len(txs))
// Check if the reaped transactions are in the correct order of their priorities
for _, tx := range txs {
fmt.Printf("expected: %s\n", string(tx))
Expand All @@ -504,6 +541,71 @@ func TestTxMempool_Prioritization(t *testing.T) {
}
}

func TestTxMempool_PendingStoreSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
txmp.config.PendingSize = 1
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 1)), nil, TxInfo{SenderID: peerID}))
err := txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 2)), nil, TxInfo{SenderID: peerID})
require.Error(t, err)
require.Contains(t, err.Error(), "mempool pending set is full")
}

func TestTxMempool_EVMEviction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
txmp.config.Size = 1
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 0)), nil, TxInfo{SenderID: peerID}))
// this should evict the previous tx
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 2, 0)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
require.Equal(t, int64(2), txmp.priorityIndex.txs[0].priority)

txmp.config.Size = 2
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 3, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 0, txmp.pendingTxs.Size())
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
// this would evict the tx with priority 2 and cause the tx with priority 3 to go pending
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 4, 0)), nil, TxInfo{SenderID: peerID}))
time.Sleep(1 * time.Second) // reenqueue is async
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
tx := txmp.priorityIndex.txs[0]
require.Equal(t, 1, txmp.pendingTxs.Size())

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
txmp.removeTx(tx, true, false)
// should not reenqueue
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
require.Equal(t, 1, txmp.pendingTxs.Size())
}

func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
26 changes: 18 additions & 8 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
pq.mtx.RLock()
defer pq.mtx.RUnlock()

txs := make([]*WrappedTx, len(pq.txs))
copy(txs, pq.txs)
txs := []*WrappedTx{}
txs = append(txs, pq.txs...)
for _, queue := range pq.evmQueue {
txs = append(txs, queue[1:]...)
}

sort.Slice(txs, func(i, j int) bool {
return txs[i].priority < txs[j].priority
Expand Down Expand Up @@ -111,18 +114,19 @@ func (pq *TxPriorityQueue) NumTxs() int {
return len(pq.txs) + pq.numQueuedUnsafe()
}

func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx int) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.tx.Key() == tx.tx.Key() {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
}
break
return i
}
}
}
return -1
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
Expand All @@ -135,21 +139,27 @@ func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
}

// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx, shouldReenqueue bool) (toBeReenqueued []*WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

var removedIdx int

if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
if len(pq.evmQueue[tx.evmAddress]) > 0 {
removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
if !shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
}
} else if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
}
if tx.isEVM && shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 && removedIdx >= 0 {
toBeReenqueued = pq.evmQueue[tx.evmAddress][removedIdx:]
}
return
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
Expand Down
8 changes: 4 additions & 4 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) {
pq.PushTx(tx1)
pq.PushTx(tx2)

pq.RemoveTx(tx1)
pq.RemoveTx(tx1, false)

result := pq.PopTx()
require.Equal(t, tx2, result)
Expand Down Expand Up @@ -360,14 +360,14 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
max := values[len(values)-1]

wtx := pq.txs[pq.NumTxs()/2]
pq.RemoveTx(wtx)
pq.RemoveTx(wtx, false)
require.Equal(t, numTxs-1, pq.NumTxs())
require.Equal(t, int64(max), pq.PopTx().priority)
require.Equal(t, numTxs-2, pq.NumTxs())

require.NotPanics(t, func() {
pq.RemoveTx(&WrappedTx{heapIndex: numTxs})
pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1})
pq.RemoveTx(&WrappedTx{heapIndex: numTxs}, false)
pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}, false)
})
require.Equal(t, numTxs-2, pq.NumTxs())
}
Loading

0 comments on commit f5be25b

Please sign in to comment.