Skip to content

Commit

Permalink
Better Tombstone handling in Iter (#21)
Browse files Browse the repository at this point in the history
Adds an option for iterating with tombstones but by default the Iter
function skips tombstones to match the Get functionality. Also fixes the
colons in namespace issue by removing the namespace prefix rather than
splitting on the separator. Increases tests to ensure that Iter covers
undead objects.

Note: this will require a change to Trtl replication. It must use the
WithTombstone option when it is iterating in initiator phase 1 and
remote phase 2; otherwise, deletes will not be replicated.
  • Loading branch information
bbengfort authored Feb 9, 2022
1 parent 59182a3 commit 396d314
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 134 deletions.
67 changes: 17 additions & 50 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package honu_test

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
Expand All @@ -22,26 +21,26 @@ var (

// setupLevelDB opens a raw leveldb database (no honu wrapper) in a fresh temporary
// directory and returns the handle together with the directory path. Teardown is
// registered with t.Cleanup, which closes the database and then removes the
// directory, so callers do not need their own deferred cleanup.
func setupLevelDB(t testing.TB) (*leveldb.DB, string) {
	// Create a new leveldb database in a temporary directory
	tmpDir, err := ioutil.TempDir("", "leveldb-*")
	require.NoError(t, err)

	// Open a leveldb database directly without honu wrapper
	db, err := leveldb.OpenFile(tmpDir, nil)
	if err != nil && tmpDir != "" {
		// The cleanup below is not registered yet, so remove the directory here
		// before the assertion on err fails the test.
		os.RemoveAll(tmpDir)
	}
	require.NoError(t, err)

	t.Cleanup(func() {
		db.Close()
		os.RemoveAll(tmpDir)
	})

	return db, tmpDir
}

func BenchmarkHonuGet(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
// NOTE: defers are executed in LIFO order, so this ensures the db is closed first then the directory deleted
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -63,11 +62,7 @@ func BenchmarkHonuGet(b *testing.B) {
}

func BenchmarkLevelDBGet(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -88,11 +83,7 @@ func BenchmarkLevelDBGet(b *testing.B) {
}

func BenchmarkHonuPut(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -110,11 +101,7 @@ func BenchmarkHonuPut(b *testing.B) {
}

func BenchmarkLevelDBPut(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -132,11 +119,7 @@ func BenchmarkLevelDBPut(b *testing.B) {
}

func BenchmarkHonuDelete(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -158,11 +141,7 @@ func BenchmarkHonuDelete(b *testing.B) {
}

func BenchmarkLevelDBDelete(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -183,11 +162,7 @@ func BenchmarkLevelDBDelete(b *testing.B) {
}

func BenchmarkHonuIter(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
for _, key := range []string{"aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "ii", "jj"} {
Expand Down Expand Up @@ -219,11 +194,7 @@ func BenchmarkHonuIter(b *testing.B) {
}

func BenchmarkLevelDBIter(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
for _, key := range []string{"aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "ii", "jj"} {
Expand Down Expand Up @@ -253,11 +224,7 @@ func BenchmarkLevelDBIter(b *testing.B) {
}

func BenchmarkHonuObject(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand Down
62 changes: 47 additions & 15 deletions engines/leveldb/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,90 @@ package leveldb

import (
"bytes"
"fmt"

honuiter "github.com/rotationalio/honu/iterator"
pb "github.com/rotationalio/honu/object"
opts "github.com/rotationalio/honu/options"
"github.com/syndtr/goleveldb/leveldb/iterator"
"google.golang.org/protobuf/proto"
)

// NewLevelDBIterator creates a new iterator that wraps a leveldb Iterator with object
// management access and Honu-specific serialization.
func NewLevelDBIterator(iter iterator.Iterator, namespace string) honuiter.Iterator {
return &ldbIterator{ldb: iter, namespace: namespace}
func NewLevelDBIterator(iter iterator.Iterator, options *opts.Options) honuiter.Iterator {
return &ldbIterator{ldb: iter, options: options}
}

// ldbIterator wraps the underlying leveldb iterator to provide object management
// access.
type ldbIterator struct {
	ldb     iterator.Iterator // the wrapped leveldb iterator that performs the actual traversal
	options *opts.Options     // supplies the Namespace and the Tombstones flag read by the methods
}

// Compile-time check that *ldbIterator implements honuiter.Iterator.
var _ honuiter.Iterator = (*ldbIterator)(nil)

// Error returns the error reported by the underlying leveldb iterator.
func (i *ldbIterator) Error() error { return i.ldb.Error() }

// Release releases the underlying leveldb iterator.
func (i *ldbIterator) Release() { i.ldb.Release() }

// Next advances the underlying leveldb iterator and reports whether it stopped on an
// entry that should be surfaced to the caller. It returns false once the underlying
// iterator is exhausted.
//
// Unless options.Tombstones is set, entries whose object is a tombstone are skipped,
// as are entries whose object cannot be loaded (Object returns an error).
func (i *ldbIterator) Next() bool {
	// Skip with a loop rather than a recursive call so that a long run of
	// consecutive tombstones does not grow the call stack.
	for i.ldb.Next() {
		// When tombstones are included every entry is a valid stopping point.
		if i.options.Tombstones {
			return true
		}

		// Otherwise the entry has to be loaded to know whether it is a tombstone.
		if obj, err := i.Object(); err == nil && !obj.Tombstone() {
			return true
		}
	}
	return false
}

// Prev moves the underlying leveldb iterator backwards and reports whether it stopped
// on an entry that should be surfaced to the caller. It returns false once the
// underlying iterator has no previous entry.
//
// Unless options.Tombstones is set, entries whose object is a tombstone are skipped,
// as are entries whose object cannot be loaded (Object returns an error).
func (i *ldbIterator) Prev() bool {
	// Skip with a loop rather than a recursive call so that a long run of
	// consecutive tombstones does not grow the call stack.
	for i.ldb.Prev() {
		// When tombstones are included every entry is a valid stopping point.
		if i.options.Tombstones {
			return true
		}

		// Otherwise the previous entry has to be loaded to know whether it is a
		// tombstone before we know if we have a previous value or not.
		if obj, err := i.Object(); err == nil && !obj.Tombstone() {
			return true
		}
	}
	return false
}

// Seek moves the underlying leveldb iterator to the given key and returns the result
// of the underlying Seek call. When a namespace is configured in the options, the key
// is first prefixed with that namespace via prepend.
func (i *ldbIterator) Seek(key []byte) bool {
	// NOTE: no tombstone checking is done in Seek because Next is expected to be
	// called afterwards.
	// NOTE(review): confirm that callers never read the entry directly after Seek,
	// otherwise a tombstone could be observed here.

	// We need to prefix the seek with the correct namespace
	if i.options.Namespace != "" {
		key = prepend(i.options.Namespace, key)
	}
	return i.ldb.Seek(key)
}

func (i *ldbIterator) Key() []byte {
// Fetch the key then split the namespace from the key
// Note that because the namespace itself might have colons in it, we
// strip off the namespace prefix then remove any preceding colons.
key := i.ldb.Key()
parts := bytes.SplitN(key, nssep, 2)
if len(parts) == 2 {
return parts[1]
if i.options.Namespace != "" {
prefix := prepend(i.options.Namespace, nil)
return bytes.TrimPrefix(key, prefix)
}
return key
}

// Value returns the Data of the object at the current entry, or nil if the object
// cannot be loaded.
func (i *ldbIterator) Value() []byte {
	obj, err := i.Object()
	if err != nil {
		// NOTE: if err is not nil, it's up to the caller to get the error from Object
		return nil
	}
	return obj.Data
}

func (i *ldbIterator) Object() (obj *pb.Object, err error) {
Expand All @@ -65,5 +97,5 @@ func (i *ldbIterator) Object() (obj *pb.Object, err error) {
}

// Namespace returns the namespace configured in the iterator's options.
func (i *ldbIterator) Namespace() string {
	return i.options.Namespace
}
2 changes: 1 addition & 1 deletion engines/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (db *LevelDBEngine) Iter(prefix []byte, options *opts.Options) (i iterator.
if len(prefix) > 0 {
slice = util.BytesPrefix(prefix)
}
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options.Namespace), nil
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options), nil
}

// nssep is the separator placed between a namespace and a key in leveldb.
var nssep = []byte("::")
Expand Down
58 changes: 29 additions & 29 deletions engines/leveldb/leveldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (
"google.golang.org/protobuf/proto"
)

// pairs is a test set of key/value pairs used to evaluate iteration. Because :: is
// the namespace separator in leveldb, some keys deliberately contain it to ensure
// that keys with colons are correctly iterated on.
var pairs = [][]string{
	{"aa", "first"},
	{"ab", "second"},
	{"b::a", "third"},
	{"b::b", "fourth"},
	{"b::c", "fifth"},
	{"ca", "sixth"},
	{"cb", "seventh"},
}
Expand All @@ -44,11 +47,18 @@ func setupLevelDBEngine(t testing.TB) (_ *leveldb.LevelDBEngine, path string) {
os.RemoveAll(tempDir)
}
require.NoError(t, err)

// Add a cleanup function to ensure the fixture is deleted after tests
t.Cleanup(func() {
// Teardown after finishing the test
engine.Close()
os.RemoveAll(tempDir)
})

return engine, tempDir
}

// Creates an options.Options struct with namespace set and returns
// a pointer to it.
// Creates an options.Options struct with namespace set and returns a pointer to it.
func namespaceOpts(namespace string, t *testing.T) *options.Options {
opts, err := options.New(options.WithNamespace(namespace))
require.NoError(t, err)
Expand Down Expand Up @@ -78,18 +88,14 @@ func checkDelete(ldbStore engine.Store, opts *options.Options, key []byte, t *te
require.Empty(t, value)
}

func TestLeveldbEngine(t *testing.T) {
func TestLevelDBEngine(t *testing.T) {
// Setup a levelDB Engine.
ldbEngine, ldbPath := setupLevelDBEngine(t)
require.Equal(t, "leveldb", ldbEngine.Engine())

// Ensure the db was created.
require.DirExists(t, ldbPath)

// Teardown after finishing the test.
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()

// Use a constant key to ensure namespaces
// are working correctly.
key := []byte("foo")
Expand All @@ -111,12 +117,8 @@ func TestLeveldbEngine(t *testing.T) {
}
}

func TestLeveldbTransactions(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
func TestLevelDBTransactions(t *testing.T) {
ldbEngine, _ := setupLevelDBEngine(t)

// Use a constant key to ensure namespaces
// are working correctly.
Expand Down Expand Up @@ -155,20 +157,9 @@ func TestLeveldbTransactions(t *testing.T) {
}

func TestLevelDBIter(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
ldbEngine, _ := setupLevelDBEngine(t)

for _, namespace := range testNamespaces {
// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
// on :: and grabbing the first string, so it only
// grabs "namespace".
if namespace == "namespace::with::colons" {
continue
}
// Add data to the database to iterate over.
opts := namespaceOpts(namespace, t)

Expand Down Expand Up @@ -223,7 +214,16 @@ func addIterPairsToDB(ldbStore engine.Store, opts *options.Options, pairs [][]st
obj := &pb.Object{
Key: key,
Namespace: opts.Namespace,
Data: value,
Version: &pb.Version{
Pid: 1,
Version: 1,
Region: "testing",
Parent: nil,
Tombstone: false,
},
Region: "testing",
Owner: "testing",
Data: value,
}

data, err := proto.Marshal(obj)
Expand Down
Loading

0 comments on commit 396d314

Please sign in to comment.