diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 2129eab..25973d6 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -258,6 +258,52 @@ func (s *Store) SetIndexMode(mode wallet.IndexMode) error { }) } +// ResetChainState deletes all blockchain state from the database. +func (s *Store) ResetChainState() error { + return s.transaction(func(tx *txn) error { + _, err := tx.Exec(`UPDATE sia_addresses SET siacoin_balance=$1, siafund_balance=0, immature_siacoin_balance=$1`, encode(types.ZeroCurrency)) + if err != nil { + return fmt.Errorf("failed to reset sia addresses: %w", err) + } + + _, err = tx.Exec(`DELETE FROM siacoin_elements`) + if err != nil { + return fmt.Errorf("failed to delete siacoin elements: %w", err) + } + + _, err = tx.Exec(`DELETE FROM siafund_elements`) + if err != nil { + return fmt.Errorf("failed to delete siafund elements: %w", err) + } + + _, err = tx.Exec(`DELETE FROM state_tree`) + if err != nil { + return fmt.Errorf("failed to delete state tree: %w", err) + } + + _, err = tx.Exec(`DELETE FROM event_addresses`) + if err != nil { + return fmt.Errorf("failed to delete event addresses: %w", err) + } + + _, err = tx.Exec(`DELETE FROM events`) + if err != nil { + return fmt.Errorf("failed to delete events: %w", err) + } + + _, err = tx.Exec(`DELETE FROM chain_indices`) + if err != nil { + return fmt.Errorf("failed to delete chain indices: %w", err) + } + + _, err = tx.Exec(`UPDATE global_settings SET last_indexed_height=0, last_indexed_id=$1, element_num_leaves=0`, encode(types.BlockID{})) + if err != nil { + return fmt.Errorf("failed to reset global settings: %w", err) + } + return nil + }) +} + func getSiacoinStateElements(tx *txn) ([]stateElement, error) { const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements` rows, err := tx.Query(query) @@ -1212,7 +1258,7 @@ RETURNING id, address_id, siafund_value`, index.Height, encode(index.ID)) func deleteOrphanedSiafundElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]uint64, error) { rows, err := tx.Query(`DELETE FROM siafund_elements WHERE id IN (SELECT se.id FROM siafund_elements se INNER JOIN chain_indices ci ON (ci.id=se.chain_index_id) -WHERE ci.height=$1 AND ci.block_id<>$2) +WHERE ci.height=$1 AND ci.block_id<>$2) RETURNING id, address_id, siafund_value, spent_index_id IS NOT NULL`, index.Height, encode(index.ID)) if err != nil { return nil, fmt.Errorf("failed to query siafund elements: %w", err) diff --git a/wallet/manager.go b/wallet/manager.go index dcda2ce..3c5b7ac 100644 --- a/wallet/manager.go +++ b/wallet/manager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -54,6 +55,7 @@ type ( // A Store is a persistent store of wallet data. Store interface { UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error + ResetChainState() error WalletUnconfirmedEvents(id ID, index types.ChainIndex, timestamp time.Time, v1 []types.Transaction, v2 []types.V2Transaction) (annotated []Event, err error) WalletEvents(walletID ID, offset, limit int) ([]Event, error) @@ -382,8 +384,26 @@ func NewManager(cm ChainManager, store Store, opts ...Option) (*Manager, error) lastTip, err := store.LastCommittedIndex() if err != nil { log.Panic("failed to get last committed index", zap.Error(err)) - } else if err := syncStore(ctx, store, cm, lastTip, m.syncBatchSize); err != nil && !errors.Is(err, context.Canceled) { - log.Panic("failed to sync store", zap.Error(err)) + } + err = syncStore(ctx, store, cm, lastTip, m.syncBatchSize) + if err != nil { + switch { + case errors.Is(err, context.Canceled): + m.mu.Unlock() + return + case strings.Contains(err.Error(), "missing block at index"): // unfortunate, but not exposed by coreutils + log.Warn("missing block at index, resetting chain state", zap.Stringer("id", lastTip.ID), zap.Uint64("height", lastTip.Height)) + if err := store.ResetChainState(); err != nil { + log.Panic("failed to reset wallet state", zap.Error(err)) + } + // trigger resync + select { + case reorgChan <- struct{}{}: + default: + } + default: + log.Panic("failed to sync store", zap.Error(err)) + } } m.mu.Unlock() } diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index bdd6406..d672093 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -577,13 +577,13 @@ func TestWalletAddresses(t *testing.T) { SpendPolicy: &spendPolicy, Description: "hello, world", } - err = db.AddWalletAddress(w.ID, addr) + err = wm.AddAddress(w.ID, addr) if err != nil { t.Fatal(err) } // Check that the address was added - addresses, err := db.WalletAddresses(w.ID) + addresses, err := wm.Addresses(w.ID) if err != nil { t.Fatal(err) } else if len(addresses) != 1 { @@ -600,12 +600,12 @@ func TestWalletAddresses(t *testing.T) { addr.Description = "goodbye, world" addr.Metadata = json.RawMessage(`{"foo": "bar"}`) - if err := db.AddWalletAddress(w.ID, addr); err != nil { + if err := wm.AddAddress(w.ID, addr); err != nil { t.Fatal(err) } // Check that the address was added - addresses, err = db.WalletAddresses(w.ID) + addresses, err = wm.Addresses(w.ID) if err != nil { t.Fatal(err) } else if len(addresses) != 1 { @@ -621,13 +621,13 @@ func TestWalletAddresses(t *testing.T) { } // Remove the address - err = db.RemoveWalletAddress(w.ID, address) + err = wm.RemoveAddress(w.ID, address) if err != nil { t.Fatal(err) } // Check that the address was removed - addresses, err = db.WalletAddresses(w.ID) + addresses, err = wm.Addresses(w.ID) if err != nil { t.Fatal(err) } else if len(addresses) != 0 { @@ -3512,3 +3512,129 @@ func TestEventTypes(t *testing.T) { assertEvent(t, types.Hash256(types.SiafundOutputID(sfe[0].ID).V2ClaimOutputID()), wallet.EventTypeSiafundClaim, claimValue, types.ZeroCurrency, cm.Tip().Height+144) }) } + +func TestReset(t *testing.T) { + log := zaptest.NewLogger(t) + + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + network, genesisBlock := testutil.Network() + // send the siafunds to the owned address + genesisBlock.Transactions[0].SiafundOutputs[0].Address = addr + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(t.TempDir(), "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + cm1 := chain.NewManager(store, genesisState) + + bdb2, err := coreutils.OpenBoltChainDB(filepath.Join(t.TempDir(), "consensus2.db")) + if err != nil { + t.Fatal(err) + } + defer bdb2.Close() + store2, genesisState2, err := chain.NewDBStore(bdb2, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + cm2 := chain.NewManager(store2, genesisState2) + + // mine blocks before starting the wallet manager + for i := 0; i < 25; i++ { + // blocks on the first chain manager go to the void + b1, ok := coreutils.MineBlock(cm1, types.VoidAddress, 15*time.Second) + if !ok { + t.Fatal("failed to mine block") + } else if err := cm1.AddBlocks([]types.Block{b1}); err != nil { + t.Fatal(err) + } + + // blocks on the second one go to the primary address + b2, ok := coreutils.MineBlock(cm2, addr, 15*time.Second) + if !ok { + t.Fatal("failed to mine block") + } else if err := cm2.AddBlocks([]types.Block{b2}); err != nil { + t.Fatal(err) + } + } + + db, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // wait for the manager to sync to the first chain + wm, err := wallet.NewManager(cm1, db, wallet.WithLogger(log.Named("wallet")), wallet.WithIndexMode(wallet.IndexModeFull)) + if err != nil { + t.Fatal(err) + } + defer wm.Close() + + waitForBlock(t, cm1, db) + + assertBalance := func(t *testing.T, addr types.Address, siacoin, immature types.Currency, siafund uint64) { + t.Helper() + + balance, err := db.AddressBalance(addr) + if err != nil { + t.Fatal(err) + } + switch { + case !balance.Siacoins.Equals(siacoin): + t.Fatalf("expected %v SC, got %v", siacoin, balance.Siacoins) + case !balance.ImmatureSiacoins.Equals(immature): + t.Fatalf("expected immature %v SC, got %v", siacoin, balance.Siacoins) + case balance.Siafunds != siafund: + t.Fatalf("expected %v siafunds, got %v", siafund, balance.Siafunds) + } + } + + assertBalance(t, addr, types.ZeroCurrency, types.ZeroCurrency, 10000) + + // close the manager + if err := wm.Close(); err != nil { + t.Fatal() + } + + // calculate the expected balances + _, applied, err := cm2.UpdatesSince(types.ChainIndex{}, 1000) + if err != nil { + t.Fatal(err) + } + + var siacoinElements []types.SiacoinElement + for _, cau := range applied { + cau.ForEachSiacoinElement(func(sce types.SiacoinElement, created, spent bool) { + if created && sce.SiacoinOutput.Address == addr { + siacoinElements = append(siacoinElements, sce) + } + }) + } + + var expectedSiacoins, expectedImmature types.Currency + for _, sce := range siacoinElements { + if sce.MaturityHeight > cm2.Tip().Height { + expectedImmature = expectedImmature.Add(sce.SiacoinOutput.Value) + } else { + expectedSiacoins = expectedSiacoins.Add(sce.SiacoinOutput.Value) + } + } + + wm, err = wallet.NewManager(cm2, db, wallet.WithLogger(log.Named("wallet")), wallet.WithIndexMode(wallet.IndexModeFull)) + if err != nil { + t.Fatal(err) + } + defer wm.Close() + + waitForBlock(t, cm2, db) + + assertBalance(t, addr, expectedSiacoins, expectedImmature, genesisState.SiafundCount()) +}