Skip to content

Commit

Permalink
fix(replication): fix wrong memory status block replication (#569)
Browse files Browse the repository at this point in the history
### Motivation

The follower didn't update the `lastAppendedOffset` status when the
leader truncated the follower, which caused some expected entries to be
filtered by deduplication logic and never recovered.

The server will keep printing:

```
{"level":"warn","time":"2024-11-13T11:38:33.0884113Z","component":"follower-cursor","error":{"error":"rpc error: code = Unknown desc = 20874694 can not immediately follow 20874678: oxia: invalid next offset in wal","kind":"*status.Error","stack":null},"follower":"xxxxxx","namespace":"xxxxxxxx","shard":13,"term":307,"time":"2024-11-13T11:38:33.088438126Z","message":"Error while receiving acks"}
```

### Modification

- Align `lastAppendedOffset` alone with internal WAL headOffset.
  • Loading branch information
mattisonchao authored Nov 13, 2024
1 parent ab60cd9 commit ee0a7a4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
1 change: 1 addition & 0 deletions server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (fc *followerController) Truncate(req *proto.TruncateRequest) (*proto.Trunc
return nil, errors.Wrapf(err, "failed to truncate wal. truncate-offset: %d - wal-last-offset: %d",
req.HeadEntryId.Offset, fc.wal.LastOffset())
}
fc.lastAppendedOffset = headOffset

return &proto.TruncateResponse{
HeadEntryId: &proto.EntryId{
Expand Down
53 changes: 51 additions & 2 deletions server/follower_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func TestFollower(t *testing.T) {
wg := common.NewWaitGroup(1)

go func() {
err := fc.Replicate(stream)
assert.ErrorIs(t, err, context.Canceled)
_ = fc.Replicate(stream)
wg.Done()
}()

Expand All @@ -111,6 +110,56 @@ func TestFollower(t *testing.T) {
assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
assert.EqualValues(t, 1, fc.Term())

// close follower
assert.NoError(t, fc.Close())

// new term to test if we can continue replicate messages
fc, err = NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory)
assert.NoError(t, err)
assert.Equal(t, proto.ServingStatus_NOT_MEMBER, fc.Status())
_, err = fc.NewTerm(&proto.NewTermRequest{Term: 2})
assert.NoError(t, err)
assert.Equal(t, proto.ServingStatus_FENCED, fc.Status())
assert.EqualValues(t, 2, fc.Term())
truncateResp, err = fc.Truncate(&proto.TruncateRequest{
Term: 2,
HeadEntryId: &proto.EntryId{
Term: 1,
Offset: 0,
},
})
assert.NoError(t, err)
assert.EqualValues(t, 2, truncateResp.HeadEntryId.Term)

assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
stream = newMockServerReplicateStream()
wg = common.NewWaitGroup(1)
go func() {
err := fc.Replicate(stream)
assert.ErrorIs(t, err, context.Canceled)
wg.Done()
}()
stream.AddRequest(createAddRequest(t, 2, 0, map[string]string{"a": "0", "b": "1"}, wal.InvalidOffset))
// Wait for response
response = stream.GetResponse()
assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
assert.EqualValues(t, 0, response.Offset)
// Write next entry
stream.AddRequest(createAddRequest(t, 2, 1, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset))

// Wait for response
response = stream.GetResponse()
assert.EqualValues(t, 1, response.Offset)

assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
assert.EqualValues(t, 2, fc.Term())

stream.AddRequest(createAddRequest(t, 2, 2, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset))
response = stream.GetResponse()
assert.EqualValues(t, 2, response.Offset)
assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
assert.EqualValues(t, 2, fc.Term())

// Double-check the values in the DB
// Keys are not there because they were not part of the commit offset
dbRes, err := fc.(*followerController).db.Get(&proto.GetRequest{
Expand Down

0 comments on commit ee0a7a4

Please sign in to comment.