Skip to content

Commit

Permalink
[wip] gets destination backup working for KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanlott committed Aug 1, 2023
1 parent aba7feb commit 2c6fb5c
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 130 deletions.
4 changes: 4 additions & 0 deletions persistence/blockstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (bs *blockStore) Stop() error {
return bs.kv.Stop()
}

func (bs *blockStore) Backup(path string) error {
return bs.kv.Backup(path)
}

func (bs *blockStore) Delete(key []byte) error { return bs.kv.Delete(key) }
func (bs *blockStore) Exists(key []byte) (bool, error) { return bs.kv.Exists(key) }
func (bs *blockStore) GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error) {
Expand Down
29 changes: 29 additions & 0 deletions persistence/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ package kvstore

import (
"errors"
"fmt"
"log"
"os"
"path/filepath"

badger "github.com/dgraph-io/badger/v3"
"github.com/pokt-network/smt"
Expand All @@ -22,6 +25,8 @@ type KVStore interface {
GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error)
Exists(key []byte) (bool, error)
ClearAll() error

Backup(filepath string) error
}

const (
Expand Down Expand Up @@ -141,6 +146,30 @@ func (store *badgerKVStore) Stop() error {
return store.db.Close()
}

// Backup creates a backup for the badgerDB at the provided path.
// It creates a file
func (store *badgerKVStore) Backup(backupPath string) error {
// create backup directory if it doesn't exist
if err := os.MkdirAll(filepath.Dir(backupPath), os.ModePerm); err != nil {
return fmt.Errorf("failed to create backup directory: %v", err)
}

// create the backup file itself
backupFile, err := os.Create(backupPath)
if err != nil {
return fmt.Errorf("failed to create backup file: %v", err)
}
defer backupFile.Close()

// dump the database to the backup file
_, err = store.db.Backup(backupFile, 0)
if err != nil {
return err
}

return nil
}

// PrefixEndBytes returns the end byteslice for a noninclusive range
// that would include all byte slices for which the input is the prefix
func prefixEndBytes(prefix []byte) []byte {
Expand Down
67 changes: 67 additions & 0 deletions persistence/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package kvstore

import (
"encoding/hex"
"io"
"os"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -330,10 +333,74 @@ func TestKVStore_ClearAll(t *testing.T) {
require.NoError(t, err)
}

func TestKVStore_Backup(t *testing.T) {
t.Run("should backup an in-memory database", func(t *testing.T) {
store := NewMemKVStore()
require.NotNil(t, store)

tmpdir := t.TempDir()
path := filepath.Join(tmpdir, "TestKVStore_Backup_InMemory.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, tmpdir)
require.NoError(t, err)
require.False(t, empty)

// open the directory and assert on individual files
dir, err := os.Open(tmpdir)
require.NoError(t, err)
defer dir.Close()

files, err := dir.Readdir(0) // 0 means read all directory entries
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
t.Run("should backup an on-disk store database", func(t *testing.T) {
tmpdir := t.TempDir()
kvpath := filepath.Join(tmpdir, "TestKVStore_Backup_OnDisk_Source.bak")
store, err := NewKVStore(kvpath)
require.NoError(t, err)
require.NotNil(t, store)

backupDir := t.TempDir()
path := filepath.Join(backupDir, "TestKVStore_Backup_OnDisk_Destination.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, backupDir)
require.NoError(t, err)
require.False(t, empty)

// open the directory and assert on individual files
dir, err := os.Open(backupDir)
require.NoError(t, err)
defer dir.Close()

files, err := dir.Readdir(0) // 0 means read all directory entries
require.NoError(t, err)
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
}

func setupStore(t *testing.T, store KVStore) {
t.Helper()
err := store.Set([]byte("foo"), []byte("bar"))
require.NoError(t, err)
err = store.Set([]byte("baz"), []byte("bin"))
require.NoError(t, err)
}

func isEmpty(t *testing.T, dir string) (bool, error) {
t.Helper()
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1)
if err == io.EOF {
return true, nil
}
return false, err
}
2 changes: 1 addition & 1 deletion persistence/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (*persistenceModule) Create(bus modules.Bus, options ...modules.ModuleOptio
trees.WithTreeStoreDirectory(persistenceCfg.TreesStoreDir),
trees.WithLogger(m.logger))
if err != nil {
return nil, fmt.Errorf("failed to create TreeStoreModule: %w", err)
return nil, fmt.Errorf("failed to create %s: %w", modules.TreeStoreSubmoduleName, err)
}

m.config = persistenceCfg
Expand Down
127 changes: 97 additions & 30 deletions persistence/trees/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package trees
import (
"encoding/hex"
"fmt"
"io"
"os"
"testing"

"github.com/pokt-network/pocket/logger"
mock_types "github.com/pokt-network/pocket/persistence/types/mocks"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
mock_modules "github.com/pokt-network/pocket/shared/modules/mocks"

"github.com/golang/mock/gomock"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,8 +27,8 @@ func TestTreeStore_AtomicUpdatesWithSuccessfulRollback(t *testing.T) {
ctrl := gomock.NewController(t)

mockTxIndexer := mock_types.NewMockTxIndexer(ctrl)
mockBus := mockModules.NewMockBus(ctrl)
mockPersistenceMod := mockModules.NewMockPersistenceModule(ctrl)
mockBus := mock_modules.NewMockBus(ctrl)
mockPersistenceMod := mock_modules.NewMockPersistenceModule(ctrl)

mockBus.EXPECT().GetPersistenceModule().AnyTimes().Return(mockPersistenceMod)
mockPersistenceMod.EXPECT().GetTxIndexer().AnyTimes().Return(mockTxIndexer)
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestTreeStore_AtomicUpdatesWithSuccessfulRollback(t *testing.T) {
hash3 := ts.getStateHash()
require.Equal(t, hash3, hash2)
require.Equal(t, hash3, h1)
ts.Rollback()
require.NoError(t, ts.Rollback())

// confirm it's not in the tree
v, err := ts.merkleTrees[TransactionsTreeName].tree.Get([]byte("fiz"))
Expand All @@ -104,18 +105,94 @@ func TestTreeStore_AtomicUpdatesWithSuccessfulRollback(t *testing.T) {
}

func TestTreeStore_SaveAndLoad(t *testing.T) {
t.Parallel()
t.Run("should save a backup in a directory", func(t *testing.T) {
ts := newTestTreeStore(t)
tmpdir := t.TempDir()
// assert that the directory is empty before backup
ok, err := isEmpty(tmpdir)
require.NoError(t, err)
require.True(t, ok)

// Trigger a backup
require.NoError(t, ts.Backup(tmpdir))

// assert that the directory is not empty after Backup has returned
ok, err = isEmpty(tmpdir)
require.NoError(t, err)
require.False(t, ok)
})
t.Run("should load a backup and maintain TreeStore hash integrity", func(t *testing.T) {
ctrl := gomock.NewController(t)
tmpDir := t.TempDir()

mockTxIndexer := mock_types.NewMockTxIndexer(ctrl)
mockBus := mock_modules.NewMockBus(ctrl)
mockPersistenceMod := mock_modules.NewMockPersistenceModule(ctrl)

mockBus.EXPECT().GetPersistenceModule().AnyTimes().Return(mockPersistenceMod)
mockPersistenceMod.EXPECT().GetTxIndexer().AnyTimes().Return(mockTxIndexer)

ts := &treeStore{
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
}
require.NoError(t, ts.Start())
require.NotNil(t, ts.rootTree.tree)

for _, treeName := range stateTreeNames {
err := ts.merkleTrees[treeName].tree.Update([]byte("foo"), []byte("bar"))
require.NoError(t, err)
}

err := ts.Commit()
require.NoError(t, err)

hash1 := ts.getStateHash()
require.NotEmpty(t, hash1)

w, err := ts.save()
require.NoError(t, err)
require.NotNil(t, w)
require.NotNil(t, w.rootHash)
require.NotNil(t, w.merkleRoots)

// Stop the first tree store so that it's databases are no longer used
require.NoError(t, ts.Stop())

// declare a second TreeStore with no trees then load the first worldstate into it
ts2 := &treeStore{
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
}

// Load sets a tree store to the provided worldstate
err = ts2.Load(w)
require.NoError(t, err)

hash2 := ts2.getStateHash()

// Assert that hash is unchanged from save and load
require.Equal(t, hash1, hash2)
})
}

// creates a new tree store with a tmp directory for nodestore persistence
// and then starts the tree store and returns its pointer.
func newTestTreeStore(t *testing.T) *treeStore {
t.Helper()
ctrl := gomock.NewController(t)
tmpDir := t.TempDir()

mockTxIndexer := mock_types.NewMockTxIndexer(ctrl)
mockBus := mockModules.NewMockBus(ctrl)
mockPersistenceMod := mockModules.NewMockPersistenceModule(ctrl)
mockBus := mock_modules.NewMockBus(ctrl)
mockPersistenceMod := mock_modules.NewMockPersistenceModule(ctrl)

mockBus.EXPECT().GetPersistenceModule().AnyTimes().Return(mockPersistenceMod)
mockPersistenceMod.EXPECT().GetTxIndexer().AnyTimes().Return(mockTxIndexer)

ts := &treeStore{
logger: &zerolog.Logger{},
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
}
require.NoError(t, ts.Start())
Expand All @@ -132,29 +209,19 @@ func TestTreeStore_SaveAndLoad(t *testing.T) {
hash1 := ts.getStateHash()
require.NotEmpty(t, hash1)

w, err := ts.save()
require.NoError(t, err)
require.NotNil(t, w)
require.NotNil(t, w.rootHash)
require.NotNil(t, w.merkleRoots)

// Stop the first tree store so that it's databases are no longer used
require.NoError(t, ts.Stop())
return ts
}

// declare a second TreeStore with no trees then load the first worldstate into it
ts2 := &treeStore{
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
func isEmpty(dir string) (bool, error) {
f, err := os.Open(dir)
if err != nil {
return false, err
}
// TODO IN THIS COMMIT do we need to start this treestore?
// require.NoError(t, ts2.Start())

// Load sets a tree store to the provided worldstate
err = ts2.Load(w)
require.NoError(t, err)

hash2 := ts2.getStateHash()
defer f.Close()

// Assert that hash is unchanged from save and load
require.Equal(t, hash1, hash2)
_, err = f.Readdirnames(1) // Or f.Readdir(1)
if err == io.EOF {
return true, nil
}
return false, err // Either not empty or error, suits both cases
}
11 changes: 9 additions & 2 deletions persistence/trees/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@ package trees

import (
"encoding/hex"
"errors"
"fmt"

"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/smt"
"go.uber.org/multierr"
)

var _ modules.TreeStoreModule = &treeStore{}

// Create returns a TreeStoreSubmodule that has been setup with the provided TreeStoreOptions, started,
// and then registered to the bus.
func (*treeStore) Create(bus modules.Bus, options ...modules.TreeStoreOption) (modules.TreeStoreModule, error) {
m := &treeStore{}

for _, option := range options {
option(m)
}

if err := m.Start(); err != nil {
return nil, fmt.Errorf("failed to start %s: %w", modules.TreeStoreSubmoduleName, err)
}

bus.RegisterModule(m)

return m, nil
Expand Down Expand Up @@ -66,11 +72,12 @@ func (t *treeStore) Stop() error {
errs = append(errs, err)
}
}
return multierr.Combine(errs...)
return errors.Join(errs...)
}

func (t *treeStore) GetModuleName() string { return modules.TreeStoreSubmoduleName }

// setupTrees is called by Start and it loads the treestore at the given directory
func (t *treeStore) setupTrees() error {
if t.treeStoreDir == ":memory:" {
return t.setupInMemory()
Expand Down
Loading

0 comments on commit 2c6fb5c

Please sign in to comment.