Skip to content

Commit

Permalink
chain,wallet: fix tpool race
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Aug 23, 2024
1 parent 1c4c895 commit 654082d
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 92 deletions.
163 changes: 121 additions & 42 deletions chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,13 +928,23 @@ func (m *Manager) UnconfirmedParents(txn types.Transaction) []types.Transaction
return parents
}

// V2UnconfirmedParents returns the v2 transactions in the txpool that are referenced
// by txn.
func (m *Manager) V2UnconfirmedParents(txn types.V2Transaction) []types.V2Transaction {
// V2TransactionSet returns the full transaction set and basis necessary for
// broadcasting a transaction. If the provided basis does not match the current
// tip the transaction will be updated. The transaction set includes the parents
// and the transaction itself in an order valid for broadcasting.
func (m *Manager) V2TransactionSet(basis types.ChainIndex, txn types.V2Transaction) (types.ChainIndex, []types.V2Transaction, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.revalidatePool()

// update the transaction's basis to match tip
_, txns, err := m.updateV2TransactionSet(basis, []types.V2Transaction{txn})
if err != nil {
return types.ChainIndex{}, nil, fmt.Errorf("failed to update transaction set basis: %w", err)
}
txn = txns[0]

// get the transaction's parents
parentMap := m.computeParentMap()
var parents []types.V2Transaction
seen := make(map[int]bool)
Expand Down Expand Up @@ -975,7 +985,7 @@ func (m *Manager) V2UnconfirmedParents(txn types.V2Transaction) []types.V2Transa
j := len(parents) - 1 - i
parents[i], parents[j] = parents[j], parents[i]
}
return parents
return m.tipState.Index, append(parents, txn), nil
}

func (m *Manager) checkDupTxnSet(txns []types.Transaction, v2txns []types.V2Transaction) (types.Hash256, bool) {
Expand Down Expand Up @@ -1063,6 +1073,109 @@ func (m *Manager) AddPoolTransactions(txns []types.Transaction) (known bool, err
return
}

// updateV2TransactionSet updates the basis of a transaction set to the current
// tip. If the basis is already the tip, the transaction set is returned as-is.
// Any transactions that were confirmed are removed from the set. Any ephemeral
// state elements that were created by an update are updated.
//
// If it is undesirable to modify the transaction set, deep-copy it
// before calling this method.
func (m *Manager) updateV2TransactionSet(basis types.ChainIndex, txns []types.V2Transaction) (types.ChainIndex, []types.V2Transaction, error) {
if basis == m.tipState.Index {
return basis, txns, nil
}

// bring txns up-to-date
revert, apply, err := m.reorgPath(basis, m.tipState.Index)
if err != nil {
return types.ChainIndex{}, nil, fmt.Errorf("couldn't determine reorg path from %v to %v: %w", basis, m.tipState.Index, err)
} else if len(revert)+len(apply) > 144 {
return types.ChainIndex{}, nil, fmt.Errorf("reorg path from %v to %v is too long (-%v +%v)", basis, m.tipState.Index, len(revert), len(apply))
}
for _, index := range revert {
b, _, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return types.ChainIndex{}, nil, fmt.Errorf("missing reverted block at index %v", index)
} else if b.V2 == nil {
return types.ChainIndex{}, nil, fmt.Errorf("reorg path from %v to %v contains a non-v2 block (%v)", basis, m.tipState.Index, index)
}
// NOTE: since we are post-hardfork, we don't need a v1 supplement
cru := consensus.RevertBlock(cs, b, consensus.V1BlockSupplement{})
for i := range txns {
if !updateTxnProofs(&txns[i], cru.UpdateElementProof, cs.Elements.NumLeaves) {
return types.ChainIndex{}, nil, fmt.Errorf("transaction %v references element that does not exist in our chain", txns[i].ID())
}
}
}

for _, index := range apply {
b, _, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return types.ChainIndex{}, nil, fmt.Errorf("missing applied block at index %v", index)
} else if b.V2 == nil {
return types.ChainIndex{}, nil, fmt.Errorf("reorg path from %v to %v contains a non-v2 block (%v)", basis, m.tipState.Index, index)
}
// NOTE: since we are post-hardfork, we don't need a v1 supplement or ancestorTimestamp
cs, cau := consensus.ApplyBlock(cs, b, consensus.V1BlockSupplement{}, time.Time{})

// get the transactions that were confirmed in this block
confirmedTxns := make(map[types.TransactionID]bool)
for _, txn := range b.V2Transactions() {
confirmedTxns[txn.ID()] = true
}
confirmedStateElements := make(map[types.Hash256]types.StateElement)
cau.ForEachSiacoinElement(func(sce types.SiacoinElement, created, spent bool) {
if created {
confirmedStateElements[sce.ID] = sce.StateElement
}
})
cau.ForEachSiafundElement(func(sfe types.SiafundElement, created, spent bool) {
if created {
confirmedStateElements[sfe.ID] = sfe.StateElement
}
})

rem := txns[:0]
for i := range txns {
if confirmedTxns[txns[i].ID()] {
// remove any transactions that were confirmed in this block
continue
}
rem = append(rem, txns[i])

// update the state elements for any confirmed ephemeral elements
for j := range txns[i].SiacoinInputs {
if txns[i].SiacoinInputs[j].Parent.LeafIndex != types.UnassignedLeafIndex {
continue
}
se, ok := confirmedStateElements[types.Hash256(txns[i].SiacoinInputs[j].Parent.ID)]
if !ok {
continue
}
txns[i].SiacoinInputs[j].Parent.StateElement = se
}

// update the state elements for any confirmed ephemeral elements
for j := range txns[i].SiafundInputs {
if txns[i].SiafundInputs[j].Parent.LeafIndex != types.UnassignedLeafIndex {
continue
}
se, ok := confirmedStateElements[types.Hash256(txns[i].SiafundInputs[j].Parent.ID)]
if !ok {
continue
}
txns[i].SiafundInputs[j].Parent.StateElement = se
}

// NOTE: all elements guaranteed to exist from here on, so no
// need to check this return value
updateTxnProofs(&rem[len(rem)-1], cau.UpdateElementProof, cs.Elements.NumLeaves)
}
txns = rem
}
return m.tipState.Index, txns, nil
}

// AddV2PoolTransactions validates a transaction set and adds it to the txpool.
// If any transaction references an element (SiacoinOutput, SiafundOutput, or
// FileContract) not present in the blockchain, that element must be created by
Expand Down Expand Up @@ -1093,44 +1206,10 @@ func (m *Manager) AddV2PoolTransactions(basis types.ChainIndex, txns []types.V2T
txns[i] = txns[i].DeepCopy()
}

if basis != m.tipState.Index {
// bring txns up-to-date
revert, apply, err := m.reorgPath(basis, m.tipState.Index)
if err != nil {
return false, fmt.Errorf("couldn't determine reorg path from %v to %v: %w", basis, m.tipState.Index, err)
} else if len(revert)+len(apply) > 144 {
return false, fmt.Errorf("reorg path from %v to %v is too long (-%v +%v)", basis, m.tipState.Index, len(revert), len(apply))
}
for _, index := range revert {
b, _, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return false, fmt.Errorf("missing reverted block at index %v", index)
} else if b.V2 == nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("reorg path from %v to %v contains a non-v2 block (%v)", basis, m.tipState.Index, index))
}
// NOTE: since we are post-hardfork, we don't need a v1 supplement
cru := consensus.RevertBlock(cs, b, consensus.V1BlockSupplement{})
for i := range txns {
if !updateTxnProofs(&txns[i], cru.UpdateElementProof, cs.Elements.NumLeaves) {
return false, m.markBadTxnSet(setID, fmt.Errorf("transaction %v references element that does not exist in our chain", txns[i].ID()))
}
}
}
for _, index := range apply {
b, _, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return false, fmt.Errorf("missing applied block at index %v", index)
} else if b.V2 == nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("reorg path from %v to %v contains a non-v2 block (%v)", basis, m.tipState.Index, index))
}
// NOTE: since we are post-hardfork, we don't need a v1 supplement or ancestorTimestamp
cs, cau := consensus.ApplyBlock(cs, b, consensus.V1BlockSupplement{}, time.Time{})
for i := range txns {
// NOTE: all elements guaranteed to exist from here on, so no
// need to check this return value
updateTxnProofs(&txns[i], cau.UpdateElementProof, cs.Elements.NumLeaves)
}
}
// update the transaction set to the current tip
_, txns, err := m.updateV2TransactionSet(basis, txns)
if err != nil {
return false, m.markBadTxnSet(setID, fmt.Errorf("failed to update set basis: %w", err))
}

// validate as a standalone set
Expand Down
16 changes: 13 additions & 3 deletions wallet/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,15 +371,25 @@ func (sw *SingleAddressWallet) UpdateChainState(tx UpdateTx, reverted []chain.Re
ID: cru.Block.ID(),
Height: cru.State.Index.Height + 1,
}
if err := revertChainUpdate(tx, revertedIndex, sw.addr, cru); err != nil {
return err
sw.mu.Lock()
err := revertChainUpdate(tx, revertedIndex, sw.addr, cru)
if err != nil {
sw.mu.Unlock()
return fmt.Errorf("failed to revert chain update %q: %w", cru.State.Index, err)
}
sw.tip = cru.State.Index
sw.mu.Unlock()
}

for _, cau := range applied {
if err := applyChainState(tx, sw.addr, cau); err != nil {
sw.mu.Lock()
err := applyChainState(tx, sw.addr, cau)
if err != nil {
sw.mu.Unlock()
return fmt.Errorf("failed to apply chain update %q: %w", cau.State.Index, err)
}
sw.tip = cau.State.Index
sw.mu.Unlock()
}
return nil
}
37 changes: 22 additions & 15 deletions wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ type (

cfg config

mu sync.Mutex // protects the following fields
mu sync.Mutex // protects the following fields
tip types.ChainIndex
// locked is a set of siacoin output IDs locked by FundTransaction. They
// will be released either by calling Release for unused transactions or
// being confirmed in a block.
Expand All @@ -90,7 +91,6 @@ var ErrDifferentSeed = errors.New("seed differs from wallet seed")

// Close closes the wallet
func (sw *SingleAddressWallet) Close() error {
// TODO: remove subscription??
return nil
}

Expand Down Expand Up @@ -413,19 +413,18 @@ func (sw *SingleAddressWallet) SignTransaction(txn *types.Transaction, toSign []
// will not be available to future calls to FundTransaction unless ReleaseInputs
// is called.
//
// The returned consensus state should be used to calculate the input signature
// hash and as the basis for AddV2PoolTransactions.
func (sw *SingleAddressWallet) FundV2Transaction(txn *types.V2Transaction, amount types.Currency, useUnconfirmed bool) (consensus.State, []int, error) {
if amount.IsZero() {
return sw.cm.TipState(), nil, nil
}

// The returned index should be used as the basis for AddV2PoolTransactions.
func (sw *SingleAddressWallet) FundV2Transaction(txn *types.V2Transaction, amount types.Currency, useUnconfirmed bool) (types.ChainIndex, []int, error) {
sw.mu.Lock()
defer sw.mu.Unlock()

if amount.IsZero() {
return sw.tip, nil, nil
}

selected, inputSum, err := sw.selectUTXOs(amount, len(txn.SiacoinInputs), useUnconfirmed)
if err != nil {
return consensus.State{}, nil, err
return types.ChainIndex{}, nil, err
}

// add a change output if necessary
Expand All @@ -445,11 +444,11 @@ func (sw *SingleAddressWallet) FundV2Transaction(txn *types.V2Transaction, amoun
sw.locked[sce.ID] = time.Now().Add(sw.cfg.ReservationDuration)
}

return sw.cm.TipState(), toSign, nil
return sw.tip, toSign, nil
}

// SignV2Inputs adds a signature to each of the specified siacoin inputs.
func (sw *SingleAddressWallet) SignV2Inputs(state consensus.State, txn *types.V2Transaction, toSign []int) {
func (sw *SingleAddressWallet) SignV2Inputs(txn *types.V2Transaction, toSign []int) {
if len(toSign) == 0 {
return
}
Expand All @@ -458,7 +457,7 @@ func (sw *SingleAddressWallet) SignV2Inputs(state consensus.State, txn *types.V2
defer sw.mu.Unlock()

policy := sw.SpendPolicy()
sigHash := state.InputSigHash(*txn)
sigHash := sw.cm.TipState().InputSigHash(*txn)
for _, i := range toSign {
txn.SiacoinInputs[i].SatisfiedPolicy = types.SatisfiedPolicy{
Policy: policy,
Expand All @@ -468,8 +467,10 @@ func (sw *SingleAddressWallet) SignV2Inputs(state consensus.State, txn *types.V2
}

// Tip returns the block height the wallet has scanned to.
func (sw *SingleAddressWallet) Tip() (types.ChainIndex, error) {
return sw.store.Tip()
func (sw *SingleAddressWallet) Tip() types.ChainIndex {
sw.mu.Lock()
defer sw.mu.Unlock()
return sw.tip
}

// SpendPolicy returns the wallet's default spend policy.
Expand Down Expand Up @@ -922,6 +923,11 @@ func NewSingleAddressWallet(priv types.PrivateKey, cm ChainManager, store Single
opt(&cfg)
}

tip, err := store.Tip()
if err != nil {
return nil, fmt.Errorf("failed to get wallet tip: %w", err)
}

sw := &SingleAddressWallet{
priv: priv,

Expand All @@ -932,6 +938,7 @@ func NewSingleAddressWallet(priv types.PrivateKey, cm ChainManager, store Single
log: cfg.Log,

addr: types.StandardUnlockHash(priv.PublicKey()),
tip: tip,
locked: make(map[types.Hash256]time.Time),
}
return sw, nil
Expand Down
Loading

0 comments on commit 654082d

Please sign in to comment.