Skip to content

Commit

Permalink
Allow Engine-Specific Options in Open (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort authored Jan 10, 2022
1 parent ce31ee3 commit cc53387
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 92 deletions.
79 changes: 68 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,73 @@
package config

import "time"
import (
ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
)

// ReplicaConfig specifies the information needed for a Replica and Version manager to
// maintain global object versioning and provenance.
// TODO: this configuration is pulled from a service context not a library context.
// DefaultConfig is used if the user does not specify a configuration
var DefaultConfig = Config{
Versions: ReplicaConfig{
PID: 1,
Region: "local",
Name: "localhost",
},
}

// New creates a configuration with the required options and can also be used to specify
// optional configuration e.g. for engine-specific operations.
func New(options ...Option) (_ Config, err error) {
// Create the default configuration in editable mode
conf := &Config{
Versions: ReplicaConfig{
PID: DefaultConfig.Versions.PID,
Region: DefaultConfig.Versions.Region,
Name: DefaultConfig.Versions.Name,
},
}

// Apply all options to the configuration
for _, opt := range options {
if err = opt(conf); err != nil {
return Config{}, err
}
}

// Return the value of the configuration
return *conf, nil
}

// Config specifies the options necessary to open a Honu database.
type Config struct {
Versions ReplicaConfig
LDBOptions *ldbopt.Options
}

// ReplicaConfig specifies the information needed for the Version manager to maintain
// global object versioning and provenance. Honu is intended to support data replication
// by versioning using Lamport scalars. These conflict-free version numbers are closely
// tied to a replica's configuration (where a replica is a process that performs data
// replication using Honu), e.g. the PID is the process ID of a running replica, the
// region is where the replica is running, and the name is usually the hostname of the
// replica.
type ReplicaConfig struct {
Enabled bool `split_words:"true" default:"true"`
BindAddr string `split_words:"true" default:":4435"`
PID uint64 `split_words:"true" required:"false"`
Region string `split_words:"true" required:"false"`
Name string `split_words:"true" required:"false"`
GossipInterval time.Duration `split_words:"true" default:"1m"`
GossipSigma time.Duration `split_words:"true" default:"5s"`
PID uint64 `split_words:"true" required:"false"`
Region string `split_words:"true" required:"false"`
Name string `split_words:"true" required:"false"`
}

// Option modifies a configuration to add optional configuration items.
type Option func(*Config) error

func WithReplica(conf ReplicaConfig) Option {
return func(cfg *Config) error {
cfg.Versions = conf
return nil
}
}

func WithLevelDB(opt *ldbopt.Options) Option {
return func(cfg *Config) error {
cfg.LDBOptions = opt
return nil
}
}
33 changes: 33 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package config_test

import (
"testing"

"github.com/rotationalio/honu/config"
"github.com/stretchr/testify/require"
ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
)

func TestConfig(t *testing.T) {
// Test Default Config
conf, err := config.New()
require.NoError(t, err)
require.Equal(t, config.DefaultConfig, conf)
require.NotZero(t, conf.Versions.PID)
require.NotEmpty(t, conf.Versions.Region)

// Test WithVersions
conf, err = config.New(config.WithReplica(config.ReplicaConfig{8, "us-antarctic-23", "research"}))
require.NoError(t, err)
require.NotEmpty(t, conf.Versions)
require.Equal(t, uint64(8), conf.Versions.PID)
require.Equal(t, "us-antarctic-23", conf.Versions.Region)
require.Equal(t, "research", conf.Versions.Name)

// Test WithLevelDB Options
conf, err = config.New(config.WithLevelDB(&ldbopt.Options{Strict: ldbopt.StrictJournal}))
require.NoError(t, err)
require.Equal(t, config.DefaultConfig.Versions, conf.Versions)
require.NotNil(t, conf.LDBOptions)
require.Equal(t, conf.LDBOptions.Strict, ldbopt.StrictJournal)
}
2 changes: 1 addition & 1 deletion engines/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
engine "github.com/rotationalio/honu/engines"
)

func Open(conf config.ReplicaConfig) (*BadgerEngine, error) {
func Open(conf config.Config) (*BadgerEngine, error) {
return &BadgerEngine{}, errors.New("not implemented yet")
}

Expand Down
4 changes: 2 additions & 2 deletions engines/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
)

// Open a leveldb engine as the backend to the Honu database.
func Open(path string, conf config.ReplicaConfig) (_ *LevelDBEngine, err error) {
func Open(path string, conf config.Config) (_ *LevelDBEngine, err error) {
engine := &LevelDBEngine{}
if engine.ldb, err = leveldb.OpenFile(path, nil); err != nil {
if engine.ldb, err = leveldb.OpenFile(path, conf.LDBOptions); err != nil {
return nil, err
}
return engine, nil
Expand Down
19 changes: 9 additions & 10 deletions engines/leveldb/leveldb_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package leveldb_test

import (
"fmt"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -34,18 +33,18 @@ var testNamespaces = []string{
"namespace::with::colons",
}

// Returns a LevelDBEngine and the path were it was created.
func setupLeveldbEngine(t testing.TB) (_ *leveldb.LevelDBEngine, path string) {
// Returns a LevelDBEngine and the path where it was created.
func setupLevelDBEngine(t testing.TB) (_ *leveldb.LevelDBEngine, path string) {
tempDir, err := ioutil.TempDir("", "leveldb-*")
ldbPath := fmt.Sprintf("leveldb:///%s", tempDir)
require.NoError(t, err)
conf := config.ReplicaConfig{}
engine, err := leveldb.Open(ldbPath, conf)

conf, _ := config.New()
engine, err := leveldb.Open(tempDir, conf)
if err != nil {
os.RemoveAll(tempDir)
}
require.NoError(t, err)
return engine, ldbPath
return engine, tempDir
}

// Creates an options.Options struct with namespace set and returns
Expand Down Expand Up @@ -81,7 +80,7 @@ func checkDelete(ldbStore engine.Store, opts *options.Options, key []byte, t *te

func TestLeveldbEngine(t *testing.T) {
// Setup a levelDB Engine.
ldbEngine, ldbPath := setupLeveldbEngine(t)
ldbEngine, ldbPath := setupLevelDBEngine(t)
require.Equal(t, "leveldb", ldbEngine.Engine())

// Ensure the db was created.
Expand Down Expand Up @@ -113,7 +112,7 @@ func TestLeveldbEngine(t *testing.T) {
}

func TestLeveldbTransactions(t *testing.T) {
ldbEngine, ldbPath := setupLeveldbEngine(t)
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
Expand Down Expand Up @@ -156,7 +155,7 @@ func TestLeveldbTransactions(t *testing.T) {
}

func TestLevelDBIter(t *testing.T) {
ldbEngine, ldbPath := setupLeveldbEngine(t)
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
Expand Down
8 changes: 4 additions & 4 deletions engines/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type PebbleEngine struct {
}

// TODO: Allow Passing Pebble Options
func Open(path string, conf config.ReplicaConfig) (_ *PebbleEngine, err error) {
func Open(path string, conf config.Config) (_ *PebbleEngine, err error) {
engine := &PebbleEngine{}
if engine.pebble, err = pebble.Open(path, nil); err != nil {
return nil, err
Expand All @@ -38,7 +38,7 @@ func (db *PebbleEngine) Begin(readonly bool) (engine.Transaction, error) {
}

// Get the latest version of the object stored by the key.
func (db *PebbleEngine) Get(key []byte, options ...opts.SetOptions) (value []byte, err error) {
func (db *PebbleEngine) Get(key []byte, options ...opts.Option) (value []byte, err error) {
value, closer, err := db.pebble.Get(key)
if err != nil && errors.Is(err, pebble.ErrNotFound) {
return value, engine.ErrNotFound
Expand All @@ -50,7 +50,7 @@ func (db *PebbleEngine) Get(key []byte, options ...opts.SetOptions) (value []byt
}

// Put a new value to the specified key and update the version.
func (db *PebbleEngine) Put(key, value []byte, options ...opts.SetOptions) error {
func (db *PebbleEngine) Put(key, value []byte, options ...opts.Option) error {
var cfg *opts.Options
cfg.PebbleWrite = nil
for _, setOption := range options {
Expand All @@ -62,7 +62,7 @@ func (db *PebbleEngine) Put(key, value []byte, options ...opts.SetOptions) error
}

// Delete the object represented by the key, creating a tombstone object.
func (db *PebbleEngine) Delete(key []byte, options ...opts.SetOptions) error {
func (db *PebbleEngine) Delete(key []byte, options ...opts.Option) error {
var cfg *opts.Options
cfg.PebbleWrite = nil
for _, setOption := range options {
Expand Down
28 changes: 14 additions & 14 deletions honu.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,25 @@ type DB struct {
// Open a replicated embedded database with the specified URI. Database URIs should
// specify protocol:///relative/path/to/db for embedded databases. For absolute paths,
// specify protocol:////absolute/path/to/db.
func Open(uri string, conf config.ReplicaConfig) (db *DB, err error) {
func Open(uri string, options ...config.Option) (db *DB, err error) {
// Create a configuration from the options passed in.
var conf config.Config
if conf, err = config.New(options...); err != nil {
return nil, err
}

var dsn *DSN
if dsn, err = ParseDSN(uri); err != nil {
return nil, err
}

db = &DB{}
if db.vm, err = NewVersionManager(conf); err != nil {
if db.vm, err = NewVersionManager(conf.Versions); err != nil {
return nil, err
}

switch dsn.Scheme {
case "leveldb":
// TODO: allow leveldb options to be passed to OpenFile
// TODO: multiple leveldb databases for different namespaces
if db.engine, err = leveldb.Open(dsn.Path, conf); err != nil {
return nil, err
Expand Down Expand Up @@ -70,7 +75,7 @@ func (db *DB) Close() error {
// Object returns metadata associated with the latest object stored by the key.
// Object is the Get function to use if you want to fetch tombstones, otherwise use Get
// which will return a not found error.
func (db *DB) Object(key []byte, options ...opts.SetOptions) (_ *pb.Object, err error) {
func (db *DB) Object(key []byte, options ...opts.Option) (_ *pb.Object, err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(true); err != nil {
return nil, err
Expand Down Expand Up @@ -100,7 +105,7 @@ func (db *DB) Object(key []byte, options ...opts.SetOptions) (_ *pb.Object, err
}

// Get the latest version of the object stored by the key.
func (db *DB) Get(key []byte, options ...opts.SetOptions) (value []byte, err error) {
func (db *DB) Get(key []byte, options ...opts.Option) (value []byte, err error) {
var obj *pb.Object
if obj, err = db.Object(key, options...); err != nil {
return nil, err
Expand All @@ -120,7 +125,7 @@ func (db *DB) Get(key []byte, options ...opts.SetOptions) (value []byte, err err
// Update an object directly in the database without modifying its version information.
// Update is to Put as Object is to Get - use Update when manually modifying the data
// store, for example during replication, but not for normal DB operations.
func (db *DB) Update(obj *pb.Object, options ...opts.SetOptions) (err error) {
func (db *DB) Update(obj *pb.Object, options ...opts.Option) (err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return err
Expand Down Expand Up @@ -153,7 +158,7 @@ func (db *DB) Update(obj *pb.Object, options ...opts.SetOptions) (err error) {
}

// Put a new value to the specified key and update the version.
func (db *DB) Put(key, value []byte, options ...opts.SetOptions) (_ *pb.Object, err error) {
func (db *DB) Put(key, value []byte, options ...opts.Option) (_ *pb.Object, err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return nil, err
Expand Down Expand Up @@ -204,7 +209,7 @@ func (db *DB) Put(key, value []byte, options ...opts.SetOptions) (_ *pb.Object,
}

// Delete the object represented by the key, creating a tombstone object.
func (db *DB) Delete(key []byte, options ...opts.SetOptions) (_ *pb.Object, err error) {
func (db *DB) Delete(key []byte, options ...opts.Option) (_ *pb.Object, err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return nil, err
Expand All @@ -217,11 +222,6 @@ func (db *DB) Delete(key []byte, options ...opts.SetOptions) (_ *pb.Object, err
return nil, err
}

// TODO: implement destroy
if cfg.Destroy {
return nil, errors.New("destroy is not implemented yet")
}

var data []byte
if data, err = tx.Get(key, cfg); err != nil {
if errors.Is(err, engine.ErrNotFound) {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (db *DB) Delete(key []byte, options ...opts.SetOptions) (_ *pb.Object, err

// Iter over a subset of keys specified by the prefix.
// TODO: provide better mechanisms for iteration.
func (db *DB) Iter(prefix []byte, options ...opts.SetOptions) (i iterator.Iterator, err error) {
func (db *DB) Iter(prefix []byte, options ...opts.Option) (i iterator.Iterator, err error) {
// Collect the options
var cfg *opts.Options
if cfg, err = opts.New(options...); err != nil {
Expand Down
13 changes: 1 addition & 12 deletions honu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/rotationalio/honu"
"github.com/rotationalio/honu/config"
Expand Down Expand Up @@ -39,17 +38,7 @@ func setupHonuDB(t testing.TB) (db *honu.DB, tmpDir string) {

// Open a Honu leveldb database with default configuration
uri := fmt.Sprintf("leveldb:///%s", tmpDir)
conf := config.ReplicaConfig{
Enabled: true,
BindAddr: ":443",
PID: 8,
Region: "us-southwest-16",
Name: "testing",
GossipInterval: 1 * time.Minute,
GossipSigma: 15 * time.Second,
}

db, err = honu.Open(uri, conf)
db, err = honu.Open(uri, config.WithReplica(config.ReplicaConfig{PID: 8, Region: "us-southwest-16", Name: "testing"}))
require.NoError(t, err)
if err != nil && tmpDir != "" {
fmt.Println(tmpDir)
Expand Down
Loading

0 comments on commit cc53387

Please sign in to comment.