Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): handle inconsistency between data and commit in ReadRecoveryPoints #545

Merged
merged 2 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 99 additions & 29 deletions internal/storage/recovery_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/cockroachdb/pebble"
"go.uber.org/zap"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
Expand All @@ -24,19 +25,17 @@ type RecoveryPoints struct {

// ReadRecoveryPoints reads data necessary to restore the status of a log
// stream replica - the first and last log entries and commit context.
// Incompatible between the boundary of log entries and commit context is okay;
// thus, it returns nil as err.
// However, if there is a fatal error, such as missing data in a log entry, it
// returns an error.
// It is okay if the commit context does not match the last log entry; that
// mismatch is resolved later through synchronization between replicas. The
// first and last log entries can be nil if there is no log entry or if they
// can't be read due to inconsistency between data and commit. However, if
// there is a fatal error, it returns an error.
func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) {
rp.LastCommitContext, err = s.readLastCommitContext()
if err != nil {
return
}
rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last, err = s.readLogEntryBoundaries()
if err != nil {
return
}
rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last = s.readLogEntryBoundaries()

uncommittedBegin := types.MinLLSN
if cc := rp.LastCommitContext; cc != nil {
Expand All @@ -63,41 +62,112 @@ func (s *Storage) readLastCommitContext() (*CommitContext, error) {
return &cc, nil
}

func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber, err error) {
it := s.commitDB.NewIter(&pebble.IterOptions{
func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogSequenceNumber) {
dit := s.dataDB.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
cit := s.commitDB.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
defer func() {
_ = it.Close()
_ = dit.Close()
_ = cit.Close()
}()

if !it.First() {
return nil, nil, nil
first = s.getFirstLogSequenceNumber(cit, dit)
if first == nil {
return nil, nil
}

firstGLSN := decodeCommitKey(it.Key())
firstLE, err := s.readGLSN(firstGLSN)
if err != nil {
return nil, nil, err
last = s.getLastLogSequenceNumber(cit, dit, first)
if last == nil {
s.logger.Warn("the last must exist but could not be found.", zap.Stringer("first", first))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a case that could happen? If it happened, rp.CommittedLogEntry.Last[L33] would be nil, which may be interpreted as the replica having been trimmed[internal/storagenode/logstream/executor.go:L650]. Are there any side effects?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cannot happen because the first log entry already exists. The first log entry is also the last if only a single log entry exists. Therefore, this if block is unnecessary; it was added only for debugging in weird situations.

Anomalies like this one, including the case this if block guards against, can be fixed through synchronization. But as you said, we need to know whether this replica has been trimmed. I think the global low watermark can be helpful.

return nil, nil
}
first = &varlogpb.LogSequenceNumber{
LLSN: firstLE.LLSN,
GLSN: firstLE.GLSN,
return first, last
}

// getFirstLogSequenceNumber moves the commit iterator (cit) and the data
// iterator (dit) forward from their first entries until both refer to the same
// LLSN, and returns that LLSN paired with the GLSN decoded from the commit
// key. It returns nil when either iterator has no entry at all, or when a
// forward seek runs past the end before the two LLSNs agree.
//
// On a non-nil return both iterators are left positioned on the matched
// entries.
func (s *Storage) getFirstLogSequenceNumber(cit, dit *pebble.Iterator) *varlogpb.LogSequenceNumber {
if !cit.First() || !dit.First() {
// No committed log entry is found.
return nil
}

// NOTE(review): the next five lines use `it`, `first` and a three-value
// return, none of which fit this function's signature — presumably they are
// removed-side lines from the diff view of the old readLogEntryBoundaries;
// confirm against the merged file.
_ = it.Last()
lastGLSN := decodeCommitKey(it.Key())
lastLE, err := s.readGLSN(lastGLSN)
if err != nil {
return first, nil, err
// The commit entry's value is decoded as a data key, which yields the LLSN
// that the commit entry refers to; the data entry's LLSN comes from its key.
cLLSN := decodeDataKey(cit.Value())
dLLSN := decodeDataKey(dit.Key())
for cLLSN != dLLSN {
if dLLSN < cLLSN {
// Data is behind the commit: advance dit to the first data key whose
// LLSN is >= cLLSN.
key := make([]byte, dataKeyLength)
key = encodeDataKeyInternal(cLLSN, key)
if !dit.SeekGE(key) {
// No committed log entry is found.
return nil
}
dLLSN = decodeDataKey(dit.Key())
} else { // dLLSN > cLLSN
// Commit is behind the data: advance the current commit GLSN by the
// LLSN gap and seek cit to the first commit key >= that GLSN.
// Presumably this cannot skip the matching commit because GLSNs grow
// at least as fast as LLSNs — TODO confirm.
glsn := decodeCommitKey(cit.Key())
glsn += types.GLSN(dLLSN - cLLSN)
key := make([]byte, commitKeyLength)
key = encodeCommitKeyInternal(glsn, key)
if !cit.SeekGE(key) {
// No committed log entry is found.
return nil
}
cLLSN = decodeDataKey(cit.Value())
}
}

// cLLSN == dLLSN here; the GLSN is taken from the commit key at cit.
firstGLSN := decodeCommitKey(cit.Key())
return &varlogpb.LogSequenceNumber{
LLSN: cLLSN,
GLSN: firstGLSN,
}
}

// getLastLogSequenceNumber moves the commit iterator (cit) and the data
// iterator (dit) backward from their last entries until both refer to the same
// LLSN, and returns that LLSN paired with the GLSN decoded from the commit
// key. If either last entry already carries first.LLSN, it returns a new value
// holding first's LLSN and GLSN. It returns nil when a backward seek finds no
// entry.
//
// The results of cit.Last() and dit.Last() are ignored, so the function reads
// from both iterators without checking that they are non-empty; the existing
// comment below states the caller-side reason this is expected to hold.
func (s *Storage) getLastLogSequenceNumber(cit, dit *pebble.Iterator, first *varlogpb.LogSequenceNumber) *varlogpb.LogSequenceNumber {
// The last entry must exist since the first exists.
_ = cit.Last()
_ = dit.Last()

// The commit entry's value is decoded as a data key, which yields the LLSN
// that the commit entry refers to; the data entry's LLSN comes from its key.
cLLSN := decodeDataKey(cit.Value())
dLLSN := decodeDataKey(dit.Key())

// If at least one LLSN of data or commit equals the LLSN of the first log
// entry, it should be the last since there is only one log entry.
if cLLSN == first.LLSN || dLLSN == first.LLSN {
return &varlogpb.LogSequenceNumber{
LLSN: first.LLSN,
GLSN: first.GLSN,
}
}
// NOTE(review): the next three lines assign to `last` and read `lastLE`,
// neither of which is declared in this function — presumably removed-side
// lines from the diff view of the old readLogEntryBoundaries; confirm against
// the merged file.
last = &varlogpb.LogSequenceNumber{
LLSN: lastLE.LLSN,
GLSN: lastLE.GLSN,

for cLLSN != dLLSN {
if dLLSN < cLLSN {
// Commit is ahead of the data: step the current commit GLSN back by the
// LLSN gap. The +1 compensates for SeekLT being exclusive, so a commit
// key exactly at (GLSN - gap) can still be found.
glsn := decodeCommitKey(cit.Key())
glsn = glsn - types.GLSN(cLLSN-dLLSN) + 1
key := make([]byte, commitKeyLength)
key = encodeCommitKeyInternal(glsn, key)
if !cit.SeekLT(key) {
return nil
}
cLLSN = decodeDataKey(cit.Value())
} else { // dLLSN > cLLSN
// Data is ahead of the commit: SeekLT on cLLSN+1 moves dit to the last
// data key whose LLSN is <= cLLSN.
key := make([]byte, dataKeyLength)
key = encodeDataKeyInternal(cLLSN+1, key)
if !dit.SeekLT(key) {
return nil
}
dLLSN = decodeDataKey(dit.Key())
}
}

// NOTE(review): the three-value return below does not match this function's
// single result — presumably another removed-side diff line; confirm.
return first, last, nil
// cLLSN == dLLSN here; the GLSN is taken from the commit key at cit.
lastGLSN := decodeCommitKey(cit.Key())
return &varlogpb.LogSequenceNumber{
LLSN: cLLSN,
GLSN: lastGLSN,
}
}

func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) {
Expand Down
Loading