Skip to content

Commit

Permalink
separete read from iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Nov 12, 2024
1 parent 4c66b89 commit 2eceb45
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,46 +247,37 @@ func (s Store) FixData(storeNames []string, dryRun bool) error {

// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again.
func (s Store) fixDataStore(storeName string, dryRun bool) error {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
pairs, err := s.loadWrongData(storeName)
if err != nil {
return err
}
defer iter.Close()

batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

prefix := storePrefix(storeName)
for ; iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) < TimestampSize {
return fmt.Errorf("invalid key length: %X, store: %s", key, storeName)
}

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
realKey := cloneAppend(prefix, key)
readOpts := grocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
for _, pair := range pairs {
realKey := cloneAppend(prefix, pair.Key)

readOpts := grocksdb.NewDefaultReadOptions()
readOpts.SetTimestamp(ts)
readOpts.SetTimestamp(pair.Timestamp)
oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey)
if err != nil {
return err
}
readOpts.Destroy()

clean := bytes.Equal(oldValue.Data(), iter.Value())
clean := bytes.Equal(oldValue.Data(), pair.Value)
oldValue.Free()

if clean {
continue
}

if dryRun {
fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, key, ts)
fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, pair.Key, pair.Timestamp)
} else {
batch.PutCFWithTS(s.cfHandle, realKey, ts, iter.Value())
batch.PutCFWithTS(s.cfHandle, realKey, pair.Timestamp, pair.Value)
}
}

Expand All @@ -297,6 +288,39 @@ func (s Store) fixDataStore(storeName string, dryRun bool) error {
return nil
}

type KVPairWithTS struct {
Key []byte
Value []byte
Timestamp []byte
}

func (s Store) loadWrongData(storeName string) ([]KVPairWithTS, error) {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
if err != nil {
return nil, err
}
defer iter.Close()

var pairs []KVPairWithTS
for ; iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) < TimestampSize {
return nil, fmt.Errorf("invalid key length: %X, store: %s", key, storeName)
}

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
pairs = append(pairs, KVPairWithTS{
Key: key,
Value: iter.Value(),
Timestamp: ts,
})
}

return pairs, nil
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down

0 comments on commit 2eceb45

Please sign in to comment.