Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: Handle txpool conflicts properly #120

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 64 additions & 65 deletions chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func (m *Manager) revalidatePool() {
// transactions, but that's expensive; this approach should work fine in
// practice.
if m.txpool.weight >= txpoolMaxWeight {
// sort txns fee without modifying the actual pool slice
// sort txns by fee without modifying the actual pool slice
type feeTxn struct {
index int
fees types.Currency
Expand Down Expand Up @@ -507,10 +507,16 @@ func (m *Manager) revalidatePool() {
return txnFees[i].index < txnFees[j].index
})
rem := m.txpool.txns[:0]
v2rem := m.txpool.v2txns[:0]
for _, ft := range txnFees {
rem = append(rem, m.txpool.txns[ft.index])
if !ft.v2 {
rem = append(rem, m.txpool.txns[ft.index])
} else {
v2rem = append(v2rem, m.txpool.v2txns[ft.index])
}
}
m.txpool.txns = rem
m.txpool.v2txns = v2rem
}

// remove and re-add all transactions
Expand Down Expand Up @@ -1001,44 +1007,56 @@ func (m *Manager) V2TransactionSet(basis types.ChainIndex, txn types.V2Transacti
return m.tipState.Index, append(parents, txn), nil
}

func (m *Manager) checkDupTxnSet(txns []types.Transaction, v2txns []types.V2Transaction) (types.Hash256, bool) {
func (m *Manager) checkTxnSet(txns []types.Transaction, v2txns []types.V2Transaction) (bool, error) {
allInPool := true
checkPool := func(txid types.TransactionID) {
checkPool := func(txid types.TransactionID) types.TransactionID {
if allInPool {
if _, ok := m.txpool.indices[txid]; !ok {
allInPool = false
}
}
return txid
n8maninger marked this conversation as resolved.
Show resolved Hide resolved
}
h := types.NewHasher()
for i, txn := range txns {
txid := txn.ID()
checkPool(txid)
h.E.WriteUint64(uint64(i))
txid.EncodeTo(h.E)
for _, txn := range txns {
checkPool(txn.ID()).EncodeTo(h.E)
}
for i, txn := range v2txns {
txid := txn.ID()
checkPool(txid)
h.E.WriteUint64(uint64(i))
txid.EncodeTo(h.E)
for _, txn := range v2txns {
checkPool(txn.ID()).EncodeTo(h.E)
}
setID := h.Sum()
n8maninger marked this conversation as resolved.
Show resolved Hide resolved
_, invalid := m.txpool.invalidTxnSets[setID]
return setID, allInPool || invalid
}
if err := m.txpool.invalidTxnSets[setID]; allInPool || err != nil {
return true, err
}

func (m *Manager) markBadTxnSet(setID types.Hash256, err error) error {
const maxInvalidTxnSets = 1000
if len(m.txpool.invalidTxnSets) >= maxInvalidTxnSets {
// forget a random entry
for id := range m.txpool.invalidTxnSets {
delete(m.txpool.invalidTxnSets, id)
break
// validate
markBadTxnSet := func(err error) error {
const maxInvalidTxnSets = 1000
if len(m.txpool.invalidTxnSets) >= maxInvalidTxnSets {
// forget a random entry
for id := range m.txpool.invalidTxnSets {
delete(m.txpool.invalidTxnSets, id)
break
}
}
m.txpool.invalidTxnSets[setID] = err
return err
}
m.txpool.invalidTxnSets[setID] = err
return err
ms := consensus.NewMidState(m.tipState)
for _, txn := range txns {
ts := m.store.SupplementTipTransaction(txn)
if err := consensus.ValidateTransaction(ms, txn, ts); err != nil {
return false, markBadTxnSet(fmt.Errorf("transaction %v is invalid: %w", txn.ID(), err))
}
ms.ApplyTransaction(txn, ts)
}
for _, txn := range v2txns {
if err := consensus.ValidateV2Transaction(ms, txn); err != nil {
return false, markBadTxnSet(fmt.Errorf("v2 transaction %v is invalid: %w", txn.ID(), err))
}
ms.ApplyV2Transaction(txn)
}
return false, nil
}

func (m *Manager) updateV2TransactionProofs(txns []types.V2Transaction, from, to types.ChainIndex) ([]types.V2Transaction, error) {
Expand Down Expand Up @@ -1145,32 +1163,25 @@ func (m *Manager) AddPoolTransactions(txns []types.Transaction) (known bool, err
defer m.mu.Unlock()
m.revalidatePool()

setID, known := m.checkDupTxnSet(txns, nil)
if known {
return true, m.txpool.invalidTxnSets[setID]
}

// validate as a standalone set
ms := consensus.NewMidState(m.tipState)
for _, txn := range txns {
ts := m.store.SupplementTipTransaction(txn)
if err := consensus.ValidateTransaction(ms, txn, ts); err != nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("transaction %v is invalid: %w", txn.ID(), err))
}
ms.ApplyTransaction(txn, ts)
if known, err := m.checkTxnSet(txns, nil); known || err != nil {
return known, err
}

for _, txn := range txns {
txid := txn.ID()
if _, ok := m.txpool.indices[txid]; ok {
continue // skip transactions already in pool
continue // skip transactions already in the pool
}
m.txpool.ms.ApplyTransaction(txn, m.store.SupplementTipTransaction(txn))
ts := m.store.SupplementTipTransaction(txn)
if err := consensus.ValidateTransaction(m.txpool.ms, txn, ts); err != nil {
m.txpool.ms = nil // force revalidation next time the pool is queried
return false, fmt.Errorf("transaction %v conflicts with pool: %w", txid, err)
}
m.txpool.ms.ApplyTransaction(txn, ts)
m.txpool.indices[txid] = len(m.txpool.txns)
m.txpool.txns = append(m.txpool.txns, txn)
m.txpool.weight += m.tipState.TransactionWeight(txn)
}

// invalidate caches
m.txpool.medianFee = nil
m.txpool.parentMap = nil
Expand All @@ -1188,7 +1199,6 @@ func (m *Manager) UpdateV2TransactionSet(txns []types.V2Transaction, from, to ty
if from == to {
return txns, nil
}

m.mu.Lock()
defer m.mu.Unlock()
return m.updateV2TransactionProofs(txns, from, to)
Expand All @@ -1213,49 +1223,38 @@ func (m *Manager) AddV2PoolTransactions(basis types.ChainIndex, txns []types.V2T
defer m.mu.Unlock()
m.revalidatePool()

setID, known := m.checkDupTxnSet(nil, txns)
if known {
return true, m.txpool.invalidTxnSets[setID]
}

// take ownership
// take ownership of Merkle proofs, and update them to the current tip
txns = append([]types.V2Transaction(nil), txns...)
for i := range txns {
txns[i] = txns[i].DeepCopy()
}

// update the transaction set to the current tip
txns, err := m.updateV2TransactionProofs(txns, basis, m.tipState.Index)
if err != nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("failed to update set basis: %w", err))
} else if len(txns) == 0 {
return true, nil
return false, fmt.Errorf("failed to update set basis: %w", err)
}

// validate as a standalone set
ms := consensus.NewMidState(m.tipState)
for _, txn := range txns {
if err := consensus.ValidateV2Transaction(ms, txn); err != nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("transaction %v is invalid: %w", txn.ID(), err))
}
ms.ApplyV2Transaction(txn)
if known, err := m.checkTxnSet(nil, txns); known || err != nil {
return known, err
}

for _, txn := range txns {
txid := txn.ID()
if _, ok := m.txpool.indices[txid]; ok {
continue // skip transactions already in pool
continue // skip transactions already in the pool
}
if err := consensus.ValidateV2Transaction(m.txpool.ms, txn); err != nil {
m.txpool.ms = nil // force revalidation next time the pool is queried
return false, fmt.Errorf("transaction %v conflicts with pool: %w", txid, err)
}
m.txpool.ms.ApplyV2Transaction(txn)
m.txpool.indices[txid] = len(m.txpool.v2txns)
m.txpool.indices[txid] = len(m.txpool.txns)
m.txpool.v2txns = append(m.txpool.v2txns, txn)
m.txpool.weight += m.tipState.V2TransactionWeight(txn)
}

// invalidate caches
m.txpool.medianFee = nil
m.txpool.parentMap = nil
return
return false, nil
}

// NewManager returns a Manager initialized with the provided Store and State.
Expand Down
16 changes: 11 additions & 5 deletions wallet/wallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,7 @@ func TestSingleAddressWalletEventTypes(t *testing.T) {
})
}

func TestV2TPoolRace(t *testing.T) {
func TestV2TxPoolRace(t *testing.T) {
// create wallet store
pk := types.GeneratePrivateKey()
ws := testutil.NewEphemeralWalletStore()
Expand Down Expand Up @@ -1860,11 +1860,17 @@ func TestV2TPoolRace(t *testing.T) {
// output in the spend transaction invalid unless it is updated.
mineAndSync(t, cm, ws, w, types.VoidAddress, 1)

// broadcast the transaction set including the already confirmed setup
// transaction. This seems unnecessary, but it's a fairly common occurrence
// when passing transaction sets using unconfirmed outputs between a renter
// and host. If the transaction set is not updated correctly, it will fail.
// even though the setup transaction has been confirmed, and the spend
// transaction is outdated, we can still add them without error: internally,
// AddV2PoolTransactions will remove any confirmed transactions, replace any
// ephemeral outputs, and update the Merkle proofs of all elements.
if _, err := cm.AddV2PoolTransactions(basis, []types.V2Transaction{setupTxn, spendTxn}); err != nil {
t.Fatal(err)
}
// updating the transaction shouldn't change its ID
if spendTxn, ok := cm.V2PoolTransaction(spendTxn.ID()); !ok {
t.Fatal("expected spend transaction to be in pool")
} else if spendTxn.SiacoinInputs[0].Parent.StateElement.LeafIndex == types.UnassignedLeafIndex {
t.Fatal("expected ephemeral output to be replaced")
}
}
Loading