From 2859a89cda51ce6715497fb13e8e5eeacc9b16b3 Mon Sep 17 00:00:00 2001
From: Injun Song
Date: Tue, 25 Jul 2023 11:01:11 +0900
Subject: [PATCH] feat(storage): handle inconsistency between data and commit in ReadRecoveryPoints

Previously, internal/storage.(*Storage).ReadRecoveryPoints returned an error
when it found an inconsistency between data and commit for log entries, which
could happen when there was no data for a commit. When storage used a unified
database, such an inconsistency could not occur, so the method treated it as
an error. However, with separate databases in storage and the WAL sync option
turned off, it can happen; for instance, the data for a log entry can be lost
even though the commit for that log entry has been synced.

This PR changes ReadRecoveryPoints so that it no longer returns an error for
an inconsistency between data and commit for a log entry. Instead, it tries to
find the first and last log entries that have no inconsistency. If there are
no valid first and last entries, it returns nil for both and lets them be
resolved later through synchronization.
---
 internal/storage/recovery_points.go | 128 +++++--
 internal/storage/storage_test.go    | 552 ++++++++++++++++++++--------
 2 files changed, 507 insertions(+), 173 deletions(-)

diff --git a/internal/storage/recovery_points.go b/internal/storage/recovery_points.go
index b029843f3..6d1f65a2b 100644
--- a/internal/storage/recovery_points.go
+++ b/internal/storage/recovery_points.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/cockroachdb/pebble"
+	"go.uber.org/zap"
 
 	"github.com/kakao/varlog/pkg/types"
 	"github.com/kakao/varlog/proto/varlogpb"
@@ -24,19 +25,17 @@ type RecoveryPoints struct {
 
 // ReadRecoveryPoints reads data necessary to restore the status of a log
 // stream replica - the first and last log entries and commit context.
-// Incompatible between the boundary of log entries and commit context is okay;
-// thus, it returns nil as err.
-// However, if there is a fatal error, such as missing data in a log entry, it
-// returns an error.
+// It is okay if the commit context does not match the last log entry; such a
+// mismatch is resolved later through synchronization between replicas. The
+// first and last log entries can be nil if there is no log entry or if they
+// cannot be read due to inconsistency between data and commit. However, if
+// there is a fatal error, it returns an error.
func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) { rp.LastCommitContext, err = s.readLastCommitContext() if err != nil { return } - rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last, err = s.readLogEntryBoundaries() - if err != nil { - return - } + rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last = s.readLogEntryBoundaries() uncommittedBegin := types.MinLLSN if cc := rp.LastCommitContext; cc != nil { @@ -63,41 +62,112 @@ func (s *Storage) readLastCommitContext() (*CommitContext, error) { return &cc, nil } -func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber, err error) { - it := s.commitDB.NewIter(&pebble.IterOptions{ +func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber) { + dit := s.dataDB.NewIter(&pebble.IterOptions{ + LowerBound: []byte{dataKeyPrefix}, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + cit := s.commitDB.NewIter(&pebble.IterOptions{ LowerBound: []byte{commitKeyPrefix}, UpperBound: []byte{commitKeySentinelPrefix}, }) defer func() { - _ = it.Close() + _ = dit.Close() + _ = cit.Close() }() - if !it.First() { - return nil, nil, nil + first = s.getFirstLogSequenceNumber(cit, dit) + if first == nil { + return nil, nil } - firstGLSN := decodeCommitKey(it.Key()) - firstLE, err := s.readGLSN(firstGLSN) - if err != nil { - return nil, nil, err + last = s.getLastLogSequenceNumber(cit, dit, first) + if last == nil { + s.logger.Warn("the last must exist but could not be found.", zap.Stringer("first", first)) + return nil, nil } - first = &varlogpb.LogSequenceNumber{ - LLSN: firstLE.LLSN, - GLSN: firstLE.GLSN, + return first, last +} + +func (s *Storage) getFirstLogSequenceNumber(cit, dit *pebble.Iterator) *varlogpb.LogSequenceNumber { + if !cit.First() || !dit.First() { + // No committed log entry is found. + return nil } - _ = it.Last() - lastGLSN := decodeCommitKey(it.Key()) - lastLE, err := s.readGLSN(lastGLSN) - if err != nil { - return first, nil, err + cLLSN := decodeDataKey(cit.Value()) + dLLSN := decodeDataKey(dit.Key()) + for cLLSN != dLLSN { + if dLLSN < cLLSN { + key := make([]byte, dataKeyLength) + key = encodeDataKeyInternal(cLLSN, key) + if !dit.SeekGE(key) { + // No committed log entry is found. + return nil + } + dLLSN = decodeDataKey(dit.Key()) + } else { // dLLSN > cLLSN + glsn := decodeCommitKey(cit.Key()) + glsn += types.GLSN(dLLSN - cLLSN) + key := make([]byte, commitKeyLength) + key = encodeCommitKeyInternal(glsn, key) + if !cit.SeekGE(key) { + // No committed log entry is found. + return nil + } + cLLSN = decodeDataKey(cit.Value()) + } + } + + firstGLSN := decodeCommitKey(cit.Key()) + return &varlogpb.LogSequenceNumber{ + LLSN: cLLSN, + GLSN: firstGLSN, + } +} + +func (s *Storage) getLastLogSequenceNumber(cit, dit *pebble.Iterator, first *varlogpb.LogSequenceNumber) *varlogpb.LogSequenceNumber { + // The last entry must exist since the first exists. + _ = cit.Last() + _ = dit.Last() + + cLLSN := decodeDataKey(cit.Value()) + dLLSN := decodeDataKey(dit.Key()) + + // If at least one LLSN of data or commit equals the LLSN of the first log + // entry, it should be the last since there is only one log entry. 
+ if cLLSN == first.LLSN || dLLSN == first.LLSN { + return &varlogpb.LogSequenceNumber{ + LLSN: first.LLSN, + GLSN: first.GLSN, + } } - last = &varlogpb.LogSequenceNumber{ - LLSN: lastLE.LLSN, - GLSN: lastLE.GLSN, + + for cLLSN != dLLSN { + if dLLSN < cLLSN { + glsn := decodeCommitKey(cit.Key()) + glsn = glsn - types.GLSN(cLLSN-dLLSN) + 1 + key := make([]byte, commitKeyLength) + key = encodeCommitKeyInternal(glsn, key) + if !cit.SeekLT(key) { + return nil + } + cLLSN = decodeDataKey(cit.Value()) + } else { // dLLSN > cLLSN + key := make([]byte, dataKeyLength) + key = encodeDataKeyInternal(cLLSN+1, key) + if !dit.SeekLT(key) { + return nil + } + dLLSN = decodeDataKey(dit.Key()) + } } - return first, last, nil + lastGLSN := decodeCommitKey(cit.Key()) + return &varlogpb.LogSequenceNumber{ + LLSN: cLLSN, + GLSN: lastGLSN, + } } func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) { diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go index c550eb434..02de67fb2 100644 --- a/internal/storage/storage_test.go +++ b/internal/storage/storage_test.go @@ -615,170 +615,434 @@ func TestStorageReadLastCommitContext(t *testing.T) { } func TestStorageReadLogEntryBoundaries(t *testing.T) { - testStorage(t, func(t testing.TB, stg *Storage) { - // no logs - first, last, err := stg.readLogEntryBoundaries() - assert.NoError(t, err) - assert.Nil(t, first) - assert.Nil(t, last) - - // single log - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(1, 1)) - assert.NoError(t, cb.Apply()) - - // no data corresponded to the commit - _, _, err = stg.readLogEntryBoundaries() - assert.Error(t, err) + tcs := []struct { + name string + testf func(t *testing.T, stg *Storage) + }{ + { + name: "NoLogEntry", + testf: func(t *testing.T, stg *Storage) { + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + name: "NoLogEntryData", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) - wb := stg.NewWriteBatch() - assert.NoError(t, wb.Set(1, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + name: "SingleLogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) - first, last, err = stg.readLogEntryBoundaries() - assert.NoError(t, err) - // FIXME(jun): LogEntryMeta has unnecessary fields? 
- assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + want := &varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1} + first, last := stg.readLogEntryBoundaries() + require.Equal(t, want, first) + require.Equal(t, want, last) + }, + }, + { + name: "TwoLogEntries", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 3, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // two logs - cb, err = stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 2, - CommittedGLSNEnd: 3, - CommittedLLSNBegin: 2, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(2, 2)) - assert.NoError(t, cb.Apply()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: _ 2 3 + // commit: 1 2 3 + name: "NoDataAtFront", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - // no data corresponded to the commit - _, _, err = stg.readLogEntryBoundaries() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 3 + // commit: _ 2 3 + name: "NoCommitAtFront", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - wb = stg.NewWriteBatch() - assert.NoError(t, wb.Set(2, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 _ + // commit: 1 2 3 + name: "NoDataAtRear", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + 
}) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - first, last, err = stg.readLogEntryBoundaries() - assert.NoError(t, err) - // FIXME(jun): LogEntryMeta has unnecessary fields? - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) - }) -} + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: 1 2 3 + // commit: 1 2 _ + name: "NoCommitAtRear", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) -func TestStorageReadRecoveryPoints(t *testing.T) { - testStorage(t, func(t testing.TB, stg *Storage) { - rp, err := stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Nil(t, rp.LastCommitContext) - assert.Nil(t, rp.CommittedLogEntry.First) - assert.Nil(t, rp.CommittedLogEntry.Last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: _ 2 3 + // commit: 1 2 _ + name: "SingleLogEntryWithAnomaly", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // empty cc - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 1, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + want := &varlogpb.LogSequenceNumber{LLSN: 2, GLSN: 2} + first, last := stg.readLogEntryBoundaries() + require.Equal(t, want, first) + require.Equal(t, want, last) + }, + }, + { + // data: 1 2 3 _ + // commit: _ 2 3 4 + name: "TwoLogEntriesWithAnomaly", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 5, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Set(4, 4)) + require.NoError(t, cb.Apply()) - rp, err = stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Equal(t, &CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 
1, - CommittedLLSNBegin: 1, - }, rp.LastCommitContext) - assert.Nil(t, rp.CommittedLogEntry.First) - assert.Nil(t, rp.CommittedLogEntry.Last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 _ + // commit: _ _ 3 + name: "NoCommittedLogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - // nonempty cc - cb, err = stg.NewCommitBatch(CommitContext{ - Version: 2, - HighWatermark: 2, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(1, 1)) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + // data: _ _ 3 + // commit: 1 2 _ + name: "NoLCommittedogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // no data corresponded to the commit - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) - wb := stg.NewWriteBatch() - assert.NoError(t, wb.Set(1, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + } - rp, err = stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Equal(t, &CommitContext{ - Version: 2, - HighWatermark: 2, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }, rp.LastCommitContext) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.First) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.Last) - }) + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stg := TestNewStorage(t) + defer func() { + err := stg.Close() + require.NoError(t, err) + }() + tc.testf(t, stg) + }) + } } -func TestStorageReadRecoveryPoints_InconsistentWriteCommit(t *testing.T) { - t.Skip("Storage will not consider the consistency of committed logs.") - testStorage(t, func(t testing.TB, stg *Storage) { - ck := make([]byte, commitKeyLength) - dk := make([]byte, dataKeyLength) +func TestStorageReadRecoveryPoints(t *testing.T) { + tcs := []struct { + name string + testf func(t *testing.T, stg *Storage) + }{ + { + name: "NoLogEntry", + testf: func(t *testing.T, stg *Storage) { + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Nil(t, 
rp.LastCommitContext) + require.Nil(t, rp.CommittedLogEntry.First) + require.Nil(t, rp.CommittedLogEntry.Last) + }, + }, + { + name: "EmptyCommitContext", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 1, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - // committed log: llsn = 1, glsn = 1 - err := stg.commitDB.Set(encodeCommitKeyInternal(1, ck), encodeDataKeyInternal(1, dk), pebble.Sync) - assert.NoError(t, err) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Zero(t, rp.CommittedLogEntry) + }, + }, + { + name: "NoData", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - err = stg.dataDB.Set(encodeDataKeyInternal(1, dk), nil, pebble.Sync) - assert.NoError(t, err) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Zero(t, rp.CommittedLogEntry) + }, + }, + { + name: "SingleLogEntry", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - // A committed log exists, but no commit context for that. - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) - // nonempty cc: llsn = 1, glsn = 2 - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 2, - CommittedGLSNEnd: 3, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.First) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.Last) + }, + }, + } - // Committed log and commit context exist, but they are inconsistent. - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) - }) + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stg := TestNewStorage(t) + defer func() { + err := stg.Close() + require.NoError(t, err) + }() + tc.testf(t, stg) + }) + } } func TestStorage_TrimWhenNoLogEntry(t *testing.T) {
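
Note (editor, not part of the patch): the sketch below illustrates, under simplified
assumptions, the boundary scan that readLogEntryBoundaries performs with its two
pebble iterators: walk the commit side and the data side toward each other until
they agree on an LLSN, from the front for the first entry and from the back for
the last. The real code seeks over encoded keys with SeekGE/SeekLT instead of
stepping one entry at a time; this standalone Go sketch replaces the iterators and
key codecs with sorted LLSN slices, and the helper names firstCommon and lastCommon
are illustrative only.

package main

import "fmt"

// firstCommon returns the smallest LLSN present in both sorted slices, or
// (0, false) when they share no element. It mirrors the forward scan in
// getFirstLogSequenceNumber: advance whichever side is behind until both
// sides point at the same LLSN.
func firstCommon(commits, data []uint64) (uint64, bool) {
	i, j := 0, 0
	for i < len(commits) && j < len(data) {
		switch {
		case commits[i] == data[j]:
			return commits[i], true
		case commits[i] < data[j]:
			i++ // this commit has no data yet; move the commit cursor forward
		default:
			j++ // this data has no commit yet; move the data cursor forward
		}
	}
	return 0, false
}

// lastCommon is the symmetric backward scan used for the last consistent
// entry: walk both slices from the end toward the beginning.
func lastCommon(commits, data []uint64) (uint64, bool) {
	i, j := len(commits)-1, len(data)-1
	for i >= 0 && j >= 0 {
		switch {
		case commits[i] == data[j]:
			return commits[i], true
		case commits[i] > data[j]:
			i-- // commit with no data; move the commit cursor backward
		default:
			j-- // data with no commit; move the data cursor backward
		}
	}
	return 0, false
}

func main() {
	// LLSNs that have a commit entry and LLSNs that have data, respectively.
	// This imitates the "TwoLogEntriesWithAnomaly" test case:
	//   data:   1 2 3 _
	//   commit: _ 2 3 4
	commits := []uint64{2, 3, 4}
	data := []uint64{1, 2, 3}

	if first, ok := firstCommon(commits, data); ok {
		last, _ := lastCommon(commits, data)
		fmt.Println(first, last) // Output: 2 3
	} else {
		// No consistent entry at all: report nil boundaries and let
		// synchronization between replicas repair the replica later.
		fmt.Println("no committed log entry")
	}
}

With commits {2, 3, 4} and data {1, 2, 3}, the scan reports 2 and 3. When the two
sides share nothing, both helpers report no match, which corresponds to
ReadRecoveryPoints returning nil boundaries and deferring recovery to replica
synchronization.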