Skip to content

Commit

Permalink
Add heapIndex with safety check (#213)
Browse files Browse the repository at this point in the history
* add heapIndex with safety check

* cleanup

* comment out for perf test

* add back perf improvement

* fix nil test

* Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209)

ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim
> Transactions returned are not removed from the mempool transaction
> store or indexes.

However, they use a priority queue to accomplish the claim
> Transaction are retrieved in priority order.

This is accomplished by popping all items out of the whole heap, and
then pushing then back in sequentially. A copy of the heap cannot be
obtained otherwise. Both of the mentioned functions use a read-lock
(RLock) when doing this. This results in a potential scenario where
multiple executions of the ReapMax can be started in parallel, and
both would be popping items out of the priority queue.

In practice, this can be abused by executing the `unconfirmed_txs` RPC
call repeatedly. Based on our observations, running it multiple times
per millisecond results in multiple threads picking it up at the same
time. Such a scenario can be obtained via the WebSocket interface, and
spamming `unconfirmed_txs` calls there. The behavior that happens is a
`Panic in WSJSONRPC handler` when a queue item unexpectedly disappears
for `mempool.(*TxPriorityQueue).Swap`.
(`runtime error: index out of range [0] with length 0`)

This can additionally lead to a `CONSENSUS FAILURE!!!` if the race
condition occurs for `internal/consensus.(*State).finalizeCommit`
when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but
the ReapMax has already removed all elements from the underlying
heap. (`runtime error: index out of range [-1]`)

This commit switches the lock type to a write-lock (Lock) to ensure
no parallel modifications take place. This commit additionally updates
the tests to allow parallel execution of the func calls in testing,
as to prevent regressions (in case someone wants to downgrade the locks
without considering the implications from the underlying heap usage).

---------

Co-authored-by: Valters Jansons <[email protected]>
  • Loading branch information
2 people authored and udpatil committed Apr 16, 2024
1 parent 21aa1cb commit 2c74209
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx in
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
// safety check for race situation where heapIndex is out of range of txs
if tx.heapIndex >= 0 && tx.heapIndex < len(pq.txs) && pq.txs[tx.heapIndex].tx.Key() == tx.tx.Key() {
return tx.heapIndex, true
}

// heap index isn't trustable here, so attempt to find it
for i, t := range pq.txs {
if t.tx.Key() == tx.tx.Key() {
return i, true
Expand Down Expand Up @@ -443,7 +449,9 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
//
// NOTE: A caller should never call Push. Use PushTx instead.
func (pq *TxPriorityQueue) Push(x interface{}) {
n := len(pq.txs)
item := x.(*WrappedTx)
item.heapIndex = n
pq.txs = append(pq.txs, item)
}

Expand All @@ -454,7 +462,8 @@ func (pq *TxPriorityQueue) Pop() interface{} {
old := pq.txs
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
old[n-1] = nil // avoid memory leak
setHeapIndex(item, -1) // for safety
pq.txs = old[0 : n-1]
return item
}
Expand Down Expand Up @@ -483,4 +492,14 @@ func (pq *TxPriorityQueue) Less(i, j int) bool {
// Swap implements the Heap interface. It swaps two transactions in the queue.
func (pq *TxPriorityQueue) Swap(i, j int) {
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
setHeapIndex(pq.txs[i], i)
setHeapIndex(pq.txs[j], j)
}

func setHeapIndex(tx *WrappedTx, i int) {
// a removed tx can be nil
if tx == nil {
return
}
tx.heapIndex = i
}

0 comments on commit 2c74209

Please sign in to comment.