Skip to content

Commit

Permalink
[wip] gets destination backup working for KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanlott committed Jul 26, 2023
1 parent f6ef7e9 commit 4765b3a
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 20 deletions.
4 changes: 4 additions & 0 deletions persistence/blockstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (bs *blockStore) Stop() error {
return bs.kv.Stop()
}

func (bs *blockStore) Backup(path string) error {
return bs.kv.Backup(path)
}

func (bs *blockStore) Delete(key []byte) error { return bs.kv.Delete(key) }
func (bs *blockStore) Exists(key []byte) (bool, error) { return bs.kv.Exists(key) }
func (bs *blockStore) GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error) {
Expand Down
30 changes: 30 additions & 0 deletions persistence/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ package kvstore

import (
"errors"
"fmt"
"log"
"os"
"path/filepath"

badger "github.com/dgraph-io/badger/v3"
"github.com/pokt-network/smt"
Expand All @@ -22,6 +25,8 @@ type KVStore interface {
GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error)
Exists(key []byte) (bool, error)
ClearAll() error

Backup(filepath string) error
}

const (
Expand Down Expand Up @@ -141,6 +146,31 @@ func (store *badgerKVStore) Stop() error {
return store.db.Close()
}

// Backup creates a backup for the badgerDB at the provided path.
// It creates a file
func (store *badgerKVStore) Backup(backupPath string) error {
// create backup directory if it doesn't exist
if err := os.MkdirAll(filepath.Dir(backupPath), os.ModePerm); err != nil {
return fmt.Errorf("failed to create backup directory: %v", err)
}

// create the backup file itself
backupFile, err := os.Create(backupPath)
if err != nil {
return fmt.Errorf("failed to create backup file: %v", err)
}
defer backupFile.Close()

// dump the database to the backup file
// CONSIDER: ??? err := store.db.NewStream().Backup(backupFile, 0) // What's the difference here?
_, err = store.db.Backup(backupFile, 0)
if err != nil {
return err
}

return nil
}

// PrefixEndBytes returns the end byteslice for a noninclusive range
// that would include all byte slices for which the input is the prefix
func prefixEndBytes(prefix []byte) []byte {
Expand Down
59 changes: 59 additions & 0 deletions persistence/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package kvstore

import (
"encoding/hex"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -330,10 +334,65 @@ func TestKVStore_ClearAll(t *testing.T) {
require.NoError(t, err)
}

func TestKVStore_Backup(t *testing.T) {
t.Run("should backup an in-memory database", func(t *testing.T) {
store := NewMemKVStore()
require.NotNil(t, store)

tmpdir := t.TempDir()
path := filepath.Join(tmpdir, "TestKVStore_Backup_InMemory.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, tmpdir)
require.NoError(t, err)
require.False(t, empty)

// assert on individual files
files, err := ioutil.ReadDir(tmpdir)
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
t.Run("should backup an on-disk store database", func(t *testing.T) {
tmpdir := t.TempDir()
kvpath := filepath.Join(tmpdir, "TestKVStore_Backup_OnDisk_Source.bak")
store, err := NewKVStore(kvpath)
require.NoError(t, err)
require.NotNil(t, store)

backupDir := t.TempDir()
path := filepath.Join(backupDir, "TestKVStore_Backup_OnDisk_Destination.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, backupDir)
require.NoError(t, err)
require.False(t, empty)

// assert on individual files
files, err := ioutil.ReadDir(backupDir)
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
}

func setupStore(t *testing.T, store KVStore) {
t.Helper()
err := store.Set([]byte("foo"), []byte("bar"))
require.NoError(t, err)
err = store.Set([]byte("baz"), []byte("bin"))
require.NoError(t, err)
}

func isEmpty(t *testing.T, dir string) (bool, error) {
t.Helper()
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1)
if err == io.EOF {
return true, nil
}
return false, err
}
2 changes: 1 addition & 1 deletion persistence/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (*persistenceModule) Create(bus modules.Bus, options ...modules.ModuleOptio
trees.WithTreeStoreDirectory(persistenceCfg.TreesStoreDir),
trees.WithLogger(m.logger))
if err != nil {
return nil, fmt.Errorf("failed to create TreeStoreModule: %w", err)
return nil, fmt.Errorf("failed to create %s: %w", modules.TreeStoreSubmoduleName, err)
}

m.config = persistenceCfg
Expand Down
69 changes: 67 additions & 2 deletions persistence/trees/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package trees
import (
"encoding/hex"
"fmt"
"io"
"os"
"testing"

"github.com/pokt-network/pocket/logger"
Expand Down Expand Up @@ -148,8 +150,6 @@ func TestTreeStore_SaveAndLoad(t *testing.T) {
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
}
// TODO IN THIS COMMIT do we need to start this treestore?
// require.NoError(t, ts2.Start())

// Load sets a tree store to the provided worldstate
err = ts2.Load(w)
Expand All @@ -160,3 +160,68 @@ func TestTreeStore_SaveAndLoad(t *testing.T) {
// Assert that hash is unchanged from save and load
require.Equal(t, hash1, hash2)
}

func TestTreeStore_SaveBackup(t *testing.T) {
ts := newTestTreeStore(t)
tmpdir := t.TempDir()
// assert that the directory is empty before backup
ok, err := isEmpty(tmpdir)
require.NoError(t, err)
require.True(t, ok)

// Trigger a backup
require.NoError(t, ts.Backup(tmpdir))

// assert that the directory is not empty after Backup has returned
ok, err = isEmpty(tmpdir)
require.NoError(t, err)
require.False(t, ok)
}

// creates a new tree store with a tmp directory for nodestore persistence
// and then starts the tree store and returns its pointer.
func newTestTreeStore(t *testing.T) *treeStore {
ctrl := gomock.NewController(t)
tmpDir := t.TempDir()

mockTxIndexer := mock_types.NewMockTxIndexer(ctrl)
mockBus := mockModules.NewMockBus(ctrl)
mockPersistenceMod := mockModules.NewMockPersistenceModule(ctrl)

mockBus.EXPECT().GetPersistenceModule().AnyTimes().Return(mockPersistenceMod)
mockPersistenceMod.EXPECT().GetTxIndexer().AnyTimes().Return(mockTxIndexer)

ts := &treeStore{
logger: &zerolog.Logger{},
treeStoreDir: tmpDir,
}
require.NoError(t, ts.Start())
require.NotNil(t, ts.rootTree.tree)

for _, treeName := range stateTreeNames {
err := ts.merkleTrees[treeName].tree.Update([]byte("foo"), []byte("bar"))
require.NoError(t, err)
}

err := ts.Commit()
require.NoError(t, err)

hash1 := ts.getStateHash()
require.NotEmpty(t, hash1)

return ts
}

func isEmpty(dir string) (bool, error) {
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1) // Or f.Readdir(1)
if err == io.EOF {
return true, nil
}
return false, err // Either not empty or error, suits both cases
}
7 changes: 7 additions & 0 deletions persistence/trees/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (

var _ modules.TreeStoreModule = &treeStore{}

// Create returns a TreeStoreSubmodule that has been setup with the provided TreeStoreOptions, started,
// and then registered to the bus.
func (*treeStore) Create(bus modules.Bus, options ...modules.TreeStoreOption) (modules.TreeStoreModule, error) {
m := &treeStore{}

for _, option := range options {
option(m)
}

if err := m.Start(); err != nil {
return nil, fmt.Errorf("failed to start %s: %w", modules.TreeStoreSubmoduleName, err)
}

bus.RegisterModule(m)

return m, nil
Expand Down Expand Up @@ -71,6 +77,7 @@ func (t *treeStore) Stop() error {

func (t *treeStore) GetModuleName() string { return modules.TreeStoreSubmoduleName }

// setupTrees is called by Start and it loads the treestore at the given directory
func (t *treeStore) setupTrees() error {
if t.treeStoreDir == ":memory:" {
return t.setupInMemory()
Expand Down
17 changes: 7 additions & 10 deletions persistence/trees/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ func TestTreeStore_Create(t *testing.T) {
treemod, err := trees.Create(mockBus, trees.WithTreeStoreDirectory(":memory:"))
assert.NoError(t, err)

require.NoError(t, treemod.Start())
// Create should setup a value for each tree
for _, v := range stateTreeNames {
root, ns := treemod.GetTree(v)
require.NotEmpty(t, root)
require.NotEmpty(t, ns)
}

got := treemod.GetBus()
assert.Equal(t, got, mockBus)
Expand All @@ -68,21 +73,13 @@ func TestTreeStore_StartAndStop(t *testing.T) {
mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl)
mockBus := createMockBus(t, mockRuntimeMgr)

// Create returns a started TreeStoreSubmodule
treemod, err := trees.Create(
mockBus,
trees.WithTreeStoreDirectory(":memory:"),
trees.WithLogger(&zerolog.Logger{}))
assert.NoError(t, err)

// GetTree should return nil for each tree if Start has not been called
for _, v := range stateTreeNames {
root, ns := treemod.GetTree(v)
require.Empty(t, root)
require.Empty(t, ns)
}
// Should start without error
require.NoError(t, treemod.Start())

// Should stop without error
require.NoError(t, treemod.Stop())

Expand Down
39 changes: 36 additions & 3 deletions persistence/trees/trees.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"hash"
"log"
"path/filepath"

"github.com/jackc/pgx/v5"
"github.com/pokt-network/pocket/persistence/indexer"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"github.com/pokt-network/smt"
"go.uber.org/multierr"
)

// smtTreeHasher sets the hasher used by the tree SMT trees
Expand Down Expand Up @@ -320,7 +322,7 @@ func (t *treeStore) Rollback() error {
}

// Load sets the TreeStore merkle and root trees to the values provided in the worldstate
func (t *treeStore) Load(w *Worldstate) error {
func (t *treeStore) Load(w *worldstate) error {
t.merkleTrees = make(map[string]*stateTree)

// import root tree
Expand Down Expand Up @@ -363,16 +365,47 @@ func (t *treeStore) save() (*worldstate, error) {
w := &worldstate{
TreeStoreDir: t.treeStoreDir,
MerkleRoots: make(map[string][]byte),
MerkleTrees: make(map[string]*stateTree),
RootHash: t.rootTree.tree.Root(),
RootTree: t.rootTree,
}

for treeName, tree := range t.merkleTrees {
w.MerkleRoots[treeName] = tree.tree.Root()
for treeName := range t.merkleTrees {
root, nodeStore := t.GetTree(treeName)
tree := smt.ImportSparseMerkleTree(nodeStore, smtTreeHasher, root)
w.MerkleTrees[treeName] = &stateTree{
name: treeName,
tree: tree,
nodeStore: nodeStore,
}
w.MerkleRoots[treeName] = tree.Root()
}

root, nodeStore := t.GetTree(RootTreeName)
tree := smt.ImportSparseMerkleTree(nodeStore, smtTreeHasher, root)
w.RootTree = &stateTree{
name: RootTreeName,
tree: tree,
nodeStore: nodeStore,
}

return w, nil
}

// Backup creates a new backup of each tree in the tree store to the provided directory.
// Each tree is backed up in an eponymous file in the provided backupDir.
func (t *treeStore) Backup(backupDir string) error {
errs := []error{}
for _, st := range t.merkleTrees {
treePath := filepath.Join(backupDir, st.name)
if err := st.nodeStore.Backup(treePath); err != nil {
t.logger.Err(err).Msgf("failed to backup %s tree: %+v", st.name, err)
errs = append(errs, err)
}
}
return multierr.Combine(errs...)
}

////////////////////////
// Actor Tree Helpers //
////////////////////////
Expand Down
4 changes: 0 additions & 4 deletions persistence/trees/trees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func TestTreeStore_Update(t *testing.T) {
pmod := newTestPersistenceModule(t, dbUrl)
context := newTestPostgresContext(t, 0, pmod)

require.NoError(t, context.SetSavePoint())

hash1, err := context.ComputeStateHash()
require.NoError(t, err)
require.NotEmpty(t, hash1)
Expand All @@ -59,8 +57,6 @@ func TestTreeStore_Update(t *testing.T) {
_, err = createAndInsertDefaultTestApp(t, context)
require.NoError(t, err)

require.NoError(t, context.SetSavePoint())

hash2, err := context.ComputeStateHash()
require.NoError(t, err)
require.NotEmpty(t, hash2)
Expand Down

0 comments on commit 4765b3a

Please sign in to comment.