Skip to content

Commit

Permalink
sqlite,wallet: batch updates into a single transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 27, 2024
1 parent 70fed3e commit b7e808d
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 187 deletions.
42 changes: 4 additions & 38 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/walletd/wallet"
"go.uber.org/zap"
)

type updateTx struct {
Expand Down Expand Up @@ -573,49 +572,16 @@ func (ut *updateTx) RevertEvents(index types.ChainIndex) error {
}

// ProcessChainApplyUpdate implements chain.Subscriber
func (s *Store) ProcessChainApplyUpdate(cau chain.ApplyUpdate) error {
s.updates = append(s.updates, cau)
log := s.log.Named("ProcessChainApplyUpdate").With(zap.Stringer("index", cau.State.Index))
log.Debug("received update")
log.Debug("committing updates", zap.Int("n", len(s.updates)))
func (s *Store) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error {
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.ApplyChainUpdates(utx, s.updates); err != nil {
return fmt.Errorf("failed to apply updates: %w", err)
} else if err := setLastCommittedIndex(tx, cau.State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
}
s.updates = nil
return nil
})
}

// ProcessChainRevertUpdate implements chain.Subscriber
func (s *Store) ProcessChainRevertUpdate(cru chain.RevertUpdate) error {
log := s.log.Named("ProcessChainRevertUpdate").With(zap.Stringer("index", cru.State.Index))

// update hasn't been committed yet
if len(s.updates) > 0 && s.updates[len(s.updates)-1].Block.ID() == cru.Block.ID() {
log.Debug("removed uncommitted update")
s.updates = s.updates[:len(s.updates)-1]
return nil
}

log.Debug("reverting update")
// update has been committed, revert it
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.RevertChainUpdate(utx, cru); err != nil {
return fmt.Errorf("failed to revert update: %w", err)
} else if err := setLastCommittedIndex(tx, cru.State.Index); err != nil {
if err := wallet.UpdateChainState(utx, reverted, applied); err != nil {
return fmt.Errorf("failed to update chain state: %w", err)
} else if err := setLastCommittedIndex(tx, applied[len(applied)-1].State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
}
return nil
Expand Down
15 changes: 3 additions & 12 deletions persist/sqlite/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,10 @@ func syncDB(t *testing.T, db *sqlite.Store, cm *chain.Manager) {
crus, caus, err := cm.UpdatesSince(index, 1000)
if err != nil {
t.Fatal(err)
} else if err := db.UpdateChainState(crus, caus); err != nil {
t.Fatal(err)
}
for _, cru := range crus {
if err := db.ProcessChainRevertUpdate(cru); err != nil {
t.Fatal("failed to process revert update:", err)
}
index = cru.State.Index
}
for _, cau := range caus {
if err := db.ProcessChainApplyUpdate(cau); err != nil {
t.Fatal("failed to process apply update:", err)
}
index = cau.State.Index
}
index = caus[len(caus)-1].State.Index
}
}

Expand Down
3 changes: 0 additions & 3 deletions persist/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"time"

"go.sia.tech/coreutils/chain"
"go.uber.org/zap"
"lukechampine.com/frand"
)
Expand All @@ -19,8 +18,6 @@ type (
Store struct {
db *sql.DB
log *zap.Logger

updates []chain.ApplyUpdate
}
)

Expand Down
18 changes: 4 additions & 14 deletions wallet/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ type (

// A Store is a persistent store of wallet data.
Store interface {
ProcessChainApplyUpdate(cau chain.ApplyUpdate) error
ProcessChainRevertUpdate(cru chain.RevertUpdate) error
UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error

WalletEvents(walletID ID, offset, limit int) ([]Event, error)
AddWallet(Wallet) (Wallet, error)
Expand Down Expand Up @@ -172,19 +171,10 @@ func syncStore(store Store, cm ChainManager, index types.ChainIndex) error {
crus, caus, err := cm.UpdatesSince(index, 1000)
if err != nil {
return fmt.Errorf("failed to subscribe to chain manager: %w", err)
} else if err := store.UpdateChainState(crus, caus); err != nil {
return fmt.Errorf("failed to update chain state: %w", err)
}
for _, cru := range crus {
if err := store.ProcessChainRevertUpdate(cru); err != nil {
return fmt.Errorf("failed to process revert update: %w", err)
}
index = cru.State.Index
}
for _, cau := range caus {
if err := store.ProcessChainApplyUpdate(cau); err != nil {
return fmt.Errorf("failed to process apply update: %w", err)
}
index = cau.State.Index
}
index = caus[len(caus)-1].State.Index
}
return nil
}
Expand Down
Loading

0 comments on commit b7e808d

Please sign in to comment.