Skip to content

Commit

Permalink
EVM transaction replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Feb 28, 2024
1 parent 0c51651 commit bd9d427
Show file tree
Hide file tree
Showing 12 changed files with 778 additions and 302 deletions.
887 changes: 608 additions & 279 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

45 changes: 34 additions & 11 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
txmp.removeTx(wtx, false, true)
txmp.removeTx(wtx, false, true, true)

Check warning on line 367 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L367

Added line #L367 was not covered by tests
return nil
}

Expand Down Expand Up @@ -403,7 +403,7 @@ func (txmp *TxMempool) Flush() {
txmp.timestampIndex.Reset()

for _, wtx := range txmp.txStore.GetAllTxs() {
txmp.removeTx(wtx, false, false)
txmp.removeTx(wtx, false, false, true)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
Expand Down Expand Up @@ -515,7 +515,17 @@ func (txmp *TxMempool) Update(

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false, false)
txmp.removeTx(wtx, false, false, true)
}
if execTxResult[i].EvmTxInfo != nil {
// remove any tx that has the same nonce (because the committed tx
// may be from block proposal and is never in the local mempool)
if wtx, _ := txmp.priorityIndex.GetTxWithSameNonce(&WrappedTx{
evmAddress: execTxResult[i].EvmTxInfo.SenderAddress,
evmNonce: execTxResult[i].EvmTxInfo.Nonce,
}); wtx != nil {
txmp.removeTx(wtx, false, false, true)
}

Check warning on line 528 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L521-L528

Added lines #L521 - L528 were not covered by tests
}
}

Expand Down Expand Up @@ -636,7 +646,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
txmp.removeTx(toEvict, true, true)
txmp.removeTx(toEvict, true, true, true)
txmp.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
Expand All @@ -655,11 +665,19 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
txInfo.SenderID: {},
}

replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx)
if shouldDrop {
return nil
}

Check warning on line 671 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L670-L671

Added lines #L670 - L671 were not covered by tests

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

if txmp.insertTx(wtx) {
if replaced != nil {
txmp.removeTx(replaced, true, false, false)
}

Check warning on line 679 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L678-L679

Added lines #L678 - L679 were not covered by tests
if txmp.insertTx(wtx, replaced == nil) {
txmp.logger.Debug(
"inserted good transaction",
"priority", wtx.priority,
Expand Down Expand Up @@ -747,7 +765,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true, true)

Check warning on line 768 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L768

Added line #L768 was not covered by tests
}
}

Expand Down Expand Up @@ -853,13 +871,15 @@ func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
return nil
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool {
if txmp.isInMempool(wtx.tx) {
return false
}

txmp.txStore.SetTx(wtx)
txmp.priorityIndex.PushTx(wtx)
if updatePriorityIndex {
txmp.priorityIndex.PushTx(wtx)
}
txmp.heightIndex.Insert(wtx)
txmp.timestampIndex.Insert(wtx)

Expand All @@ -873,13 +893,16 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
return true
}

func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool, updatePriorityIndex bool) {
if txmp.txStore.IsTxRemoved(wtx.hash) {
return
}

txmp.txStore.RemoveTx(wtx)
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
toBeReenqueued := []*WrappedTx{}
if updatePriorityIndex {
toBeReenqueued = txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
}
txmp.heightIndex.Remove(wtx)
txmp.timestampIndex.Remove(wtx)

Expand All @@ -894,7 +917,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReen

if shouldReenqueue {
for _, reenqueue := range toBeReenqueued {
txmp.removeTx(reenqueue, removeFromCache, false)
txmp.removeTx(reenqueue, removeFromCache, false, true)
}
for _, reenqueue := range toBeReenqueued {
rtx := reenqueue.tx
Expand Down
2 changes: 1 addition & 1 deletion internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func TestTxMempool_EVMEviction(t *testing.T) {

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
txmp.removeTx(tx, true, false)
txmp.removeTx(tx, true, false, true)
// should not reenqueue
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
Expand Down
52 changes: 50 additions & 2 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)

// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx // priority heap
mtx sync.RWMutex
txs []*WrappedTx // priority heap
// invariant 1: no duplicate nonce in the same queue
// invariant 2: no nonce gap in the same queue
// invariant 3: head of the queue must be in heap
evmQueue map[string][]*WrappedTx // sorted by nonce
}

Expand Down Expand Up @@ -50,6 +53,51 @@ func NewTxPriorityQueue() *TxPriorityQueue {
return pq
}

func (pq *TxPriorityQueue) GetTxWithSameNonce(tx *WrappedTx) (*WrappedTx, int) {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
return pq.getTxWithSameNonceUnsafe(tx)

Check warning on line 59 in internal/mempool/priority_queue.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/priority_queue.go#L56-L59

Added lines #L56 - L59 were not covered by tests
}

func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx, int) {
queue, ok := pq.evmQueue[tx.evmAddress]
if !ok {
return nil, -1
}

Check warning on line 66 in internal/mempool/priority_queue.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/priority_queue.go#L65-L66

Added lines #L65 - L66 were not covered by tests
idx := binarySearch(queue, tx)
if idx < len(queue) && queue[idx].evmNonce == tx.evmNonce {
return queue[idx], idx
}
return nil, -1
}

func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
if !tx.isEVM {
return nil, false
}
pq.mtx.Lock()
defer pq.mtx.Unlock()
queue, ok := pq.evmQueue[tx.evmAddress]
if ok && len(queue) > 0 {
existing, idx := pq.getTxWithSameNonceUnsafe(tx)
if existing != nil {
if tx.priority > existing.priority {
// should replace
// replace heap if applicable
if hi, ok := pq.findTxIndexUnsafe(existing); ok {
heap.Remove(pq, hi)
heap.Push(pq, tx) // need to be in the heap since it has the same nonce
}
pq.evmQueue[tx.evmAddress][idx] = tx // replace queue item in-place
return existing, false
}
// tx should be dropped since it's dominated by an existing tx
return nil, true
}
}
return nil, false
}

// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
Expand Down
77 changes: 75 additions & 2 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
{sender: "2", isEVM: false, priority: 9},
{sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8},
{sender: "6", isEVM: false, priority: 6},
{sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5},
},
expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7},
expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7},
},
{
name: "PriorityWithEVMAndNonEVM",
Expand Down Expand Up @@ -371,3 +370,77 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
})
require.Equal(t, numTxs-2, pq.NumTxs())
}

func TestTxPriorityQueue_TryReplacement(t *testing.T) {
for _, test := range []struct {
tx *WrappedTx
existing []*WrappedTx
expectedReplaced bool
expectedDropped bool
expectedQueue []*WrappedTx
expectedHeap []*WrappedTx
}{
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, false, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, false, true, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, true, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 99, tx: []byte("ghi")},
}, true, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
} {
pq := NewTxPriorityQueue()
for _, e := range test.existing {
pq.PushTx(e)
}
replaced, dropped := pq.TryReplacement(test.tx)
if test.expectedReplaced {
require.NotNil(t, replaced)
} else {
require.Nil(t, replaced)
}
require.Equal(t, test.expectedDropped, dropped)
for i, q := range pq.evmQueue[test.tx.evmAddress] {
require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key())
require.Equal(t, test.expectedQueue[i].priority, q.priority)
require.Equal(t, test.expectedQueue[i].evmNonce, q.evmNonce)
}
for i, q := range pq.txs {
require.Equal(t, test.expectedHeap[i].tx.Key(), q.tx.Key())
require.Equal(t, test.expectedHeap[i].priority, q.priority)
require.Equal(t, test.expectedHeap[i].evmNonce, q.evmNonce)
}
}
}
4 changes: 2 additions & 2 deletions internal/mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
secondaryReactor.observePanic = observePanic

firstTx := &WrappedTx{}
primaryMempool.insertTx(firstTx)
primaryMempool.insertTx(firstTx, true)

// run the router
rts.start(ctx, t)
Expand All @@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
primaryMempool.insertTx(next)
primaryMempool.insertTx(next, true)
}()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type WrappedTx struct {
// IsBefore returns true if the WrappedTx is before the given WrappedTx
// this applies to EVM transactions only
func (wtx *WrappedTx) IsBefore(tx *WrappedTx) bool {
return wtx.evmNonce < tx.evmNonce || (wtx.evmNonce == tx.evmNonce && wtx.timestamp.Before(tx.timestamp))
return wtx.evmNonce < tx.evmNonce
}

func (wtx *WrappedTx) Size() int {
Expand Down
7 changes: 7 additions & 0 deletions proto/tendermint/abci/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ message ResponseDeliverTx {
repeated Event events = 7
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
string codespace = 8;
EvmTxInfo evm_tx_info = 9;
}

message ResponseEndBlock {
Expand Down Expand Up @@ -458,6 +459,7 @@ message ExecTxResult {
repeated Event events = 7
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
string codespace = 8;
EvmTxInfo evm_tx_info = 9;
}

// TxResult contains results of executing the transaction.
Expand Down Expand Up @@ -564,6 +566,11 @@ message Evidence {
int64 total_voting_power = 5;
}

message EvmTxInfo {
string senderAddress = 1;
uint64 nonce = 2;
}

//----------------------------------------
// State Sync Types

Expand Down
1 change: 0 additions & 1 deletion proto/tendermint/crypto/keys.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/mempool/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/p2p/conn.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/types/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bd9d427

Please sign in to comment.