diff --git a/api/api_test.go b/api/api_test.go index 1fac5d1..a5e59ea 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -1021,6 +1021,8 @@ func TestP2P(t *testing.T) { } checkBalances := func(p, s types.Currency) { t.Helper() + waitForBlock(t, cm1, store1) + waitForBlock(t, cm2, store2) if primaryBalance, err := primary.Balance(); err != nil { t.Fatal(err) } else if !primaryBalance.Siacoins.Equals(p) { @@ -1084,7 +1086,6 @@ func TestP2P(t *testing.T) { return err } checkBalances(pbal, sbal) - waitForBlock(t, cm1, store1) return nil } sendV2 := func() error { @@ -1135,7 +1136,6 @@ func TestP2P(t *testing.T) { } else if err := addBlock(); err != nil { return err } - waitForBlock(t, cm1, store1) checkBalances(pbal, sbal) return nil } diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 427a067..799c3a1 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -10,7 +10,6 @@ import ( "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" "go.sia.tech/walletd/wallet" - "go.uber.org/zap" ) type updateTx struct { @@ -573,49 +572,16 @@ func (ut *updateTx) RevertEvents(index types.ChainIndex) error { } // ProcessChainApplyUpdate implements chain.Subscriber -func (s *Store) ProcessChainApplyUpdate(cau chain.ApplyUpdate) error { - s.updates = append(s.updates, cau) - log := s.log.Named("ProcessChainApplyUpdate").With(zap.Stringer("index", cau.State.Index)) - log.Debug("received update") - log.Debug("committing updates", zap.Int("n", len(s.updates))) +func (s *Store) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { return s.transaction(func(tx *txn) error { utx := &updateTx{ tx: tx, relevantAddresses: make(map[types.Address]bool), } - if err := wallet.ApplyChainUpdates(utx, s.updates); err != nil { - return fmt.Errorf("failed to apply updates: %w", err) - } else if err := setLastCommittedIndex(tx, cau.State.Index); err != nil { - return fmt.Errorf("failed to set last committed index: %w", err) - } - s.updates = nil - return nil - }) -} - -// ProcessChainRevertUpdate implements chain.Subscriber -func (s *Store) ProcessChainRevertUpdate(cru chain.RevertUpdate) error { - log := s.log.Named("ProcessChainRevertUpdate").With(zap.Stringer("index", cru.State.Index)) - - // update hasn't been committed yet - if len(s.updates) > 0 && s.updates[len(s.updates)-1].Block.ID() == cru.Block.ID() { - log.Debug("removed uncommitted update") - s.updates = s.updates[:len(s.updates)-1] - return nil - } - - log.Debug("reverting update") - // update has been committed, revert it - return s.transaction(func(tx *txn) error { - utx := &updateTx{ - tx: tx, - relevantAddresses: make(map[types.Address]bool), - } - - if err := wallet.RevertChainUpdate(utx, cru); err != nil { - return fmt.Errorf("failed to revert update: %w", err) - } else if err := setLastCommittedIndex(tx, cru.State.Index); err != nil { + if err := wallet.UpdateChainState(utx, reverted, applied); err != nil { + return fmt.Errorf("failed to update chain state: %w", err) + } else if err := setLastCommittedIndex(tx, applied[len(applied)-1].State.Index); err != nil { return fmt.Errorf("failed to set last committed index: %w", err) } return nil diff --git a/persist/sqlite/consensus_test.go b/persist/sqlite/consensus_test.go index fafd567..eb2ec7b 100644 --- a/persist/sqlite/consensus_test.go +++ b/persist/sqlite/consensus_test.go @@ -85,19 +85,10 @@ func syncDB(t *testing.T, db *sqlite.Store, cm *chain.Manager) { crus, caus, err := cm.UpdatesSince(index, 1000) if err != nil { t.Fatal(err) + } else if err := db.UpdateChainState(crus, caus); err != nil { + t.Fatal(err) } - for _, cru := range crus { - if err := db.ProcessChainRevertUpdate(cru); err != nil { - t.Fatal("failed to process revert update:", err) - } - index = cru.State.Index - } - for _, cau := range caus { - if err := db.ProcessChainApplyUpdate(cau); err != nil { - t.Fatal("failed to process apply update:", err) - } - index = cau.State.Index - } + index = caus[len(caus)-1].State.Index } } diff --git a/persist/sqlite/store.go b/persist/sqlite/store.go index dff4ed9..39f03fe 100644 --- a/persist/sqlite/store.go +++ b/persist/sqlite/store.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "go.sia.tech/coreutils/chain" "go.uber.org/zap" "lukechampine.com/frand" ) @@ -19,8 +18,6 @@ type ( Store struct { db *sql.DB log *zap.Logger - - updates []chain.ApplyUpdate } ) diff --git a/wallet/manager.go b/wallet/manager.go index 7720f21..308c519 100644 --- a/wallet/manager.go +++ b/wallet/manager.go @@ -23,8 +23,7 @@ type ( // A Store is a persistent store of wallet data. Store interface { - ProcessChainApplyUpdate(cau chain.ApplyUpdate) error - ProcessChainRevertUpdate(cru chain.RevertUpdate) error + UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error WalletEvents(walletID ID, offset, limit int) ([]Event, error) AddWallet(Wallet) (Wallet, error) @@ -172,19 +171,10 @@ func syncStore(store Store, cm ChainManager, index types.ChainIndex) error { crus, caus, err := cm.UpdatesSince(index, 1000) if err != nil { return fmt.Errorf("failed to subscribe to chain manager: %w", err) + } else if err := store.UpdateChainState(crus, caus); err != nil { + return fmt.Errorf("failed to update chain state: %w", err) } - for _, cru := range crus { - if err := store.ProcessChainRevertUpdate(cru); err != nil { - return fmt.Errorf("failed to process revert update: %w", err) - } - index = cru.State.Index - } - for _, cau := range caus { - if err := store.ProcessChainApplyUpdate(cau); err != nil { - return fmt.Errorf("failed to process apply update: %w", err) - } - index = cau.State.Index - } + index = caus[len(caus)-1].State.Index } return nil } diff --git a/wallet/update.go b/wallet/update.go index a726a2a..52e3f43 100644 --- a/wallet/update.go +++ b/wallet/update.go @@ -29,153 +29,141 @@ type ( RemoveSiafundElements([]types.SiafundElement, types.ChainIndex) error AddressRelevant(types.Address) (bool, error) - } - - // An ApplyTx atomically applies a set of updates to a store. - ApplyTx interface { - UpdateTx ApplyMatureSiacoinBalance(types.ChainIndex) error AddEvents([]Event) error - } - - // RevertTx atomically reverts an update from a store. - RevertTx interface { - UpdateTx RevertMatureSiacoinBalance(types.ChainIndex) error RevertEvents(index types.ChainIndex) error } ) -// ApplyChainUpdates atomically applies a set of chain updates to a store -func ApplyChainUpdates(tx ApplyTx, updates []chain.ApplyUpdate) error { - for _, cau := range updates { - // update the immature balance of each relevant address - if err := tx.ApplyMatureSiacoinBalance(cau.State.Index); err != nil { - return fmt.Errorf("failed to get matured siacoin elements: %w", err) - } +// applyChainUpdate atomically applies a chain update to a store +func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { + // update the immature balance of each relevant address + if err := tx.ApplyMatureSiacoinBalance(cau.State.Index); err != nil { + return fmt.Errorf("failed to get matured siacoin elements: %w", err) + } - // determine which siacoin and siafund elements are ephemeral - // - // note: I thought we could use LeafIndex == EphemeralLeafIndex, but - // it seems to be set before the subscriber is called. - created := make(map[types.Hash256]bool) - ephemeral := make(map[types.Hash256]bool) - for _, txn := range cau.Block.Transactions { - for i := range txn.SiacoinOutputs { - created[types.Hash256(txn.SiacoinOutputID(i))] = true - } - for _, input := range txn.SiacoinInputs { - ephemeral[types.Hash256(input.ParentID)] = created[types.Hash256(input.ParentID)] - } - for i := range txn.SiafundOutputs { - created[types.Hash256(txn.SiafundOutputID(i))] = true - } - for _, input := range txn.SiafundInputs { - ephemeral[types.Hash256(input.ParentID)] = created[types.Hash256(input.ParentID)] - } + // determine which siacoin and siafund elements are ephemeral + // + // note: I thought we could use LeafIndex == EphemeralLeafIndex, but + // it seems to be set before the subscriber is called. + created := make(map[types.Hash256]bool) + ephemeral := make(map[types.Hash256]bool) + for _, txn := range cau.Block.Transactions { + for i := range txn.SiacoinOutputs { + created[types.Hash256(txn.SiacoinOutputID(i))] = true } - - // add new siacoin elements to the store - var newSiacoinElements, spentSiacoinElements []types.SiacoinElement - cau.ForEachSiacoinElement(func(se types.SiacoinElement, spent bool) { - if ephemeral[se.ID] { - return - } - - relevant, err := tx.AddressRelevant(se.SiacoinOutput.Address) - if err != nil { - panic(err) - } else if !relevant { - return - } - - if spent { - spentSiacoinElements = append(spentSiacoinElements, se) - } else { - newSiacoinElements = append(newSiacoinElements, se) - } - }) - - if err := tx.AddSiacoinElements(newSiacoinElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to add siacoin elements: %w", err) - } else if err := tx.RemoveSiacoinElements(spentSiacoinElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to remove siacoin elements: %w", err) + for _, input := range txn.SiacoinInputs { + ephemeral[types.Hash256(input.ParentID)] = created[types.Hash256(input.ParentID)] } - - var newSiafundElements, spentSiafundElements []types.SiafundElement - cau.ForEachSiafundElement(func(se types.SiafundElement, spent bool) { - if ephemeral[se.ID] { - return - } - - relevant, err := tx.AddressRelevant(se.SiafundOutput.Address) - if err != nil { - panic(err) - } else if !relevant { - return - } - - if spent { - spentSiafundElements = append(spentSiafundElements, se) - } else { - newSiafundElements = append(newSiafundElements, se) - } - }) - - if err := tx.AddSiafundElements(newSiafundElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to add siafund elements: %w", err) - } else if err := tx.RemoveSiafundElements(spentSiafundElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to remove siafund elements: %w", err) + for i := range txn.SiafundOutputs { + created[types.Hash256(txn.SiafundOutputID(i))] = true } - - // add events - relevant := func(addr types.Address) bool { - relevant, err := tx.AddressRelevant(addr) - if err != nil { - panic(fmt.Errorf("failed to check if address is relevant: %w", err)) - } - return relevant + for _, input := range txn.SiafundInputs { + ephemeral[types.Hash256(input.ParentID)] = created[types.Hash256(input.ParentID)] } - if err := tx.AddEvents(AppliedEvents(cau.State, cau.Block, cau, relevant)); err != nil { - return fmt.Errorf("failed to add events: %w", err) + } + + // add new siacoin elements to the store + var newSiacoinElements, spentSiacoinElements []types.SiacoinElement + cau.ForEachSiacoinElement(func(se types.SiacoinElement, spent bool) { + if ephemeral[se.ID] { + return } - // fetch all siacoin and siafund state elements - siacoinStateElements, err := tx.SiacoinStateElements() + relevant, err := tx.AddressRelevant(se.SiacoinOutput.Address) if err != nil { - return fmt.Errorf("failed to get siacoin state elements: %w", err) + panic(err) + } else if !relevant { + return } - // update siacoin element proofs - for i := range siacoinStateElements { - cau.UpdateElementProof(&siacoinStateElements[i]) + if spent { + spentSiacoinElements = append(spentSiacoinElements, se) + } else { + newSiacoinElements = append(newSiacoinElements, se) } + }) + + if err := tx.AddSiacoinElements(newSiacoinElements, cau.State.Index); err != nil { + return fmt.Errorf("failed to add siacoin elements: %w", err) + } else if err := tx.RemoveSiacoinElements(spentSiacoinElements, cau.State.Index); err != nil { + return fmt.Errorf("failed to remove siacoin elements: %w", err) + } - if err := tx.UpdateSiacoinStateElements(siacoinStateElements); err != nil { - return fmt.Errorf("failed to update siacoin state elements: %w", err) + var newSiafundElements, spentSiafundElements []types.SiafundElement + cau.ForEachSiafundElement(func(se types.SiafundElement, spent bool) { + if ephemeral[se.ID] { + return } - siafundStateElements, err := tx.SiafundStateElements() + relevant, err := tx.AddressRelevant(se.SiafundOutput.Address) if err != nil { - return fmt.Errorf("failed to get siafund state elements: %w", err) + panic(err) + } else if !relevant { + return } - // update siafund element proofs - for i := range siafundStateElements { - cau.UpdateElementProof(&siafundStateElements[i]) + if spent { + spentSiafundElements = append(spentSiafundElements, se) + } else { + newSiafundElements = append(newSiafundElements, se) } + }) - if err := tx.UpdateSiafundStateElements(siafundStateElements); err != nil { - return fmt.Errorf("failed to update siacoin state elements: %w", err) + if err := tx.AddSiafundElements(newSiafundElements, cau.State.Index); err != nil { + return fmt.Errorf("failed to add siafund elements: %w", err) + } else if err := tx.RemoveSiafundElements(spentSiafundElements, cau.State.Index); err != nil { + return fmt.Errorf("failed to remove siafund elements: %w", err) + } + + // add events + relevant := func(addr types.Address) bool { + relevant, err := tx.AddressRelevant(addr) + if err != nil { + panic(fmt.Errorf("failed to check if address is relevant: %w", err)) } + return relevant + } + if err := tx.AddEvents(AppliedEvents(cau.State, cau.Block, cau, relevant)); err != nil { + return fmt.Errorf("failed to add events: %w", err) + } + + // fetch all siacoin and siafund state elements + siacoinStateElements, err := tx.SiacoinStateElements() + if err != nil { + return fmt.Errorf("failed to get siacoin state elements: %w", err) + } + + // update siacoin element proofs + for i := range siacoinStateElements { + cau.UpdateElementProof(&siacoinStateElements[i]) + } + + if err := tx.UpdateSiacoinStateElements(siacoinStateElements); err != nil { + return fmt.Errorf("failed to update siacoin state elements: %w", err) + } + + siafundStateElements, err := tx.SiafundStateElements() + if err != nil { + return fmt.Errorf("failed to get siafund state elements: %w", err) + } + + // update siafund element proofs + for i := range siafundStateElements { + cau.UpdateElementProof(&siafundStateElements[i]) + } + + if err := tx.UpdateSiafundStateElements(siafundStateElements); err != nil { + return fmt.Errorf("failed to update siacoin state elements: %w", err) } return nil } -// RevertChainUpdate atomically reverts a chain update from a store -func RevertChainUpdate(tx RevertTx, cru chain.RevertUpdate) error { +// revertChainUpdate atomically reverts a chain update from a store +func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types.ChainIndex) error { // determine which siacoin and siafund elements are ephemeral // // note: I thought we could use LeafIndex == EphemeralLeafIndex, but @@ -197,12 +185,6 @@ func RevertChainUpdate(tx RevertTx, cru chain.RevertUpdate) error { } } - // revert the immature balance of each relevant address - revertedIndex := types.ChainIndex{ - Height: cru.State.Index.Height + 1, - ID: cru.Block.ID(), - } - var removedSiacoinElements, addedSiacoinElements []types.SiacoinElement cru.ForEachSiacoinElement(func(se types.SiacoinElement, spent bool) { if ephemeral[se.ID] { @@ -291,3 +273,22 @@ func RevertChainUpdate(tx RevertTx, cru chain.RevertUpdate) error { // revert events return tx.RevertEvents(revertedIndex) } + +func UpdateChainState(tx UpdateTx, reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { + for _, cru := range reverted { + revertedIndex := types.ChainIndex{ + ID: cru.Block.ID(), + Height: cru.State.Index.Height + 1, + } + if err := revertChainUpdate(tx, cru, revertedIndex); err != nil { + return fmt.Errorf("failed to revert chain update %q: %w", revertedIndex, err) + } + } + + for _, cau := range applied { + if err := applyChainUpdate(tx, cau); err != nil { + return fmt.Errorf("failed to apply chain update %q: %w", cau.State.Index, err) + } + } + return nil +}