Skip to content

Commit

Permalink
Skip, stomp, and linear version history tracking in Update (#19)
Browse files Browse the repository at this point in the history
Adds an UpdateType return value to the db.Update method so callers can
track what happens to the underlying version history. The types are
directly connected to new Version comparison methods of similar names,
which will hopefully make it easier for us to reason about whats
happening in the version history and in anti-entropy in general.
  • Loading branch information
bbengfort authored Jan 26, 2022
1 parent 0d24e25 commit b3f3099
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 57 deletions.
85 changes: 71 additions & 14 deletions honu.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,71 @@ func (db *DB) Get(key []byte, options ...opts.Option) (value []byte, err error)

// Return the wrapped data
return obj.Data, nil
}

// UpdateType is an intermediate solution to tracking what's happening to the version
// history when direct modifications are applied to the database.
// NOTE: this data type is subject to change in later versions and should be treated
// as a prototype only in production code.
type UpdateType uint8

const (
UpdateNoChange UpdateType = iota // No change occurred (nothing was written to disk)
UpdateForced // The update was forced, so the previous version was not checked
UpdateLinear // The previous version is the parent of the updating version
UpdateStomp // The previous version is concurrent but has a lower precedence than the updating version
UpdateSkip // The previous version is later but is not concurrent nor linear from the updating version

)

func (u UpdateType) String() string {
switch u {
case UpdateNoChange:
return "unchanged"
case UpdateForced:
return "forced"
case UpdateLinear:
return "linear"
case UpdateStomp:
return "stomped"
case UpdateSkip:
return "skipped"
default:
return "unknown"
}
}

// Update an object directly in the database without modifying its version information.
// Update is to Put as Object is to Get - use Update when manually modifying the data
// store, for example during replication, but not for normal DB operations.
func (db *DB) Update(obj *pb.Object, options ...opts.Option) (err error) {
// store, for example during replication, but not for normal DB operations. Update also
// returns the type of update that ocurred, relative to the previous version.
func (db *DB) Update(obj *pb.Object, options ...opts.Option) (update UpdateType, err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return err
return UpdateNoChange, err
}
defer tx.Finish()

// Collect the options
var cfg *opts.Options
if cfg, err = opts.New(options...); err != nil {
return err
return UpdateNoChange, err
}

// If the default namespace is specified use the object's namespace to ensure that
// if the user did not supply Namespace, Update still works. If the object was
// already in the default namespace, this should not cause a change to happen. There
// is an edge case where the user supplies options.WithNamespace("default") and an
// object that is not in the default namespace and the user option will be ignored
// in favor of the object's original namespace; but this is an unlikely case.
if cfg.Namespace == opts.NamespaceDefault {
cfg.Namespace = obj.Namespace
}

if !cfg.Force {
// Check the namespace and that it matches the object
if cfg.Namespace == opts.NamespaceDefault {
cfg.Namespace = obj.Namespace
} else if cfg.Namespace != obj.Namespace {
return errors.New("options namespace does not match object namespace")
if cfg.Namespace != obj.Namespace {
return UpdateNoChange, errors.New("options namespace does not match object namespace")
}

// Check that the version is later than the version being written to disk
Expand All @@ -153,29 +193,46 @@ func (db *DB) Update(obj *pb.Object, options ...opts.Option) (err error) {
)
if prevData, err = tx.Get(obj.Key, cfg); err != nil {
if !errors.Is(err, engine.ErrNotFound) {
return fmt.Errorf("could not check previous version: %v", err)
return UpdateNoChange, fmt.Errorf("could not check previous version: %v", err)
}
} else {
if err = proto.Unmarshal(prevData, prev); err != nil {
return fmt.Errorf("could not unmarshal previous version: %v", err)
return UpdateNoChange, fmt.Errorf("could not unmarshal previous version: %v", err)
}
}

if !obj.Version.IsLater(prev.Version) {
return fmt.Errorf("cannot update object, it is not a later version then the current object")
return UpdateNoChange, fmt.Errorf("cannot update object, it is not a later version then the current object")
}

// Determine the update type based on the previous version
// NOTE: all update conditions imply the current version is later than previous
switch {
case obj.Version.Stomps(prev.Version):
update = UpdateStomp
case obj.Version.Skips(prev.Version):
update = UpdateSkip
case obj.Version.LinearFrom(prev.Version):
update = UpdateLinear
default:
return UpdateNoChange, fmt.Errorf("cannot determine update relationship in the version history")
}

} else {
// Report that the update was forced
update = UpdateForced
}

// Put the version directly to disk
var data []byte
if data, err = proto.Marshal(obj); err != nil {
return err
return UpdateNoChange, err
}

if err = tx.Put(obj.Key, data, cfg); err != nil {
return err
return UpdateNoChange, err
}
return nil
return update, nil
}

// Put a new value to the specified key and update the version.
Expand Down
201 changes: 169 additions & 32 deletions honu_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package honu_test

import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"

"github.com/rotationalio/honu"
"github.com/rotationalio/honu/config"
"github.com/rotationalio/honu/object"
"github.com/rotationalio/honu/options"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -108,46 +111,16 @@ func TestLevelDBInteractions(t *testing.T) {
require.Equal(t, uint64(3), obj.Version.Version)
require.False(t, obj.Tombstone())

// Attempt to directly update the object in the database
// Attempt to directly update the object in the database with a later version
obj.Data = []byte("directly updated")
obj.Owner = "me"
obj.Version.Parent = nil
obj.Version.Version = 42
obj.Version.Pid = 93
obj.Version.Region = "here"
obj.Version.Tombstone = false
require.NoError(t, db.Update(obj))

obj, err = db.Object(key, options.WithNamespace(namespace))
_, err = db.Update(obj)
require.NoError(t, err)
require.Equal(t, uint64(42), obj.Version.Version)
require.Equal(t, uint64(93), obj.Version.Pid)
require.Equal(t, "me", obj.Owner)
require.Equal(t, "here", obj.Version.Region)

// Update with same namespace option should not error.
obj.Version.Version = 43
require.NoError(t, db.Update(obj, options.WithNamespace(namespace)))

// Update with wrong namespace should error
require.Error(t, db.Update(obj, options.WithNamespace("this is not the right thing")))

// Update with wrong namespace but with force should not error.
require.NoError(t, db.Update(obj, options.WithNamespace("trashcan"), options.WithForce()))

// Update with the same version should error.
require.Error(t, db.Update(obj, options.WithNamespace(namespace)))

// Update with an earlier version should error
obj.Version.Version = 7
require.Error(t, db.Update(obj, options.WithNamespace(namespace)))

// Update with an earlier version with force should not error.
require.NoError(t, db.Update(obj, options.WithNamespace(namespace), options.WithForce()))

// Update an object that does not exist should not error.
obj.Key = []byte("secretobjinvisible")
require.NoError(t, db.Update(obj, options.WithNamespace(namespace)))

// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
Expand Down Expand Up @@ -189,3 +162,167 @@ func TestLevelDBInteractions(t *testing.T) {
iter.Release()
}
}

func TestUpdate(t *testing.T) {
// Create a test database to attempt to update
db, tmpDir := setupHonuDB(t)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()

// Create a random object in the database to start update tests on
key := randomData(32)
namespace := "yeti"
root, err := db.Put(key, randomData(96), options.WithNamespace(namespace))
require.NoError(t, err, "could not put random data")

// Generate new1 - a linear object from root as though it were from a different replica
new1 := &object.Object{
Key: key,
Namespace: namespace,
Version: &object.Version{
Pid: 113,
Version: 2,
Parent: root.Version,
},
Region: "the-void",
Owner: root.Owner,
Data: randomData(112),
}

// Should be able to update with no namespace option
update, err := db.Update(new1)
require.NoError(t, err, "could not update db with new1")
require.Equal(t, honu.UpdateLinear, update, "expected new1 update to be linear")
requireObjectEqual(t, db, new1, key, namespace)

// Should not be be able to update with the same version twice, since it is now no
// longer later than previous version (it is the equal version on disk).
update, err = db.Update(new1)
require.EqualError(t, err, "cannot update object, it is not a later version then the current object")
require.Equal(t, honu.UpdateNoChange, update)

// Should be able to force the update to apply the same object back to disk.
update, err = db.Update(new1, options.WithForce())
require.NoError(t, err, "could not force update with new1")
require.Equal(t, honu.UpdateForced, update)

// Generate new2 - an object stomping new1 as though it were from a different replica
new2 := &object.Object{
Key: key,
Namespace: namespace,
Version: &object.Version{
Pid: 42,
Version: 2,
Parent: root.Version,
},
Region: "the-other-void",
Owner: root.Owner,
Data: randomData(112),
}

// Update with the wrong namespace should error
update, err = db.Update(new2, options.WithNamespace("this is not the right namespace for sure"))
require.EqualError(t, err, "options namespace does not match object namespace")
require.Equal(t, honu.UpdateNoChange, update)
requireObjectEqual(t, db, new1, key, namespace)

// Update with the wrong namespace but with force should not error and create a new object
// NOTE: this is kind of a wild force since now the object has the wrong namespace metadata.
update, err = db.Update(new2, options.WithNamespace("trashcan"), options.WithForce())
require.NoError(t, err)
require.Equal(t, honu.UpdateForced, update)
requireObjectEqual(t, db, new1, key, namespace)
requireObjectEqual(t, db, new2, key, "trashcan")

// Update with same namespace option should not error.
update, err = db.Update(new2, options.WithNamespace(namespace))
require.NoError(t, err, "could not update new2")
require.Equal(t, honu.UpdateStomp, update)
requireObjectEqual(t, db, new2, key, namespace)

// Generate new3 - an object skipping new2 as though it were from the same replica
new3 := &object.Object{
Key: key,
Namespace: namespace,
Version: &object.Version{
Pid: 42,
Version: 12,
Parent: root.Version,
},
Region: "the-other-void",
Owner: root.Owner,
Data: randomData(112),
}

// Ensure UpdateSkip is returned
update, err = db.Update(new3)
require.NoError(t, err, "could not update new3")
require.Equal(t, honu.UpdateSkip, update)
requireObjectEqual(t, db, new3, key, namespace)

// Update with an earlier version should error
update, err = db.Update(new1)
require.EqualError(t, err, "cannot update object, it is not a later version then the current object")
require.Equal(t, honu.UpdateNoChange, update)
requireObjectEqual(t, db, new3, key, namespace)

// Should be able to force the update to apply the earlier object back to disk
update, err = db.Update(new1, options.WithForce())
require.NoError(t, err, "could not force update with new1")
require.Equal(t, honu.UpdateForced, update)
requireObjectEqual(t, db, new1, key, namespace)

// Update an object that does not exist should not error.
stranger := &object.Object{
Key: randomData(18),
Namespace: "default",
Version: &object.Version{
Pid: 1,
Version: 1,
Parent: nil,
},
Region: "the-void",
Owner: "me",
Data: randomData(8),
}

update, err = db.Update(stranger)
require.NoError(t, err)
require.Equal(t, honu.UpdateLinear, update)
}

// Helper assertion function to check to make sure an object matches what is in the database
func requireObjectEqual(t *testing.T, db *honu.DB, expected *object.Object, key []byte, namespace string) {
actual, err := db.Object(key, options.WithNamespace(namespace))
require.NoError(t, err, "could not fetch expected object from the database")

// NOTE: we cannot do a require.Equal(t, expected, actual) because the test will hang
// it's not clear if there is a recursive loop with version comparisons or some other
// deep equality is causing the problem. Instead we directly compare the data.
require.True(t, bytes.Equal(expected.Key, actual.Key), "key is not equal")
require.Equal(t, expected.Namespace, actual.Namespace, "namespace not equal")
require.Equal(t, expected.Region, actual.Region, "region not equal")
require.Equal(t, expected.Owner, actual.Owner, "owner not equal")
require.True(t, expected.Version.Equal(actual.Version), "versions not equal")
require.Equal(t, expected.Version.Region, actual.Version.Region, "version region not equal")
require.Equal(t, expected.Version.Tombstone, actual.Version.Tombstone, "version tombstone not the same")
if expected.Version.Parent != nil {
require.True(t, expected.Version.Parent.Equal(actual.Version.Parent), "parents not equal")
require.Equal(t, expected.Version.Parent.Region, actual.Version.Parent.Region, "parent regions not equal")
require.Equal(t, expected.Version.Parent.Tombstone, actual.Version.Parent.Tombstone, "parent tombstone not the same")
} else {
require.Nil(t, actual.Version.Parent, "expected parent is nil")
}
require.True(t, bytes.Equal(expected.Data, actual.Data), "value is not equal")
}

// Helper function to generate random data
func randomData(len int) []byte {
data := make([]byte, len)
if _, err := rand.Read(data); err != nil {
panic(err)
}
return data
}
Loading

0 comments on commit b3f3099

Please sign in to comment.