Skip to content

Commit

Permalink
Add EVM txs eviction logic (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored and udpatil committed Apr 16, 2024
1 parent d251775 commit 693d289
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 43 deletions.
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ pending-size = {{ .Mempool.PendingSize }}
max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}
pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}
pending-ttl-duration = "{{ .Mempool.PendingTTLDuration }}"
pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}
Expand Down
36 changes: 26 additions & 10 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
pendingTxs: NewPendingTxs(cfg),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -340,7 +340,9 @@ func (txmp *TxMempool) CheckTx(
return err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
txmp.pendingTxs.Insert(wtx, res, txInfo)
if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
return err
}
}
}

Expand All @@ -362,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, true)
return nil
}

Expand Down Expand Up @@ -401,7 +403,7 @@ func (txmp *TxMempool) Flush() {
txmp.timestampIndex.Reset()

for _, wtx := range txmp.txStore.GetAllTxs() {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, false)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
Expand Down Expand Up @@ -513,7 +515,7 @@ func (txmp *TxMempool) Update(

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, false, false)
}
}

Expand Down Expand Up @@ -634,7 +636,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
txmp.removeTx(toEvict, true)
txmp.removeTx(toEvict, true, true)
txmp.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
Expand Down Expand Up @@ -745,7 +747,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
}
}

Expand Down Expand Up @@ -871,13 +873,13 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
return true
}

func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
if txmp.txStore.IsTxRemoved(wtx.hash) {
return
}

txmp.txStore.RemoveTx(wtx)
txmp.priorityIndex.RemoveTx(wtx)
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
txmp.heightIndex.Remove(wtx)
txmp.timestampIndex.Remove(wtx)

Expand All @@ -889,6 +891,20 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

wtx.removeHandler(removeFromCache)

if shouldReenqueue {
for _, reenqueue := range toBeReenqueued {
txmp.removeTx(reenqueue, removeFromCache, false)
}
for _, reenqueue := range toBeReenqueued {
rtx := reenqueue.tx
go func() {
if err := txmp.CheckTx(context.Background(), rtx, nil, TxInfo{}); err != nil {
txmp.logger.Error(fmt.Sprintf("failed to reenqueue transaction %X due to %s", rtx.Hash(), err))
}
}()
}
}
}

func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) {
Expand Down Expand Up @@ -967,7 +983,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
txmp.expire(blockHeight, wtx)
})
Expand Down
127 changes: 121 additions & 6 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
// transaction priority based on the value in the key/value pair.
type application struct {
*kvstore.Application

occupiedNonces map[string][]uint64
}

type testTx struct {
Expand All @@ -37,6 +39,7 @@ type testTx struct {
}

func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {

var (
priority int64
sender string
Expand All @@ -57,7 +60,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
nonce, err := strconv.ParseUint(string(parts[3]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Expand All @@ -66,15 +69,50 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
if app.occupiedNonces == nil {
app.occupiedNonces = make(map[string][]uint64)
}
if _, exists := app.occupiedNonces[account]; !exists {
app.occupiedNonces[account] = []uint64{}
}
active := true
for i := uint64(0); i < nonce; i++ {
found := false
for _, occ := range app.occupiedNonces[account] {
if occ == i {
found = true
break
}
}
if !found {
active = false
break
}
}
app.occupiedNonces[account] = append(app.occupiedNonces[account], nonce)
return &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
Priority: v,
Code: code.CodeTypeOK,
GasWanted: 1,
},
EVMNonce: uint64(nonce),
EVMSenderAddress: account,
IsEVM: true,
EVMNonce: nonce,
EVMSenderAddress: account,
IsEVM: true,
IsPendingTransaction: !active,
Checker: func() abci.PendingTxCheckerResponse { return abci.Pending },
ExpireTxHandler: func() {
idx := -1
for i, n := range app.occupiedNonces[account] {
if n == nonce {
idx = i
break
}
}
if idx >= 0 {
app.occupiedNonces[account] = append(app.occupiedNonces[account][:idx], app.occupiedNonces[account][idx+1:]...)
}
},
}, nil
}

Expand Down Expand Up @@ -501,12 +539,14 @@ func TestTxMempool_Prioritization(t *testing.T) {
txs := [][]byte{
[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
}
evmTxs := [][]byte{
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
}

// copy the slice of txs and shuffle the order randomly
txsCopy := make([][]byte, len(txs))
Expand All @@ -515,6 +555,16 @@ func TestTxMempool_Prioritization(t *testing.T) {
rng.Shuffle(len(txsCopy), func(i, j int) {
txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
})
txs = [][]byte{
[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
}
txsCopy = append(txsCopy, evmTxs...)

for i := range txsCopy {
require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
Expand All @@ -535,6 +585,71 @@ func TestTxMempool_Prioritization(t *testing.T) {
}
}

func TestTxMempool_PendingStoreSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
txmp.config.PendingSize = 1
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 1)), nil, TxInfo{SenderID: peerID}))
err := txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 2)), nil, TxInfo{SenderID: peerID})
require.Error(t, err)
require.Contains(t, err.Error(), "mempool pending set is full")
}

func TestTxMempool_EVMEviction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
txmp.config.Size = 1
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 0)), nil, TxInfo{SenderID: peerID}))
// this should evict the previous tx
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 2, 0)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
require.Equal(t, int64(2), txmp.priorityIndex.txs[0].priority)

txmp.config.Size = 2
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 3, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 0, txmp.pendingTxs.Size())
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
// this would evict the tx with priority 2 and cause the tx with priority 3 to go pending
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 4, 0)), nil, TxInfo{SenderID: peerID}))
time.Sleep(1 * time.Second) // reenqueue is async
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
tx := txmp.priorityIndex.txs[0]
require.Equal(t, 1, txmp.pendingTxs.Size())

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
txmp.removeTx(tx, true, false)
// should not reenqueue
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
require.Equal(t, 1, txmp.pendingTxs.Size())
}

func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
26 changes: 18 additions & 8 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
pq.mtx.RLock()
defer pq.mtx.RUnlock()

txs := make([]*WrappedTx, len(pq.txs))
copy(txs, pq.txs)
txs := []*WrappedTx{}
txs = append(txs, pq.txs...)
for _, queue := range pq.evmQueue {
txs = append(txs, queue[1:]...)
}

sort.Slice(txs, func(i, j int) bool {
return txs[i].priority < txs[j].priority
Expand Down Expand Up @@ -111,18 +114,19 @@ func (pq *TxPriorityQueue) NumTxs() int {
return len(pq.txs) + pq.numQueuedUnsafe()
}

func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx int) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.tx.Key() == tx.tx.Key() {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
}
break
return i
}
}
}
return -1
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
Expand All @@ -135,21 +139,27 @@ func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
}

// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx, shouldReenqueue bool) (toBeReenqueued []*WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

var removedIdx int

if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
if len(pq.evmQueue[tx.evmAddress]) > 0 {
removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
if !shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
}
} else if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
}
if tx.isEVM && shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 && removedIdx >= 0 {
toBeReenqueued = pq.evmQueue[tx.evmAddress][removedIdx:]
}
return
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
Expand Down
Loading

0 comments on commit 693d289

Please sign in to comment.