Merge pull request #6465 from onflow/leo/db-ops
[Badger] Add universal database operations
zhangchiqing authored Dec 12, 2024
2 parents 0310dfd + 24d65bc commit 541d744
Showing 19 changed files with 1,898 additions and 2 deletions.
3 changes: 2 additions & 1 deletion cmd/bootstrap/utils/md5.go
@@ -2,7 +2,8 @@ package utils

 // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
 import (
-	"crypto/md5" //nolint:gosec
+	// #nosec
+	"crypto/md5"
 	"io"
 	"os"
 )
8 changes: 7 additions & 1 deletion storage/batch.go
@@ -1,13 +1,19 @@
 package storage

-import "github.com/dgraph-io/badger/v2"
+import (
+	"github.com/dgraph-io/badger/v2"
+)

+// Deprecated: Transaction is being deprecated as part of the transition from Badger to Pebble.
+// Use Writer instead of Transaction for all new code.
 type Transaction interface {
 	Set(key, val []byte) error
 }

 // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra
 // callbacks which fire after the batch is successfully flushed.
+// Deprecated: BatchStorage is being deprecated as part of the transition from Badger to Pebble.
+// Use ReaderBatchWriter instead of BatchStorage for all new code.
 type BatchStorage interface {
 	GetWriter() *badger.WriteBatch

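To make the deprecation path concrete, here is a minimal, hypothetical sketch (not part of this PR's diff) contrasting a write expressed against the deprecated Transaction interface with the same write expressed against the new storage.Writer interface, whose Badger-backed implementation appears in storage/operation/badgerimpl/writer.go below; the storeFoo names are illustrative only.

```go
package example

import "github.com/onflow/flow-go/storage"

// storeFooOld writes a key-value pair through the deprecated Transaction interface.
func storeFooOld(key, val []byte) func(storage.Transaction) error {
	return func(tx storage.Transaction) error {
		return tx.Set(key, val)
	}
}

// storeFooNew expresses the same write against the new Writer interface,
// which is satisfied by the ReaderBatchWriter introduced in this PR.
func storeFooNew(key, val []byte) func(storage.Writer) error {
	return func(w storage.Writer) error {
		return w.Set(key, val)
	}
}
```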
90 changes: 90 additions & 0 deletions storage/operation/badgerimpl/iterator.go
@@ -0,0 +1,90 @@
package badgerimpl

import (
"bytes"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

type badgerIterator struct {
iter *badger.Iterator
lowerBound []byte
upperBound []byte
hasUpperBound bool // whether there's an upper bound
}

var _ storage.Iterator = (*badgerIterator)(nil)

func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator {
options := badger.DefaultIteratorOptions
if ops.BadgerIterateKeyOnly {
options.PrefetchValues = false
}

tx := db.NewTransaction(false)
iter := tx.NewIterator(options)

lowerBound, upperBound, hasUpperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)

return &badgerIterator{
iter: iter,
lowerBound: lowerBound,
upperBound: upperBound,
hasUpperBound: hasUpperBound,
}
}

// First seeks to the smallest key greater than or equal to the iterator's lower bound,
// and returns whether the iterator is positioned at a valid key-value pair.
func (i *badgerIterator) First() bool {
i.iter.Seek(i.lowerBound)
return i.Valid()
}

// Valid returns whether the iterator is positioned at a valid key-value pair.
func (i *badgerIterator) Valid() bool {
// Note: we do not specify the iteration range via badger's IteratorOptions,
// because IteratorOptions only allows a single prefix, whereas we need a range
// of prefixes. So the bounds have to be checked manually here.
// First(), which calls Seek(i.lowerBound), ensures the iteration starts at the
// lower bound; the upper bound is enforced here by first checking whether the
// iteration has reached its end, and then checking that the key is below the
// (exclusive) upper bound.

// check whether the iteration has reached its end
if !i.iter.Valid() {
return false
}

// if there is no upper bound, the position is always valid
if !i.hasUpperBound {
return true
}

// check that the key is below the upper bound (exclusive)
key := i.iter.Item().Key()
// note on the boundary case: upperBound is the exclusive upper bound and must not
// be included in the iteration, so if key == upperBound the position is invalid
// and we return false.
valid := bytes.Compare(key, i.upperBound) < 0
return valid
}

// Next advances the iterator to the next key-value pair.
func (i *badgerIterator) Next() {
i.iter.Next()
}

// IterItem returns the current key-value pair, or nil if done.
func (i *badgerIterator) IterItem() storage.IterItem {
return i.iter.Item()
}

var _ storage.IterItem = (*badger.Item)(nil)

// Close closes the iterator. The iterator must be closed; otherwise it causes a memory leak.
// No errors expected during normal operation
func (i *badgerIterator) Close() error {
i.iter.Close()
return nil
}
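As a usage illustration, a minimal sketch of driving this iterator through the Reader defined in reader.go below; it assumes the storage.IteratorOption struct literal and an IterItem.Key method mirroring badger.Item (which satisfies storage.IterItem above), and the key formatting is purely illustrative:

```go
package example

import (
	"fmt"

	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
)

// printKeysInRange iterates all keys whose prefixes fall in [startPrefix, endPrefix]
// (both inclusive) and prints them. Values are not prefetched, since only keys are needed.
func printKeysInRange(db *badger.DB, startPrefix, endPrefix []byte) error {
	it, err := badgerimpl.ToReader(db).NewIter(startPrefix, endPrefix,
		storage.IteratorOption{BadgerIterateKeyOnly: true})
	if err != nil {
		return fmt.Errorf("could not create iterator: %w", err)
	}
	// the iterator must be closed to release the underlying Badger transaction
	defer it.Close()

	// First seeks to the lower bound; Valid enforces the exclusive upper bound.
	for it.First(); it.Valid(); it.Next() {
		fmt.Printf("key: %x\n", it.IterItem().Key())
	}
	return nil
}
```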
67 changes: 67 additions & 0 deletions storage/operation/badgerimpl/reader.go
@@ -0,0 +1,67 @@
package badgerimpl

import (
"bytes"
"errors"
"fmt"
"io"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/noop"
)

type dbReader struct {
db *badger.DB
}

// Get gets the value for the given key. It returns ErrNotFound if the DB
// does not contain the key; all other errors are exceptions.
//
// The caller should not modify the contents of the returned slice, but it is
// safe to modify the contents of the argument after Get returns. The
// returned slice will remain valid until the returned Closer is closed.
// when err == nil, the caller MUST call closer.Close() or a memory leak will occur.
func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
tx := b.db.NewTransaction(false)
defer tx.Discard()

item, err := tx.Get(key)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return nil, noop.Closer{}, storage.ErrNotFound
}
return nil, noop.Closer{}, irrecoverable.NewExceptionf("could not load data: %w", err)
}

value, err := item.ValueCopy(nil)
if err != nil {
return nil, noop.Closer{}, irrecoverable.NewExceptionf("could not load value: %w", err)
}

return value, noop.Closer{}, nil
}

// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
// - have a prefix equal to startPrefix OR
// - have a prefix equal to the endPrefix OR
// - have a prefix that is lexicographically between startPrefix and endPrefix
//
// It returns an error if the startPrefix key is greater than the endPrefix key;
// no other errors are expected during normal operation.
func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
if bytes.Compare(startPrefix, endPrefix) > 0 {
return nil, fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
}

return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
}

// ToReader is a helper function to convert a *badger.DB to a Reader
func ToReader(db *badger.DB) storage.Reader {
return dbReader{db}
}
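A short sketch of a point lookup through this reader, showing the ErrNotFound translation and the closer contract documented on Get; the lookup helper name is hypothetical:

```go
package example

import (
	"errors"
	"fmt"

	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
)

// lookup returns a copy of the value stored under key, or a wrapped
// storage.ErrNotFound if the key does not exist.
func lookup(db *badger.DB, key []byte) ([]byte, error) {
	value, closer, err := badgerimpl.ToReader(db).Get(key)
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
			return nil, fmt.Errorf("no entry for key %x: %w", key, err)
		}
		return nil, err // all other errors are exceptions
	}
	// when err == nil the closer must be closed, per the contract above
	defer closer.Close()

	// the returned slice is only guaranteed valid until the closer is closed,
	// so copy it before returning
	out := make([]byte, len(value))
	copy(out, value)
	return out, nil
}
```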
121 changes: 121 additions & 0 deletions storage/operation/badgerimpl/writer.go
@@ -0,0 +1,121 @@
package badgerimpl

import (
"fmt"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
op "github.com/onflow/flow-go/storage/operation"
)

type ReaderBatchWriter struct {
globalReader storage.Reader
batch *badger.WriteBatch

callbacks op.Callbacks
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read un-committed writes written to ReaderBatchWriter.Writer until the write batch is committed.
// This reader may observe different values for the same key on subsequent reads.
func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b.globalReader
}

// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
// Note:
// - The writer cannot be used concurrently for writing.
func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}

// BadgerWriteBatch returns the badger write batch
func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
return b.batch
}

// AddCallback adds a callback to execute after the batch has been flushed,
// regardless of whether the batch update succeeded or failed.
// The error parameter is the error returned by the batch update.
func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
b.callbacks.AddCallback(callback)
}

// Commit flushes the batch to the database.
// No errors expected during normal operation
func (b *ReaderBatchWriter) Commit() error {
err := b.batch.Flush()

b.callbacks.NotifyCallbacks(err)

return err
}

// WithReaderBatchWriter creates a ReaderBatchWriter, passes it to the given function,
// and commits the batch if the function succeeds. If the function returns an error,
// the batch is not committed, but registered callbacks are still notified with that error.
func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error {
batch := NewReaderBatchWriter(db)

err := fn(batch)
if err != nil {
// fn might hold a lock (to make its reads and writes concurrency-safe) that is
// only released by a callback. Therefore, the callbacks must be notified with the
// error before returning, so that any such locks are released.
batch.callbacks.NotifyCallbacks(err)
return err
}

return batch.Commit()
}

// NewReaderBatchWriter returns a ReaderBatchWriter that reads the latest committed
// state of the database and stages writes in a Badger write batch until Commit is called.
func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
return &ReaderBatchWriter{
globalReader: ToReader(db),
batch: db.NewWriteBatch(),
}
}

var _ storage.Writer = (*ReaderBatchWriter)(nil)

// Set sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Set returns.
// No errors expected during normal operation
func (b *ReaderBatchWriter) Set(key, value []byte) error {
return b.batch.Set(key, value)
}

// Delete deletes the value for the given key. Deletes are blind: they will
// succeed even if the given key does not exist.
//
// It is safe to modify the contents of the arguments after Delete returns.
// No errors expected during normal operation
func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key)
}

// DeleteByRange removes all keys with a prefix that falls within the
// range [startPrefix, endPrefix], both inclusive.
// It returns an error if endPrefix < startPrefix;
// no other errors are expected during normal operation.
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
err := operation.Iterate(startPrefix, endPrefix, func(key []byte) error {
err := b.batch.Delete(key)
if err != nil {
return fmt.Errorf("could not add key to delete batch (%v): %w", key, err)
}
return nil
})(globalReader)

if err != nil {
return fmt.Errorf("could not find keys by range to be deleted: %w", err)
}
return nil
}
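An end-to-end sketch of the batch pattern above, assuming the storage.ReaderBatchWriter interface exposes GlobalReader, Writer, and AddCallback just like the concrete type in this file; the mutex, key, and value are hypothetical and only illustrate the lock-release-via-callback convention described in WithReaderBatchWriter:

```go
package example

import (
	"errors"
	"sync"

	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
)

// insertIfMissing stores key -> val only if key is not already present.
// The mutex guards the read-check-write sequence and is released in a callback,
// so it stays held until the commit outcome (success or failure) is known.
func insertIfMissing(db *badger.DB, mu *sync.Mutex, key, val []byte) error {
	return badgerimpl.WithReaderBatchWriter(db, func(rw storage.ReaderBatchWriter) error {
		mu.Lock()
		rw.AddCallback(func(error) { mu.Unlock() }) // runs whether the commit succeeds or fails

		// GlobalReader only observes the latest committed state, never this pending batch.
		_, closer, err := rw.GlobalReader().Get(key)
		if err == nil {
			closer.Close()
			return nil // already stored; nothing to write
		}
		if !errors.Is(err, storage.ErrNotFound) {
			return err
		}

		return rw.Writer().Set(key, val)
	})
}
```

Returning nil from the function still commits the (possibly empty) batch, and the registered callback fires with the commit result in every case, which is what makes the lock handoff safe.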
24 changes: 24 additions & 0 deletions storage/operation/callbacks.go
@@ -0,0 +1,24 @@
package operation

import "sync"

// Callbacks is a concurrency-safe collection of callbacks, each of which receives
// the error (nil on success) returned by the batch commit it is attached to.
type Callbacks struct {
sync.RWMutex // protect callbacks
callbacks []func(error)
}

func (b *Callbacks) AddCallback(callback func(error)) {
b.Lock()
defer b.Unlock()

b.callbacks = append(b.callbacks, callback)
}

func (b *Callbacks) NotifyCallbacks(err error) {
b.RLock()
defer b.RUnlock()

for _, callback := range b.callbacks {
callback(err)
}
}
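A small, self-contained sketch of the Callbacks helper on its own; the RWMutex makes concurrent AddCallback registrations safe, while the owning batch later notifies every callback once with the commit result (nil on success). The logging here is illustrative:

```go
package example

import (
	"log"

	"github.com/onflow/flow-go/storage/operation"
)

func callbacksExample() {
	var cbs operation.Callbacks

	// Callbacks may be registered from multiple goroutines; the embedded
	// RWMutex protects the underlying slice.
	cbs.AddCallback(func(err error) {
		if err != nil {
			log.Printf("batch failed: %v", err)
			return
		}
		log.Print("batch committed")
	})

	// The batch owner calls this once the commit outcome is known.
	cbs.NotifyCallbacks(nil)
}
```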
34 changes: 34 additions & 0 deletions storage/operation/codec.go
@@ -0,0 +1,34 @@
package operation

import (
"encoding/binary"
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// EncodeKeyPart encodes a value to be used as a part of a key to be stored in storage.
func EncodeKeyPart(v interface{}) []byte {
switch i := v.(type) {
case uint8:
return []byte{i}
case uint32:
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
case uint64:
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
return b
case string:
return []byte(i)
case flow.Role:
return []byte{byte(i)}
case flow.Identifier:
return i[:]
case flow.ChainID:
return []byte(i)
default:
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
}
}
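A hedged example of composing a storage key from encoded parts; makeKey and the prefix code are hypothetical, and only EncodeKeyPart itself comes from this file. Big-endian encoding of the numeric parts keeps keys in their natural numeric order under lexicographic sorting, which is what the prefix-range iteration above relies on:

```go
package example

import (
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage/operation"
)

// makeKey concatenates a one-byte prefix code with the encoded key parts.
func makeKey(prefix byte, parts ...interface{}) []byte {
	key := []byte{prefix}
	for _, part := range parts {
		key = append(key, operation.EncodeKeyPart(part)...)
	}
	return key
}

// heightToBlockKey builds a key of the form <code><8-byte big-endian height><32-byte block ID>.
func heightToBlockKey(height uint64, blockID flow.Identifier) []byte {
	const codeHeightToBlock = 0x2a // hypothetical prefix code
	return makeKey(codeHeightToBlock, height, blockID)
}
```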
(The diff for the remaining changed files is not shown here.)
