Skip to content

Commit

Permalink
[EVM] Adjust locking for replacement (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenlanders authored and udpatil committed Apr 16, 2024
1 parent 4b01444 commit dfcff32
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 26 deletions.
27 changes: 12 additions & 15 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,19 +665,11 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
txInfo.SenderID: {},
}

replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx)
if shouldDrop {
if txmp.isInMempool(wtx.tx) {
return nil
}

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

if replaced != nil {
txmp.removeTx(replaced, true, false, false)
}
if txmp.insertTx(wtx, replaced == nil) {
if txmp.insertTx(wtx) {
txmp.logger.Debug(
"inserted good transaction",
"priority", wtx.priority,
Expand Down Expand Up @@ -871,15 +863,20 @@ func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
return nil
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool {
if txmp.isInMempool(wtx.tx) {
func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
replacedTx, inserted := txmp.priorityIndex.PushTx(wtx)
if !inserted {
return false
}
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

txmp.txStore.SetTx(wtx)
if updatePriorityIndex {
txmp.priorityIndex.PushTx(wtx)
if replacedTx != nil {
txmp.removeTx(replacedTx, true, false, false)
}

txmp.txStore.SetTx(wtx)
txmp.heightIndex.Insert(wtx)
txmp.timestampIndex.Insert(wtx)

Expand Down
20 changes: 16 additions & 4 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,10 @@ func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx,
return nil, -1
}

func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
func (pq *TxPriorityQueue) tryReplacementUnsafe(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
if !tx.isEVM {
return nil, false
}
pq.mtx.Lock()
defer pq.mtx.Unlock()
queue, ok := pq.evmQueue[tx.evmAddress]
if ok && len(queue) > 0 {
existing, idx := pq.getTxWithSameNonceUnsafe(tx)
Expand Down Expand Up @@ -338,11 +336,25 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
//}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) (*WrappedTx, bool) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

replacedTx, shouldDrop := pq.tryReplacementUnsafe(tx)

// tx was not inserted, and nothing was replaced
if shouldDrop {
return nil, false
}

// tx replaced an existing transaction
if replacedTx != nil {
return replacedTx, true
}

// tx was not inserted yet, so insert it
pq.pushTxUnsafe(tx)
return nil, true
}

func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
Expand Down
17 changes: 12 additions & 5 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
{sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9},
{sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7},
},
expectedOutput: []int64{1, 2, 3},
expectedOutput: []int64{1, 3},
},
{
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
Expand Down Expand Up @@ -380,17 +380,23 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
expectedQueue []*WrappedTx
expectedHeap []*WrappedTx
}{
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
// non-evm transaction is inserted into empty queue
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: false}}, []*WrappedTx{{isEVM: false}}},
// evm transaction is inserted into empty queue
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}},
// evm transaction (new nonce) is inserted into queue with existing tx (lower nonce)
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, false, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
},
},
// evm transaction (new nonce) is not inserted because it's a duplicate nonce and same priority
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
Expand All @@ -400,6 +406,7 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
// evm transaction (new nonce) replaces the existing nonce transaction because its priority is higher
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
Expand All @@ -425,13 +432,13 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
for _, e := range test.existing {
pq.PushTx(e)
}
replaced, dropped := pq.TryReplacement(test.tx)
replaced, inserted := pq.PushTx(test.tx)
if test.expectedReplaced {
require.NotNil(t, replaced)
} else {
require.Nil(t, replaced)
}
require.Equal(t, test.expectedDropped, dropped)
require.Equal(t, test.expectedDropped, !inserted)
for i, q := range pq.evmQueue[test.tx.evmAddress] {
require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key())
require.Equal(t, test.expectedQueue[i].priority, q.priority)
Expand Down
4 changes: 2 additions & 2 deletions internal/mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
secondaryReactor.observePanic = observePanic

firstTx := &WrappedTx{}
primaryMempool.insertTx(firstTx, true)
primaryMempool.insertTx(firstTx)

// run the router
rts.start(ctx, t)
Expand All @@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
primaryMempool.insertTx(next, true)
primaryMempool.insertTx(next)
}()
}

Expand Down

0 comments on commit dfcff32

Please sign in to comment.