Skip to content

Commit

Permalink
feat(storage): handle inconsistency between data and commit in ReadRe…
Browse files Browse the repository at this point in the history
…coveryPoints

Previously the method internal/(*storage).ReadRecoveryPoints returned an error when it found an
inconsistency between data and commit for log entries, which could happen when there was no data for
a commit. It should not have happened when we used the unified database in storage, so the method
returned an error.
However, using separate databases in storage and turning off the WAL sync option can happen; for
instance, the data for a log entry can be lost, although the commit for that log entry is synced.

This PR makes the ReadRecoveryPoints not return an error for the inconsistency between data and
commit for a log entry. It tries to find the first and last log entries that have no inconsistency.
If there are no valid first and last, it returns nil for them and lets them be resolved through
synchronization.
  • Loading branch information
ijsong committed Oct 4, 2023
1 parent 12024e0 commit e33a700
Show file tree
Hide file tree
Showing 2 changed files with 507 additions and 173 deletions.
128 changes: 99 additions & 29 deletions internal/storage/recovery_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/cockroachdb/pebble"
"go.uber.org/zap"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
Expand All @@ -24,19 +25,17 @@ type RecoveryPoints struct {

// ReadRecoveryPoints reads data necessary to restore the status of a log
// stream replica - the first and last log entries and commit context.
// Incompatible between the boundary of log entries and commit context is okay;
// thus, it returns nil as err.
// However, if there is a fatal error, such as missing data in a log entry, it
// returns an error.
// It is okay when the commit context is not matched with the last log entry,
// resolved through synchronization between replicas later. The first and last
// log entries can be nil if there is no log entry or they can't be read due to
// inconsistency between data and commit. However, if there is a fatal error,
// it returns an error.
func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) {
rp.LastCommitContext, err = s.readLastCommitContext()
if err != nil {
return
}
rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last, err = s.readLogEntryBoundaries()
if err != nil {
return
}
rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last = s.readLogEntryBoundaries()

uncommittedBegin := types.MinLLSN
if cc := rp.LastCommitContext; cc != nil {
Expand All @@ -63,41 +62,112 @@ func (s *Storage) readLastCommitContext() (*CommitContext, error) {
return &cc, nil
}

func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber, err error) {
it := s.commitDB.NewIter(&pebble.IterOptions{
func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber) {
dit := s.dataDB.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
cit := s.commitDB.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
defer func() {
_ = it.Close()
_ = dit.Close()
_ = cit.Close()
}()

if !it.First() {
return nil, nil, nil
first = s.getFirstLogSequenceNumber(cit, dit)
if first == nil {
return nil, nil
}

firstGLSN := decodeCommitKey(it.Key())
firstLE, err := s.readGLSN(firstGLSN)
if err != nil {
return nil, nil, err
last = s.getLastLogSequenceNumber(cit, dit, first)
if last == nil {
s.logger.Warn("the last must exist but could not be found.", zap.Stringer("first", first))
return nil, nil
}
first = &varlogpb.LogSequenceNumber{
LLSN: firstLE.LLSN,
GLSN: firstLE.GLSN,
return first, last
}

func (s *Storage) getFirstLogSequenceNumber(cit, dit *pebble.Iterator) *varlogpb.LogSequenceNumber {
if !cit.First() || !dit.First() {
// No committed log entry is found.
return nil
}

_ = it.Last()
lastGLSN := decodeCommitKey(it.Key())
lastLE, err := s.readGLSN(lastGLSN)
if err != nil {
return first, nil, err
cLLSN := decodeDataKey(cit.Value())
dLLSN := decodeDataKey(dit.Key())
for cLLSN != dLLSN {
if dLLSN < cLLSN {
key := make([]byte, dataKeyLength)
key = encodeDataKeyInternal(cLLSN, key)
if !dit.SeekGE(key) {
// No committed log entry is found.
return nil
}
dLLSN = decodeDataKey(dit.Key())
} else { // dLLSN > cLLSN
glsn := decodeCommitKey(cit.Key())
glsn += types.GLSN(dLLSN - cLLSN)
key := make([]byte, commitKeyLength)
key = encodeCommitKeyInternal(glsn, key)
if !cit.SeekGE(key) {
// No committed log entry is found.
return nil
}
cLLSN = decodeDataKey(cit.Value())
}
}

firstGLSN := decodeCommitKey(cit.Key())
return &varlogpb.LogSequenceNumber{
LLSN: cLLSN,
GLSN: firstGLSN,
}
}

func (s *Storage) getLastLogSequenceNumber(cit, dit *pebble.Iterator, first *varlogpb.LogSequenceNumber) *varlogpb.LogSequenceNumber {
// The last entry must exist since the first exists.
_ = cit.Last()
_ = dit.Last()

cLLSN := decodeDataKey(cit.Value())
dLLSN := decodeDataKey(dit.Key())

// If at least one LLSN of data or commit equals the LLSN of the first log
// entry, it should be the last since there is only one log entry.
if cLLSN == first.LLSN || dLLSN == first.LLSN {
return &varlogpb.LogSequenceNumber{
LLSN: first.LLSN,
GLSN: first.GLSN,
}
}
last = &varlogpb.LogSequenceNumber{
LLSN: lastLE.LLSN,
GLSN: lastLE.GLSN,

for cLLSN != dLLSN {
if dLLSN < cLLSN {
glsn := decodeCommitKey(cit.Key())
glsn = glsn - types.GLSN(cLLSN-dLLSN) + 1
key := make([]byte, commitKeyLength)
key = encodeCommitKeyInternal(glsn, key)
if !cit.SeekLT(key) {
return nil
}
cLLSN = decodeDataKey(cit.Value())
} else { // dLLSN > cLLSN
key := make([]byte, dataKeyLength)
key = encodeDataKeyInternal(cLLSN+1, key)
if !dit.SeekLT(key) {
return nil
}
dLLSN = decodeDataKey(dit.Key())
}
}

return first, last, nil
lastGLSN := decodeCommitKey(cit.Key())
return &varlogpb.LogSequenceNumber{
LLSN: cLLSN,
GLSN: lastGLSN,
}
}

func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) {
Expand Down
Loading

0 comments on commit e33a700

Please sign in to comment.