diff --git a/internal/storage/recovery_points.go b/internal/storage/recovery_points.go index b029843f3..6d1f65a2b 100644 --- a/internal/storage/recovery_points.go +++ b/internal/storage/recovery_points.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/cockroachdb/pebble" + "go.uber.org/zap" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/varlogpb" @@ -24,19 +25,17 @@ type RecoveryPoints struct { // ReadRecoveryPoints reads data necessary to restore the status of a log // stream replica - the first and last log entries and commit context. -// Incompatible between the boundary of log entries and commit context is okay; -// thus, it returns nil as err. -// However, if there is a fatal error, such as missing data in a log entry, it -// returns an error. +// It is okay when the commit context is not matched with the last log entry, +// resolved through synchronization between replicas later. The first and last +// log entries can be nil if there is no log entry or they can't be read due to +// inconsistency between data and commit. However, if there is a fatal error, +// it returns an error. func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) { rp.LastCommitContext, err = s.readLastCommitContext() if err != nil { return } - rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last, err = s.readLogEntryBoundaries() - if err != nil { - return - } + rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last = s.readLogEntryBoundaries() uncommittedBegin := types.MinLLSN if cc := rp.LastCommitContext; cc != nil { @@ -63,41 +62,112 @@ func (s *Storage) readLastCommitContext() (*CommitContext, error) { return &cc, nil } -func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber, err error) { - it := s.commitDB.NewIter(&pebble.IterOptions{ +func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber) { + dit := s.dataDB.NewIter(&pebble.IterOptions{ + LowerBound: []byte{dataKeyPrefix}, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + cit := s.commitDB.NewIter(&pebble.IterOptions{ LowerBound: []byte{commitKeyPrefix}, UpperBound: []byte{commitKeySentinelPrefix}, }) defer func() { - _ = it.Close() + _ = dit.Close() + _ = cit.Close() }() - if !it.First() { - return nil, nil, nil + first = s.getFirstLogSequenceNumber(cit, dit) + if first == nil { + return nil, nil } - firstGLSN := decodeCommitKey(it.Key()) - firstLE, err := s.readGLSN(firstGLSN) - if err != nil { - return nil, nil, err + last = s.getLastLogSequenceNumber(cit, dit, first) + if last == nil { + s.logger.Warn("the last must exist but could not be found.", zap.Stringer("first", first)) + return nil, nil } - first = &varlogpb.LogSequenceNumber{ - LLSN: firstLE.LLSN, - GLSN: firstLE.GLSN, + return first, last +} + +func (s *Storage) getFirstLogSequenceNumber(cit, dit *pebble.Iterator) *varlogpb.LogSequenceNumber { + if !cit.First() || !dit.First() { + // No committed log entry is found. + return nil } - _ = it.Last() - lastGLSN := decodeCommitKey(it.Key()) - lastLE, err := s.readGLSN(lastGLSN) - if err != nil { - return first, nil, err + cLLSN := decodeDataKey(cit.Value()) + dLLSN := decodeDataKey(dit.Key()) + for cLLSN != dLLSN { + if dLLSN < cLLSN { + key := make([]byte, dataKeyLength) + key = encodeDataKeyInternal(cLLSN, key) + if !dit.SeekGE(key) { + // No committed log entry is found. + return nil + } + dLLSN = decodeDataKey(dit.Key()) + } else { // dLLSN > cLLSN + glsn := decodeCommitKey(cit.Key()) + glsn += types.GLSN(dLLSN - cLLSN) + key := make([]byte, commitKeyLength) + key = encodeCommitKeyInternal(glsn, key) + if !cit.SeekGE(key) { + // No committed log entry is found. + return nil + } + cLLSN = decodeDataKey(cit.Value()) + } + } + + firstGLSN := decodeCommitKey(cit.Key()) + return &varlogpb.LogSequenceNumber{ + LLSN: cLLSN, + GLSN: firstGLSN, + } +} + +func (s *Storage) getLastLogSequenceNumber(cit, dit *pebble.Iterator, first *varlogpb.LogSequenceNumber) *varlogpb.LogSequenceNumber { + // The last entry must exist since the first exists. + _ = cit.Last() + _ = dit.Last() + + cLLSN := decodeDataKey(cit.Value()) + dLLSN := decodeDataKey(dit.Key()) + + // If at least one LLSN of data or commit equals the LLSN of the first log + // entry, it should be the last since there is only one log entry. + if cLLSN == first.LLSN || dLLSN == first.LLSN { + return &varlogpb.LogSequenceNumber{ + LLSN: first.LLSN, + GLSN: first.GLSN, + } } - last = &varlogpb.LogSequenceNumber{ - LLSN: lastLE.LLSN, - GLSN: lastLE.GLSN, + + for cLLSN != dLLSN { + if dLLSN < cLLSN { + glsn := decodeCommitKey(cit.Key()) + glsn = glsn - types.GLSN(cLLSN-dLLSN) + 1 + key := make([]byte, commitKeyLength) + key = encodeCommitKeyInternal(glsn, key) + if !cit.SeekLT(key) { + return nil + } + cLLSN = decodeDataKey(cit.Value()) + } else { // dLLSN > cLLSN + key := make([]byte, dataKeyLength) + key = encodeDataKeyInternal(cLLSN+1, key) + if !dit.SeekLT(key) { + return nil + } + dLLSN = decodeDataKey(dit.Key()) + } } - return first, last, nil + lastGLSN := decodeCommitKey(cit.Key()) + return &varlogpb.LogSequenceNumber{ + LLSN: cLLSN, + GLSN: lastGLSN, + } } func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) { diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go index c550eb434..02de67fb2 100644 --- a/internal/storage/storage_test.go +++ b/internal/storage/storage_test.go @@ -615,170 +615,434 @@ func TestStorageReadLastCommitContext(t *testing.T) { } func TestStorageReadLogEntryBoundaries(t *testing.T) { - testStorage(t, func(t testing.TB, stg *Storage) { - // no logs - first, last, err := stg.readLogEntryBoundaries() - assert.NoError(t, err) - assert.Nil(t, first) - assert.Nil(t, last) - - // single log - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(1, 1)) - assert.NoError(t, cb.Apply()) - - // no data corresponded to the commit - _, _, err = stg.readLogEntryBoundaries() - assert.Error(t, err) + tcs := []struct { + name string + testf func(t *testing.T, stg *Storage) + }{ + { + name: "NoLogEntry", + testf: func(t *testing.T, stg *Storage) { + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + name: "NoLogEntryData", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) - wb := stg.NewWriteBatch() - assert.NoError(t, wb.Set(1, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + name: "SingleLogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) - first, last, err = stg.readLogEntryBoundaries() - assert.NoError(t, err) - // FIXME(jun): LogEntryMeta has unnecessary fields? - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + want := &varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1} + first, last := stg.readLogEntryBoundaries() + require.Equal(t, want, first) + require.Equal(t, want, last) + }, + }, + { + name: "TwoLogEntries", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 3, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // two logs - cb, err = stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 2, - CommittedGLSNEnd: 3, - CommittedLLSNBegin: 2, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(2, 2)) - assert.NoError(t, cb.Apply()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: _ 2 3 + // commit: 1 2 3 + name: "NoDataAtFront", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - // no data corresponded to the commit - _, _, err = stg.readLogEntryBoundaries() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 3 + // commit: _ 2 3 + name: "NoCommitAtFront", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - wb = stg.NewWriteBatch() - assert.NoError(t, wb.Set(2, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 _ + // commit: 1 2 3 + name: "NoDataAtRear", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - first, last, err = stg.readLogEntryBoundaries() - assert.NoError(t, err) - // FIXME(jun): LogEntryMeta has unnecessary fields? - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) - }) -} + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: 1 2 3 + // commit: 1 2 _ + name: "NoCommitAtRear", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) -func TestStorageReadRecoveryPoints(t *testing.T) { - testStorage(t, func(t testing.TB, stg *Storage) { - rp, err := stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Nil(t, rp.LastCommitContext) - assert.Nil(t, rp.CommittedLogEntry.First) - assert.Nil(t, rp.CommittedLogEntry.Last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, last) + }, + }, + { + // data: _ 2 3 + // commit: 1 2 _ + name: "SingleLogEntryWithAnomaly", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // empty cc - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 1, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + want := &varlogpb.LogSequenceNumber{LLSN: 2, GLSN: 2} + first, last := stg.readLogEntryBoundaries() + require.Equal(t, want, first) + require.Equal(t, want, last) + }, + }, + { + // data: 1 2 3 _ + // commit: _ 2 3 4 + name: "TwoLogEntriesWithAnomaly", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 5, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Set(4, 4)) + require.NoError(t, cb.Apply()) - rp, err = stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Equal(t, &CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 1, - CommittedLLSNBegin: 1, - }, rp.LastCommitContext) - assert.Nil(t, rp.CommittedLogEntry.First) - assert.Nil(t, rp.CommittedLogEntry.Last) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 2, LLSN: 2}, first) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 3, LLSN: 3}, last) + }, + }, + { + // data: 1 2 _ + // commit: _ _ 3 + name: "NoCommittedLogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(3, 3)) + require.NoError(t, cb.Apply()) - // nonempty cc - cb, err = stg.NewCommitBatch(CommitContext{ - Version: 2, - HighWatermark: 2, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(1, 1)) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Set(2, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) + + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + { + // data: _ _ 3 + // commit: 1 2 _ + name: "NoLCommittedogEntry", + testf: func(t *testing.T, stg *Storage) { + cb, err := stg.NewCommitBatch(CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Set(2, 2)) + require.NoError(t, cb.Apply()) - // no data corresponded to the commit - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(3, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) - wb := stg.NewWriteBatch() - assert.NoError(t, wb.Set(1, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) + first, last := stg.readLogEntryBoundaries() + require.Nil(t, first) + require.Nil(t, last) + }, + }, + } - rp, err = stg.ReadRecoveryPoints() - assert.NoError(t, err) - assert.Equal(t, &CommitContext{ - Version: 2, - HighWatermark: 2, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 2, - CommittedLLSNBegin: 1, - }, rp.LastCommitContext) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.First) - assert.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.Last) - }) + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stg := TestNewStorage(t) + defer func() { + err := stg.Close() + require.NoError(t, err) + }() + tc.testf(t, stg) + }) + } } -func TestStorageReadRecoveryPoints_InconsistentWriteCommit(t *testing.T) { - t.Skip("Storage will not consider the consistency of committed logs.") - testStorage(t, func(t testing.TB, stg *Storage) { - ck := make([]byte, commitKeyLength) - dk := make([]byte, dataKeyLength) +func TestStorageReadRecoveryPoints(t *testing.T) { + tcs := []struct { + name string + testf func(t *testing.T, stg *Storage) + }{ + { + name: "NoLogEntry", + testf: func(t *testing.T, stg *Storage) { + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Nil(t, rp.LastCommitContext) + require.Nil(t, rp.CommittedLogEntry.First) + require.Nil(t, rp.CommittedLogEntry.Last) + }, + }, + { + name: "EmptyCommitContext", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 1, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - // committed log: llsn = 1, glsn = 1 - err := stg.commitDB.Set(encodeCommitKeyInternal(1, ck), encodeDataKeyInternal(1, dk), pebble.Sync) - assert.NoError(t, err) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Zero(t, rp.CommittedLogEntry) + }, + }, + { + name: "NoData", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - err = stg.dataDB.Set(encodeDataKeyInternal(1, dk), nil, pebble.Sync) - assert.NoError(t, err) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Zero(t, rp.CommittedLogEntry) + }, + }, + { + name: "SingleLogEntry", + testf: func(t *testing.T, stg *Storage) { + cc := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + } + + cb, err := stg.NewCommitBatch(cc) + require.NoError(t, err) + require.NoError(t, cb.Set(1, 1)) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) - // A committed log exists, but no commit context for that. - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) + wb := stg.NewWriteBatch() + require.NoError(t, wb.Set(1, nil)) + require.NoError(t, wb.Apply()) + require.NoError(t, wb.Close()) - // nonempty cc: llsn = 1, glsn = 2 - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 1, - CommittedGLSNBegin: 2, - CommittedGLSNEnd: 3, - CommittedLLSNBegin: 1, - }) - assert.NoError(t, err) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + rp, err := stg.ReadRecoveryPoints() + require.NoError(t, err) + require.Equal(t, &cc, rp.LastCommitContext) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.First) + require.Equal(t, &varlogpb.LogSequenceNumber{GLSN: 1, LLSN: 1}, rp.CommittedLogEntry.Last) + }, + }, + } - // Committed log and commit context exist, but they are inconsistent. - _, err = stg.ReadRecoveryPoints() - assert.Error(t, err) - }) + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stg := TestNewStorage(t) + defer func() { + err := stg.Close() + require.NoError(t, err) + }() + tc.testf(t, stg) + }) + } } func TestStorage_TrimWhenNoLogEntry(t *testing.T) {