From c22c7d43689a74432a99c939668c17bffdec12b5 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 11 Nov 2024 10:27:12 +0800 Subject: [PATCH] Problem: no command to fix corrupted data in versiondb (backport: #1685) Closes: #1683 Solution: - add fix command to fix corrupted data in versiondb rename support SkipVersionZero support SkipVersionZero cleanup --- CHANGELOG.md | 1 + app/versiondb.go | 16 ++++-- versiondb/tsrocksdb/iterator.go | 44 +++++++++++--- versiondb/tsrocksdb/opts.go | 13 +++++ versiondb/tsrocksdb/store.go | 95 +++++++++++++++++++++++++++---- versiondb/tsrocksdb/store_test.go | 87 ++++++++++++++++++++++++++++ 6 files changed, 232 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d36d777951..77ec2a3827 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [#1670](https://github.com/crypto-org-chain/cronos/pull/1670) Fix state overwrite in debug trace APIs. * [#1679](https://github.com/crypto-org-chain/cronos/pull/1679) Include no trace detail on insufficient balance fix. +* [#1685](https://github.com/crypto-org-chain/cronos/pull/1685) Fix versiondb corrupted data on startup. *Oct 14, 2024* diff --git a/app/versiondb.go b/app/versiondb.go index 10b0ad76ad..ccc7f5d0b6 100644 --- a/app/versiondb.go +++ b/app/versiondb.go @@ -23,10 +23,6 @@ func (app *App) setupVersionDB( if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { return nil, err } - versionDB, err := tsrocksdb.NewStore(dataDir) - if err != nil { - return nil, err - } // default to exposing all exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys)) @@ -34,6 +30,18 @@ func (app *App) setupVersionDB( exposeStoreKeys = append(exposeStoreKeys, storeKey) } + versionDB, err := tsrocksdb.NewStore(dataDir) + if err != nil { + return nil, err + } + + // see: https://github.com/crypto-org-chain/cronos/issues/1683 + if err := versionDB.FixData(exposedKeys); err != nil { + return nil, err + } + + versionDB.SetSkipVersionZero(true) + service := versiondb.NewStreamingService(versionDB, exposeStoreKeys) app.SetStreamingService(service) diff --git a/versiondb/tsrocksdb/iterator.go b/versiondb/tsrocksdb/iterator.go index f4c4b3c777..f2a5f5c5b1 100644 --- a/versiondb/tsrocksdb/iterator.go +++ b/versiondb/tsrocksdb/iterator.go @@ -2,6 +2,7 @@ package tsrocksdb import ( "bytes" + "encoding/binary" "github.com/cosmos/cosmos-sdk/store/types" "github.com/linxGnu/grocksdb" @@ -12,11 +13,14 @@ type rocksDBIterator struct { prefix, start, end []byte isReverse bool isInvalid bool + + // see: https://github.com/crypto-org-chain/cronos/issues/1683 + skipVersionZero bool } var _ types.Iterator = (*rocksDBIterator)(nil) -func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool) *rocksDBIterator { +func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool, skipVersionZero bool) *rocksDBIterator { if isReverse { if end == nil { source.SeekToLast() @@ -39,14 +43,18 @@ func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, is source.Seek(start) } } - return &rocksDBIterator{ - source: source, - prefix: prefix, - start: start, - end: end, - isReverse: isReverse, - isInvalid: false, + it := &rocksDBIterator{ + source: source, + prefix: prefix, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + skipVersionZero: skipVersionZero, } + + it.trySkipNonZeroVersion() + return it } // Domain implements Iterator. @@ -114,6 +122,22 @@ func (itr rocksDBIterator) Next() { } else { itr.source.Next() } + + itr.trySkipNonZeroVersion() +} + +func (itr rocksDBIterator) timestamp() uint64 { + ts := itr.source.Timestamp() + defer ts.Free() + return binary.LittleEndian.Uint64(ts.Data()) +} + +func (itr rocksDBIterator) trySkipNonZeroVersion() { + if itr.skipVersionZero { + for itr.Valid() && itr.timestamp() == 0 { + itr.Next() + } + } } // Error implements Iterator. @@ -137,6 +161,10 @@ func (itr *rocksDBIterator) assertIsValid() { // This function can be applied on *Slice returned from Key() and Value() // of an Iterator, because they are marked as freed. func moveSliceToBytes(s *grocksdb.Slice) []byte { + if s == nil { + return nil + } + defer s.Free() if !s.Exists() { return nil diff --git a/versiondb/tsrocksdb/opts.go b/versiondb/tsrocksdb/opts.go index 604c203b72..fb6eca3c24 100644 --- a/versiondb/tsrocksdb/opts.go +++ b/versiondb/tsrocksdb/opts.go @@ -64,6 +64,19 @@ func OpenVersionDB(dir string) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, erro return db, cfHandles[1], nil } +// OpenVersionDBForReadOnly opens versiondb for read-only. +func OpenVersionDBForReadOnly(dir string, errorIfWalFileExists bool) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) { + opts := grocksdb.NewDefaultOptions() + db, cfHandles, err := grocksdb.OpenDbForReadOnlyColumnFamilies( + opts, dir, []string{"default", VersionDBCFName}, + []*grocksdb.Options{opts, NewVersionDBOpts(false)}, errorIfWalFileExists, + ) + if err != nil { + return nil, nil, err + } + return db, cfHandles[1], nil +} + // OpenVersionDBAndTrimHistory opens versiondb similar to `OpenVersionDB`, // but it also trim the versions newer than target one, can be used for rollback. func OpenVersionDBAndTrimHistory(dir string, version int64) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) { diff --git a/versiondb/tsrocksdb/store.go b/versiondb/tsrocksdb/store.go index 7d4176c5d0..3ff4b2aaaa 100644 --- a/versiondb/tsrocksdb/store.go +++ b/versiondb/tsrocksdb/store.go @@ -1,6 +1,7 @@ package tsrocksdb import ( + "bytes" "encoding/binary" "errors" "fmt" @@ -38,6 +39,9 @@ func init() { type Store struct { db *grocksdb.DB cfHandle *grocksdb.ColumnFamilyHandle + + // see: https://github.com/crypto-org-chain/cronos/issues/1683 + skipVersionZero bool } func NewStore(dir string) (Store, error) { @@ -58,6 +62,10 @@ func NewStoreWithDB(db *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) Stor } } +func (s *Store) SetSkipVersionZero(skip bool) { + s.skipVersionZero = skip +} + func (s Store) SetLatestVersion(version int64) error { var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) @@ -86,11 +94,23 @@ func (s Store) PutAtVersion(version int64, changeSet []types.StoreKVPair) error } func (s Store) GetAtVersionSlice(storeKey string, key []byte, version *int64) (*grocksdb.Slice, error) { - return s.db.GetCF( + value, ts, err := s.db.GetCFWithTS( newTSReadOptions(version), s.cfHandle, prependStoreKey(storeKey, key), ) + if err != nil { + return nil, err + } + defer ts.Free() + + if value.Exists() && s.skipVersionZero { + if binary.LittleEndian.Uint64(ts.Data()) == 0 { + return nil, nil + } + } + + return value, err } // GetAtVersion implements VersionStore interface @@ -128,28 +148,24 @@ func (s Store) GetLatestVersion() (int64, error) { // IteratorAtVersion implements VersionStore interface func (s Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { - if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { - return nil, errKeyEmpty - } - - prefix := storePrefix(storeKey) - start, end = iterateWithPrefix(prefix, start, end) - - itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle) - return newRocksDBIterator(itr, prefix, start, end, false), nil + return s.iteratorAtVersion(storeKey, start, end, version, false) } // ReverseIteratorAtVersion implements VersionStore interface func (s Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { + return s.iteratorAtVersion(storeKey, start, end, version, true) +} + +func (s Store) iteratorAtVersion(storeKey string, start, end []byte, version *int64, reverse bool) (types.Iterator, error) { if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errKeyEmpty } prefix := storePrefix(storeKey) - start, end = iterateWithPrefix(storePrefix(storeKey), start, end) + start, end = iterateWithPrefix(prefix, start, end) itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle) - return newRocksDBIterator(itr, prefix, start, end, true), nil + return newRocksDBIterator(itr, prefix, start, end, reverse, s.skipVersionZero), nil } // FeedChangeSet is used to migrate legacy change sets into versiondb @@ -216,6 +232,61 @@ func (s Store) Flush() error { ) } +// FixData fixes wrong data written in versiondb due to rocksdb upgrade, the operation is idempotent. +// see: https://github.com/crypto-org-chain/cronos/issues/1683 +// call this before `SetSkipVersionZero(true)`. +func (s Store) FixData(storeKeys []types.StoreKey) error { + for _, storeKey := range storeKeys { + if err := s.fixDataStore(storeKey.Name()); err != nil { + return err + } + } + + return nil +} + +// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again. +func (s Store) fixDataStore(name string) error { + var version int64 + iter, err := s.IteratorAtVersion(name, nil, nil, &version) + if err != nil { + return err + } + defer iter.Close() + + batch := grocksdb.NewWriteBatch() + defer batch.Destroy() + + prefix := storePrefix(name) + for ; iter.Valid(); iter.Next() { + key := iter.Key() + if len(key) < TimestampSize { + return fmt.Errorf("invalid key length: %X", key) + } + + ts := key[len(key)-TimestampSize:] + key = key[:len(key)-TimestampSize] + + readOpts := grocksdb.NewDefaultReadOptions() + readOpts.SetTimestamp(ts) + oldValue, err := s.db.GetCF(readOpts, s.cfHandle, prependStoreKey(name, key)) + if err != nil { + return err + } + + clean := bytes.Equal(oldValue.Data(), iter.Value()) + oldValue.Free() + + if clean { + continue + } + + batch.PutCFWithTS(s.cfHandle, cloneAppend(prefix, key), ts, iter.Value()) + } + + return s.db.Write(defaultSyncWriteOpts, batch) +} + func newTSReadOptions(version *int64) *grocksdb.ReadOptions { var ver uint64 if version == nil { diff --git a/versiondb/tsrocksdb/store_test.go b/versiondb/tsrocksdb/store_test.go index a5328977a6..a8897f1b54 100644 --- a/versiondb/tsrocksdb/store_test.go +++ b/versiondb/tsrocksdb/store_test.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "testing" + "cosmossdk.io/store/types" + dbm "github.com/cosmos/cosmos-db" "github.com/crypto-org-chain/cronos/versiondb" "github.com/linxGnu/grocksdb" "github.com/stretchr/testify/require" @@ -153,3 +155,88 @@ func TestUserTimestampPruning(t *testing.T) { require.Equal(t, []byte{100}, bz.Data()) bz.Free() } + +func TestSkipVersionZero(t *testing.T) { + storeKey := types.NewKVStoreKey("test") + + var wrongTz [8]byte + binary.LittleEndian.PutUint64(wrongTz[:], 100) + + key1 := []byte("hello1") + key2 := []byte("hello2") + key2Wrong := cloneAppend(key2, wrongTz[:]) + key3 := []byte("hello3") + + store, err := NewStore(t.TempDir()) + require.NoError(t, err) + + err = store.PutAtVersion(0, []*types.StoreKVPair{ + {StoreKey: storeKey.Name(), Key: key2Wrong, Value: []byte{2}}, + }) + require.NoError(t, err) + err = store.PutAtVersion(100, []*types.StoreKVPair{ + {StoreKey: storeKey.Name(), Key: key1, Value: []byte{1}}, + }) + require.NoError(t, err) + err = store.PutAtVersion(100, []*types.StoreKVPair{ + {StoreKey: storeKey.Name(), Key: key3, Value: []byte{3}}, + }) + require.NoError(t, err) + + i := int64(999) + bz, err := store.GetAtVersion(storeKey.Name(), key2Wrong, &i) + require.NoError(t, err) + require.Equal(t, []byte{2}, bz) + + it, err := store.IteratorAtVersion(storeKey.Name(), nil, nil, &i) + require.NoError(t, err) + require.Equal(t, + []kvPair{ + {Key: key1, Value: []byte{1}}, + {Key: key2Wrong, Value: []byte{2}}, + {Key: key3, Value: []byte{3}}, + }, + consumeIterator(it), + ) + + store.SetSkipVersionZero(true) + + bz, err = store.GetAtVersion(storeKey.Name(), key2Wrong, &i) + require.NoError(t, err) + require.Empty(t, bz) + bz, err = store.GetAtVersion(storeKey.Name(), key1, &i) + require.NoError(t, err) + require.Equal(t, []byte{1}, bz) + + it, err = store.IteratorAtVersion(storeKey.Name(), nil, nil, &i) + require.NoError(t, err) + require.Equal(t, + []kvPair{ + {Key: key1, Value: []byte{1}}, + {Key: key3, Value: []byte{3}}, + }, + consumeIterator(it), + ) + + store.SetSkipVersionZero(false) + err = store.FixData([]types.StoreKey{storeKey}) + require.NoError(t, err) + + bz, err = store.GetAtVersion(storeKey.Name(), key2, &i) + require.NoError(t, err) + require.Equal(t, []byte{2}, bz) +} + +type kvPair struct { + Key []byte + Value []byte +} + +func consumeIterator(it dbm.Iterator) []kvPair { + var result []kvPair + for ; it.Valid(); it.Next() { + result = append(result, kvPair{it.Key(), it.Value()}) + } + it.Close() + return result +}