From 2eceb452fc3f718e30c87c0c8e1bafc4eda135d5 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 12 Nov 2024 10:19:21 +0800 Subject: [PATCH] separete read from iteration --- versiondb/tsrocksdb/store.go | 60 +++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/versiondb/tsrocksdb/store.go b/versiondb/tsrocksdb/store.go index bbce912f3c..86f1de9c51 100644 --- a/versiondb/tsrocksdb/store.go +++ b/versiondb/tsrocksdb/store.go @@ -247,36 +247,27 @@ func (s Store) FixData(storeNames []string, dryRun bool) error { // fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again. func (s Store) fixDataStore(storeName string, dryRun bool) error { - var version int64 - iter, err := s.IteratorAtVersion(storeName, nil, nil, &version) + pairs, err := s.loadWrongData(storeName) if err != nil { return err } - defer iter.Close() batch := grocksdb.NewWriteBatch() defer batch.Destroy() prefix := storePrefix(storeName) - for ; iter.Valid(); iter.Next() { - key := iter.Key() - if len(key) < TimestampSize { - return fmt.Errorf("invalid key length: %X, store: %s", key, storeName) - } - - ts := key[len(key)-TimestampSize:] - key = key[:len(key)-TimestampSize] - realKey := cloneAppend(prefix, key) + readOpts := grocksdb.NewDefaultReadOptions() + defer readOpts.Destroy() + for _, pair := range pairs { + realKey := cloneAppend(prefix, pair.Key) - readOpts := grocksdb.NewDefaultReadOptions() - readOpts.SetTimestamp(ts) + readOpts.SetTimestamp(pair.Timestamp) oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey) if err != nil { return err } - readOpts.Destroy() - clean := bytes.Equal(oldValue.Data(), iter.Value()) + clean := bytes.Equal(oldValue.Data(), pair.Value) oldValue.Free() if clean { @@ -284,9 +275,9 @@ func (s Store) fixDataStore(storeName string, dryRun bool) error { } if dryRun { - fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, key, ts) + fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, pair.Key, pair.Timestamp) } else { - batch.PutCFWithTS(s.cfHandle, realKey, ts, iter.Value()) + batch.PutCFWithTS(s.cfHandle, realKey, pair.Timestamp, pair.Value) } } @@ -297,6 +288,39 @@ func (s Store) fixDataStore(storeName string, dryRun bool) error { return nil } +type KVPairWithTS struct { + Key []byte + Value []byte + Timestamp []byte +} + +func (s Store) loadWrongData(storeName string) ([]KVPairWithTS, error) { + var version int64 + iter, err := s.IteratorAtVersion(storeName, nil, nil, &version) + if err != nil { + return nil, err + } + defer iter.Close() + + var pairs []KVPairWithTS + for ; iter.Valid(); iter.Next() { + key := iter.Key() + if len(key) < TimestampSize { + return nil, fmt.Errorf("invalid key length: %X, store: %s", key, storeName) + } + + ts := key[len(key)-TimestampSize:] + key = key[:len(key)-TimestampSize] + pairs = append(pairs, KVPairWithTS{ + Key: key, + Value: iter.Value(), + Timestamp: ts, + }) + } + + return pairs, nil +} + func newTSReadOptions(version *int64) *grocksdb.ReadOptions { var ver uint64 if version == nil {