Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: no command to fix corrupted data in versiondb #1685

Merged
merged 30 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

* [#1679](https://github.com/crypto-org-chain/cronos/pull/1679) Include no trace detail on insufficient balance fix.
* [#1685](https://github.com/crypto-org-chain/cronos/pull/1685) Add command to fix versiondb corrupted data.

### Improvements

Expand Down
4 changes: 4 additions & 0 deletions app/versiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (app *App) setupVersionDB(
for _, key := range keys {
exposedKeys = append(exposedKeys, key)
}

// see: https://github.com/crypto-org-chain/cronos/issues/1683
versionDB.SetSkipVersionZero(true)

app.CommitMultiStore().AddListeners(exposedKeys)

// register in app streaming manager
Expand Down
2 changes: 1 addition & 1 deletion gomod2nix.toml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
FixDataCmd(opts.DefaultStores),

Check warning on line 31 in versiondb/client/cmd.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/cmd.go#L31

Added line #L31 was not covered by tests
)
return cmd
}
59 changes: 59 additions & 0 deletions versiondb/client/fixdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package client

import (
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cobra"
)

const (
FlagDryRun = "dry-run"
FlagStore = "store-name"
)

func FixDataCmd(defaultStores []string) *cobra.Command {
yihuang marked this conversation as resolved.
Show resolved Hide resolved
cmd := &cobra.Command{
Use: "fixdata <dir>",
Args: cobra.ExactArgs(1),
Short: "Fix wrong data in versiondb, see: https://github.com/crypto-org-chain/cronos/issues/1683",
RunE: func(cmd *cobra.Command, args []string) error {
dir := args[0]
dryRun, err := cmd.Flags().GetBool(FlagDryRun)
if err != nil {
return err
}
stores, err := cmd.Flags().GetStringArray(FlagStore)
if err != nil {
return err
}
if len(stores) == 0 {
stores = defaultStores
}

Check warning on line 31 in versiondb/client/fixdata.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/fixdata.go#L14-L31

Added lines #L14 - L31 were not covered by tests

var (
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle
)

if dryRun {
db, cfHandle, err = tsrocksdb.OpenVersionDBForReadOnly(dir, false)
} else {
db, cfHandle, err = tsrocksdb.OpenVersionDB(dir)
}
if err != nil {
return err
}

Check warning on line 45 in versiondb/client/fixdata.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/fixdata.go#L33-L45

Added lines #L33 - L45 were not covered by tests

yihuang marked this conversation as resolved.
Show resolved Hide resolved
versionDB := tsrocksdb.NewStoreWithDB(db, cfHandle)
if err := versionDB.FixData(stores, dryRun); err != nil {
return err
}

Check warning on line 50 in versiondb/client/fixdata.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/fixdata.go#L47-L50

Added lines #L47 - L50 were not covered by tests

return nil

Check warning on line 52 in versiondb/client/fixdata.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/fixdata.go#L52

Added line #L52 was not covered by tests
},
}

cmd.Flags().Bool(FlagDryRun, false, "Dry run, do not write to the database, open the database in read-only mode.")
cmd.Flags().StringArray(FlagStore, []string{}, "Store names to fix, if not specified, all stores will be fixed.")
return cmd

Check warning on line 58 in versiondb/client/fixdata.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/fixdata.go#L56-L58

Added lines #L56 - L58 were not covered by tests
}
40 changes: 32 additions & 8 deletions versiondb/tsrocksdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsrocksdb

import (
"bytes"
"encoding/binary"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/linxGnu/grocksdb"
Expand All @@ -12,11 +13,14 @@ type rocksDBIterator struct {
prefix, start, end []byte
isReverse bool
isInvalid bool

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

var _ versiondb.Iterator = (*rocksDBIterator)(nil)

func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool) *rocksDBIterator {
func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool, skipVersionZero bool) *rocksDBIterator {
if isReverse {
if end == nil {
source.SeekToLast()
Expand All @@ -39,14 +43,18 @@ func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, is
source.Seek(start)
}
}
return &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
it := &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
skipVersionZero: skipVersionZero,
}

it.trySkipZeroVersion()
return it
}

// Domain implements Iterator.
Expand Down Expand Up @@ -120,6 +128,22 @@ func (itr rocksDBIterator) Next() {
} else {
itr.source.Next()
}

itr.trySkipZeroVersion()
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved

func (itr rocksDBIterator) timestamp() uint64 {
ts := itr.source.Timestamp()
defer ts.Free()
return binary.LittleEndian.Uint64(ts.Data())
}

func (itr rocksDBIterator) trySkipZeroVersion() {
if itr.skipVersionZero {
for itr.Valid() && itr.timestamp() == 0 {
itr.Next()
}
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved
}

// Error implements Iterator.
Expand Down
14 changes: 14 additions & 0 deletions versiondb/tsrocksdb/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@
return db, cfHandles[1], nil
}

// OpenVersionDBForReadOnly open versiondb in readonly mode
func OpenVersionDBForReadOnly(dir string, errorIfWalFileExists bool) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
db, cfHandles, err := grocksdb.OpenDbForReadOnlyColumnFamilies(
opts, dir, []string{"default", VersionDBCFName},
[]*grocksdb.Options{opts, NewVersionDBOpts(false)},
errorIfWalFileExists,
)
if err != nil {
return nil, nil, err
}
return db, cfHandles[1], nil

Check warning on line 78 in versiondb/tsrocksdb/opts.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/opts.go#L68-L78

Added lines #L68 - L78 were not covered by tests
}

// OpenVersionDBAndTrimHistory opens versiondb similar to `OpenVersionDB`,
// but it also trim the versions newer than target one, can be used for rollback.
func OpenVersionDBAndTrimHistory(dir string, version int64) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) {
Expand Down
134 changes: 122 additions & 12 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsrocksdb

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,9 @@
type Store struct {
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

func NewStore(dir string) (Store, error) {
Expand All @@ -58,6 +62,10 @@
}
}

func (s *Store) SetSkipVersionZero(skip bool) {
s.skipVersionZero = skip
}

func (s Store) SetLatestVersion(version int64) error {
var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
Expand Down Expand Up @@ -86,11 +94,23 @@
}

func (s Store) GetAtVersionSlice(storeKey string, key []byte, version *int64) (*grocksdb.Slice, error) {
return s.db.GetCF(
value, ts, err := s.db.GetCFWithTS(
newTSReadOptions(version),
s.cfHandle,
prependStoreKey(storeKey, key),
)
if err != nil {
return nil, err
}

Check warning on line 104 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L103-L104

Added lines #L103 - L104 were not covered by tests
defer ts.Free()

yihuang marked this conversation as resolved.
Show resolved Hide resolved
if value.Exists() && s.skipVersionZero {
if binary.LittleEndian.Uint64(ts.Data()) == 0 {
return grocksdb.NewSlice(nil, 0), nil
}
}

return value, err
}

// GetAtVersion implements VersionStore interface
Expand Down Expand Up @@ -128,28 +148,24 @@

// IteratorAtVersion implements VersionStore interface
func (s Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (versiondb.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, false), nil
return s.iteratorAtVersion(storeKey, start, end, version, false)
yihuang marked this conversation as resolved.
Show resolved Hide resolved
}

// ReverseIteratorAtVersion implements VersionStore interface
func (s Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (versiondb.Iterator, error) {
return s.iteratorAtVersion(storeKey, start, end, version, true)
}

func (s Store) iteratorAtVersion(storeKey string, start, end []byte, version *int64, reverse bool) (versiondb.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(storePrefix(storeKey), start, end)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, true), nil
return newRocksDBIterator(itr, prefix, start, end, reverse, s.skipVersionZero), nil
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved

// FeedChangeSet is used to migrate legacy change sets into versiondb
Expand Down Expand Up @@ -216,6 +232,100 @@
)
}

// FixData fixes wrong data written in versiondb due to rocksdb upgrade, the operation is idempotent.
// see: https://github.com/crypto-org-chain/cronos/issues/1683
// call this before `SetSkipVersionZero(true)`.
func (s Store) FixData(storeNames []string, dryRun bool) error {
for _, storeName := range storeNames {
if err := s.fixDataStore(storeName, dryRun); err != nil {
return err
}

Check warning on line 242 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L241-L242

Added lines #L241 - L242 were not covered by tests
}

return s.Flush()
}

// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again.
func (s Store) fixDataStore(storeName string, dryRun bool) error {
pairs, err := s.loadWrongData(storeName)
if err != nil {
return err
}

Check warning on line 253 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L252-L253

Added lines #L252 - L253 were not covered by tests

batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

prefix := storePrefix(storeName)
readOpts := grocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
for _, pair := range pairs {
realKey := cloneAppend(prefix, pair.Key)

readOpts.SetTimestamp(pair.Timestamp)
oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey)
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

Check warning on line 268 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L267-L268

Added lines #L267 - L268 were not covered by tests
yihuang marked this conversation as resolved.
Show resolved Hide resolved

clean := bytes.Equal(oldValue.Data(), pair.Value)
oldValue.Free()

if clean {
continue

Check warning on line 274 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L274

Added line #L274 was not covered by tests
}

if dryRun {
fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, pair.Key, pair.Timestamp)

Check warning on line 278 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L278

Added line #L278 was not covered by tests
} else {
batch.PutCFWithTS(s.cfHandle, realKey, pair.Timestamp, pair.Value)
}
}

if !dryRun {
return s.db.Write(defaultSyncWriteOpts, batch)
}

return nil

Check warning on line 288 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L288

Added line #L288 was not covered by tests
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved

type KVPairWithTS struct {
Key []byte
Value []byte
Timestamp []byte
}

func (s Store) loadWrongData(storeName string) ([]KVPairWithTS, error) {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
if err != nil {
return nil, err
}

Check warning on line 302 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L301-L302

Added lines #L301 - L302 were not covered by tests
defer iter.Close()

var pairs []KVPairWithTS
for ; iter.Valid(); iter.Next() {
if binary.LittleEndian.Uint64(iter.Timestamp()) != 0 {
// FIXME: https://github.com/crypto-org-chain/cronos/issues/1689
continue

Check warning on line 309 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L308-L309

Added lines #L308 - L309 were not covered by tests
}

key := iter.Key()
if len(key) < TimestampSize {
return nil, fmt.Errorf("invalid key length: %X, store: %s", key, storeName)
}

Check warning on line 315 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L314-L315

Added lines #L314 - L315 were not covered by tests

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
pairs = append(pairs, KVPairWithTS{
Key: key,
Value: iter.Value(),
Timestamp: ts,
})
}

return pairs, nil
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
Loading
Loading