Skip to content

Commit

Permalink
Merge pull request #205 from SiaFoundation/nate/support-resync
Browse files Browse the repository at this point in the history
fix: Fix panic when resyncing after consensus database is deleted
  • Loading branch information
n8maninger authored Dec 19, 2024
2 parents e4f5880 + d402a87 commit de61683
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 9 deletions.
48 changes: 47 additions & 1 deletion persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,52 @@ func (s *Store) SetIndexMode(mode wallet.IndexMode) error {
})
}

// ResetChainState deletes all blockchain state from the database.
func (s *Store) ResetChainState() error {
return s.transaction(func(tx *txn) error {
_, err := tx.Exec(`UPDATE sia_addresses SET siacoin_balance=$1, siafund_balance=0, immature_siacoin_balance=$1`, encode(types.ZeroCurrency))
if err != nil {
return fmt.Errorf("failed to reset sia addresses: %w", err)
}

_, err = tx.Exec(`DELETE FROM siacoin_elements`)
if err != nil {
return fmt.Errorf("failed to delete siacoin elements: %w", err)
}

_, err = tx.Exec(`DELETE FROM siafund_elements`)
if err != nil {
return fmt.Errorf("failed to delete siafund elements: %w", err)
}

_, err = tx.Exec(`DELETE FROM state_tree`)
if err != nil {
return fmt.Errorf("failed to delete state tree: %w", err)
}

_, err = tx.Exec(`DELETE FROM event_addresses`)
if err != nil {
return fmt.Errorf("failed to delete event addresses: %w", err)
}

_, err = tx.Exec(`DELETE FROM events`)
if err != nil {
return fmt.Errorf("failed to delete events: %w", err)
}

_, err = tx.Exec(`DELETE FROM chain_indices`)
if err != nil {
return fmt.Errorf("failed to delete chain indices: %w", err)
}

_, err = tx.Exec(`UPDATE global_settings SET last_indexed_height=0, last_indexed_id=$1, element_num_leaves=0`, encode(types.BlockID{}))
if err != nil {
return fmt.Errorf("failed to reset global settings: %w", err)
}
return nil
})
}

func getSiacoinStateElements(tx *txn) ([]stateElement, error) {
const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements`
rows, err := tx.Query(query)
Expand Down Expand Up @@ -1212,7 +1258,7 @@ RETURNING id, address_id, siafund_value`, index.Height, encode(index.ID))
func deleteOrphanedSiafundElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]uint64, error) {
rows, err := tx.Query(`DELETE FROM siafund_elements WHERE id IN (SELECT se.id FROM siafund_elements se
INNER JOIN chain_indices ci ON (ci.id=se.chain_index_id)
WHERE ci.height=$1 AND ci.block_id<>$2)
WHERE ci.height=$1 AND ci.block_id<>$2)
RETURNING id, address_id, siafund_value, spent_index_id IS NOT NULL`, index.Height, encode(index.ID))
if err != nil {
return nil, fmt.Errorf("failed to query siafund elements: %w", err)
Expand Down
24 changes: 22 additions & 2 deletions wallet/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -54,6 +55,7 @@ type (
// A Store is a persistent store of wallet data.
Store interface {
UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error
ResetChainState() error

WalletUnconfirmedEvents(id ID, index types.ChainIndex, timestamp time.Time, v1 []types.Transaction, v2 []types.V2Transaction) (annotated []Event, err error)
WalletEvents(walletID ID, offset, limit int) ([]Event, error)
Expand Down Expand Up @@ -382,8 +384,26 @@ func NewManager(cm ChainManager, store Store, opts ...Option) (*Manager, error)
lastTip, err := store.LastCommittedIndex()
if err != nil {
log.Panic("failed to get last committed index", zap.Error(err))
} else if err := syncStore(ctx, store, cm, lastTip, m.syncBatchSize); err != nil && !errors.Is(err, context.Canceled) {
log.Panic("failed to sync store", zap.Error(err))
}
err = syncStore(ctx, store, cm, lastTip, m.syncBatchSize)
if err != nil {
switch {
case errors.Is(err, context.Canceled):
m.mu.Unlock()
return
case strings.Contains(err.Error(), "missing block at index"): // unfortunate, but not exposed by coreutils
log.Warn("missing block at index, resetting chain state", zap.Stringer("id", lastTip.ID), zap.Uint64("height", lastTip.Height))
if err := store.ResetChainState(); err != nil {
log.Panic("failed to reset wallet state", zap.Error(err))
}
// trigger resync
select {
case reorgChan <- struct{}{}:
default:
}
default:
log.Panic("failed to sync store", zap.Error(err))
}
}
m.mu.Unlock()
}
Expand Down
138 changes: 132 additions & 6 deletions wallet/wallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,13 @@ func TestWalletAddresses(t *testing.T) {
SpendPolicy: &spendPolicy,
Description: "hello, world",
}
err = db.AddWalletAddress(w.ID, addr)
err = wm.AddAddress(w.ID, addr)
if err != nil {
t.Fatal(err)
}

// Check that the address was added
addresses, err := db.WalletAddresses(w.ID)
addresses, err := wm.Addresses(w.ID)
if err != nil {
t.Fatal(err)
} else if len(addresses) != 1 {
Expand All @@ -600,12 +600,12 @@ func TestWalletAddresses(t *testing.T) {
addr.Description = "goodbye, world"
addr.Metadata = json.RawMessage(`{"foo": "bar"}`)

if err := db.AddWalletAddress(w.ID, addr); err != nil {
if err := wm.AddAddress(w.ID, addr); err != nil {
t.Fatal(err)
}

// Check that the address was added
addresses, err = db.WalletAddresses(w.ID)
addresses, err = wm.Addresses(w.ID)
if err != nil {
t.Fatal(err)
} else if len(addresses) != 1 {
Expand All @@ -621,13 +621,13 @@ func TestWalletAddresses(t *testing.T) {
}

// Remove the address
err = db.RemoveWalletAddress(w.ID, address)
err = wm.RemoveAddress(w.ID, address)
if err != nil {
t.Fatal(err)
}

// Check that the address was removed
addresses, err = db.WalletAddresses(w.ID)
addresses, err = wm.Addresses(w.ID)
if err != nil {
t.Fatal(err)
} else if len(addresses) != 0 {
Expand Down Expand Up @@ -3512,3 +3512,129 @@ func TestEventTypes(t *testing.T) {
assertEvent(t, types.Hash256(types.SiafundOutputID(sfe[0].ID).V2ClaimOutputID()), wallet.EventTypeSiafundClaim, claimValue, types.ZeroCurrency, cm.Tip().Height+144)
})
}

func TestReset(t *testing.T) {
log := zaptest.NewLogger(t)

pk := types.GeneratePrivateKey()
addr := types.StandardUnlockHash(pk.PublicKey())

network, genesisBlock := testutil.Network()
// send the siafunds to the owned address
genesisBlock.Transactions[0].SiafundOutputs[0].Address = addr

bdb, err := coreutils.OpenBoltChainDB(filepath.Join(t.TempDir(), "consensus.db"))
if err != nil {
t.Fatal(err)
}
defer bdb.Close()

store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock)
if err != nil {
t.Fatal(err)
}
cm1 := chain.NewManager(store, genesisState)

bdb2, err := coreutils.OpenBoltChainDB(filepath.Join(t.TempDir(), "consensus2.db"))
if err != nil {
t.Fatal(err)
}
defer bdb2.Close()
store2, genesisState2, err := chain.NewDBStore(bdb2, network, genesisBlock)
if err != nil {
t.Fatal(err)
}
cm2 := chain.NewManager(store2, genesisState2)

// mine blocks before starting the wallet manager
for i := 0; i < 25; i++ {
// blocks on the first chain manager go to the void
b1, ok := coreutils.MineBlock(cm1, types.VoidAddress, 15*time.Second)
if !ok {
t.Fatal("failed to mine block")
} else if err := cm1.AddBlocks([]types.Block{b1}); err != nil {
t.Fatal(err)
}

// blocks on the second one go to the primary address
b2, ok := coreutils.MineBlock(cm2, addr, 15*time.Second)
if !ok {
t.Fatal("failed to mine block")
} else if err := cm2.AddBlocks([]types.Block{b2}); err != nil {
t.Fatal(err)
}
}

db, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "walletd.sqlite3"), log.Named("sqlite3"))
if err != nil {
t.Fatal(err)
}
defer db.Close()

// wait for the manager to sync to the first chain
wm, err := wallet.NewManager(cm1, db, wallet.WithLogger(log.Named("wallet")), wallet.WithIndexMode(wallet.IndexModeFull))
if err != nil {
t.Fatal(err)
}
defer wm.Close()

waitForBlock(t, cm1, db)

assertBalance := func(t *testing.T, addr types.Address, siacoin, immature types.Currency, siafund uint64) {
t.Helper()

balance, err := db.AddressBalance(addr)
if err != nil {
t.Fatal(err)
}
switch {
case !balance.Siacoins.Equals(siacoin):
t.Fatalf("expected %v SC, got %v", siacoin, balance.Siacoins)
case !balance.ImmatureSiacoins.Equals(immature):
t.Fatalf("expected immature %v SC, got %v", siacoin, balance.Siacoins)
case balance.Siafunds != siafund:
t.Fatalf("expected %v siafunds, got %v", siafund, balance.Siafunds)
}
}

assertBalance(t, addr, types.ZeroCurrency, types.ZeroCurrency, 10000)

// close the manager
if err := wm.Close(); err != nil {
t.Fatal()
}

// calculate the expected balances
_, applied, err := cm2.UpdatesSince(types.ChainIndex{}, 1000)
if err != nil {
t.Fatal(err)
}

var siacoinElements []types.SiacoinElement
for _, cau := range applied {
cau.ForEachSiacoinElement(func(sce types.SiacoinElement, created, spent bool) {
if created && sce.SiacoinOutput.Address == addr {
siacoinElements = append(siacoinElements, sce)
}
})
}

var expectedSiacoins, expectedImmature types.Currency
for _, sce := range siacoinElements {
if sce.MaturityHeight > cm2.Tip().Height {
expectedImmature = expectedImmature.Add(sce.SiacoinOutput.Value)
} else {
expectedSiacoins = expectedSiacoins.Add(sce.SiacoinOutput.Value)
}
}

wm, err = wallet.NewManager(cm2, db, wallet.WithLogger(log.Named("wallet")), wallet.WithIndexMode(wallet.IndexModeFull))
if err != nil {
t.Fatal(err)
}
defer wm.Close()

waitForBlock(t, cm2, db)

assertBalance(t, addr, expectedSiacoins, expectedImmature, genesisState.SiafundCount())
}

0 comments on commit de61683

Please sign in to comment.