Skip to content

Commit

Permalink
[raft] when removing zombie, try different replicas to remove the rep…
Browse files Browse the repository at this point in the history
  • Loading branch information
luluz66 authored Dec 12, 2024
1 parent fc4a127 commit d9cb346
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 48 deletions.
88 changes: 53 additions & 35 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,21 @@ func (s *Store) syncRequestDeleteReplica(ctx context.Context, rangeID, replicaID
return err
}

// syncRequestStopAndDeleteReplica attempts to delete a replica but stops it if
// removeAndStopReplica attempts to delete a replica but stops it if
// the delete fails because this is the last node in the cluster.
func (s *Store) syncRequestStopAndDeleteReplica(ctx context.Context, rangeID, replicaID uint64) error {
err := s.syncRequestDeleteReplica(ctx, rangeID, replicaID)
func (s *Store) removeAndStopReplica(ctx context.Context, rd *rfpb.RangeDescriptor, replicaID uint64) error {
runFn := func(c rfspb.ApiClient, h *rfpb.Header) error {
_, err := c.RemoveReplica(ctx, &rfpb.RemoveReplicaRequest{
Range: rd,
ReplicaId: replicaID,
})
return err
}
_, err := s.sender.TryReplicas(ctx, rd, runFn, func(rd *rfpb.RangeDescriptor, replicaIdx int) *rfpb.Header {
return nil
})
rangeID := rd.GetRangeId()

if err == dragonboat.ErrRejected {
log.Warningf("request to delete replica c%dn%d was rejected, attempting to stop...: %s", rangeID, replicaID, err)
err := client.RunNodehostFn(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -1302,7 +1313,7 @@ func (s *Store) checkReplicaMembership(ctx context.Context, rangeID uint64, nhid
}

// isZombieNode checks whether a node is a zombie node.
func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo, rd *rfpb.RangeDescriptor) bool {
func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo, localRD, remoteRD *rfpb.RangeDescriptor) bool {
membershipStatus := s.checkMembershipStatus(ctx, shardInfo)
if membershipStatus == membershipStatusNotMember {
return true
Expand All @@ -1312,7 +1323,7 @@ func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo
return false
}

if rd == nil {
if localRD == nil {
return true
}

Expand All @@ -1322,7 +1333,7 @@ func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo
// behind, but it cannot get updates from other nodes b/c it was removed from the
// cluster.

if rd.GetStart() == nil {
if localRD.GetStart() == nil {
s.log.Debugf("range descriptor for c%dn%d doesn't have start", shardInfo.ShardID, shardInfo.ReplicaID)
// This could happen in the middle of a split. We mark it as a
// potential zombie. After *zombieMinDuration, if the range still
Expand All @@ -1332,15 +1343,10 @@ func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo
// of the last replica of the shard will fail.
return true
}
updatedRD, err := s.Sender().LookupRangeDescriptor(ctx, rd.GetStart(), true /*skip Cache */)
if err != nil {
s.log.Errorf("failed to look up range descriptor for c%dn%d: %s", shardInfo.ShardID, shardInfo.ReplicaID, err)
return false
}
if updatedRD.GetGeneration() >= rd.GetGeneration() {
rd = updatedRD
if remoteRD.GetGeneration() >= localRD.GetGeneration() {
localRD = remoteRD
}
for _, r := range rd.GetReplicas() {
for _, r := range localRD.GetReplicas() {
if r.GetRangeId() == shardInfo.ShardID && r.GetReplicaId() == shardInfo.ReplicaID {
return false
}
Expand Down Expand Up @@ -1370,18 +1376,27 @@ func (s *Store) cleanupZombieNodes(ctx context.Context) {
}
sInfo := nInfo.ShardInfoList[idx]
idx += 1
rd := s.lookupRange(sInfo.ShardID)
if s.isZombieNode(ctx, sInfo, rd) {
localRD := s.lookupRange(sInfo.ShardID)
remoteRD, err := s.Sender().LookupRangeDescriptor(ctx, localRD.GetStart(), true /*skip Cache */)
if err != nil {
s.log.Errorf("failed to look up range descriptor for c%dn%d: %s", sInfo.ShardID, sInfo.ReplicaID, err)
continue
}
if s.isZombieNode(ctx, sInfo, localRD, remoteRD) {
s.log.Debugf("Found a potential Zombie: %+v", sInfo)
potentialZombie := sInfo
deleteTimer := s.clock.AfterFunc(*zombieMinDuration, func() {

rd := s.lookupRange(sInfo.ShardID)
if !s.isZombieNode(ctx, potentialZombie, rd) {
localRD := s.lookupRange(sInfo.ShardID)
remoteRD, err := s.Sender().LookupRangeDescriptor(ctx, localRD.GetStart(), true /*skip Cache */)
if err != nil {
s.log.Errorf("failed to look up range descriptor for c%dn%d: %s", sInfo.ShardID, sInfo.ReplicaID, err)
return
}
if !s.isZombieNode(ctx, potentialZombie, localRD, remoteRD) {
return
}
s.log.Debugf("Removing zombie node: %+v...", potentialZombie)
err := s.syncRequestStopAndDeleteReplica(ctx, potentialZombie.ShardID, potentialZombie.ReplicaID)
err = s.removeAndStopReplica(ctx, remoteRD, potentialZombie.ReplicaID)
if err != nil {
s.log.Warningf("Error stopping and deleting zombie replica c%dn%d: %s", potentialZombie.ShardID, potentialZombie.ReplicaID, err)
return
Expand All @@ -1390,9 +1405,9 @@ func (s *Store) cleanupZombieNodes(ctx context.Context) {
RangeId: potentialZombie.ShardID,
ReplicaId: potentialZombie.ReplicaID,
}
if rd != nil && rd.GetStart() != nil && rd.GetEnd() != nil {
req.Start = rd.GetStart()
req.End = rd.GetEnd()
if localRD != nil && localRD.GetStart() != nil && localRD.GetEnd() != nil {
req.Start = localRD.GetStart()
req.End = localRD.GetEnd()
}

if _, err := s.RemoveData(ctx, req); err != nil {
Expand Down Expand Up @@ -2284,21 +2299,22 @@ func (s *Store) RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaReques
for _, replica := range req.GetRange().GetReplicas() {
if replica.GetReplicaId() == req.GetReplicaId() {
if replica.GetNhid() == s.NHID() {
return nil, status.InvalidArgumentErrorf("c%dn%d is on the node %s: cannot remove", req.GetRange().GetRangeId(), req.GetReplicaId(), s.NHID())
// return UnavailableError, so TryReplicas can skip this replica
return nil, status.UnavailableErrorf("c%dn%d is on the node %q, cannot remove itself", req.GetRange().GetRangeId(), req.GetReplicaId(), s.NHID())
}
replicaDesc = replica
break
}
}
if replicaDesc == nil {
return nil, status.FailedPreconditionErrorf("No replica with replica_id %d found in range: %+v", req.GetReplicaId(), req.GetRange())
}

// First, update the range descriptor information to reflect the
// the node being removed.
rd, err := s.removeReplicaFromRangeDescriptor(ctx, replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), req.GetRange())
if err != nil {
return nil, err
var err error
if replicaDesc != nil {
// First, update the range descriptor information to reflect the
// the node being removed.
rd, err = s.removeReplicaFromRangeDescriptor(ctx, replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), req.GetRange())
if err != nil {
return nil, err
}
}

if err = s.syncRequestDeleteReplica(ctx, req.GetRange().GetRangeId(), req.GetReplicaId()); err != nil {
Expand Down Expand Up @@ -2429,7 +2445,8 @@ func (s *Store) updateMetarange(ctx context.Context, oldStart, start, end *rfpb.
return batchRsp.AnyError()
}

func (s *Store) updateRangeDescriptor(ctx context.Context, rangeID uint64, old, new *rfpb.RangeDescriptor) error {
func (s *Store) UpdateRangeDescriptor(ctx context.Context, rangeID uint64, old, new *rfpb.RangeDescriptor) error {
s.log.Infof("start to update range descriptor for rangeID %d to gen %d", rangeID, new.GetGeneration())
oldBuf, err := proto.Marshal(old)
if err != nil {
return err
Expand Down Expand Up @@ -2473,6 +2490,7 @@ func (s *Store) updateRangeDescriptor(ctx context.Context, rangeID uint64, old,
if err != nil {
return status.InternalErrorf("failed to update range descriptor for rangeID=%d, err: %s", rangeID, err)
}
s.log.Infof("range descriptor for rangeID %d updated to gen %d", rangeID, new.GetGeneration())
return nil
}

Expand All @@ -2486,7 +2504,7 @@ func (s *Store) addReplicaToRangeDescriptor(ctx context.Context, rangeID, replic
newDescriptor.Generation = oldDescriptor.GetGeneration() + 1
newDescriptor.LastAddedReplicaId = proto.Uint64(replicaID)
newDescriptor.LastReplicaAddedAtUsec = proto.Int64(time.Now().UnixMicro())
if err := s.updateRangeDescriptor(ctx, rangeID, oldDescriptor, newDescriptor); err != nil {
if err := s.UpdateRangeDescriptor(ctx, rangeID, oldDescriptor, newDescriptor); err != nil {
return nil, err
}
return newDescriptor, nil
Expand All @@ -2505,7 +2523,7 @@ func (s *Store) removeReplicaFromRangeDescriptor(ctx context.Context, rangeID, r
newDescriptor.LastAddedReplicaId = nil
newDescriptor.LastReplicaAddedAtUsec = nil
}
if err := s.updateRangeDescriptor(ctx, rangeID, oldDescriptor, newDescriptor); err != nil {
if err := s.UpdateRangeDescriptor(ctx, rangeID, oldDescriptor, newDescriptor); err != nil {
return nil, err
}
return newDescriptor, nil
Expand Down
17 changes: 4 additions & 13 deletions enterprise/server/raft/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func TestAddGetRemoveRange(t *testing.T) {
}

func TestCleanupZombieReplicas(t *testing.T) {
// Prevent driver kicks in to add the replica back to the store.
flags.Set(t, "cache.raft.min_replicas_per_range", 1)

clock := clockwork.NewFakeClock()

sf := testutil.NewStoreFactoryWithClock(t, clock)
Expand Down Expand Up @@ -129,20 +132,8 @@ func TestCleanupZombieReplicas(t *testing.T) {
newRD.Replicas = replicas
require.Equal(t, 1, len(replicas))
newRD.Generation = rd.GetGeneration() + 1
protoBytes, err := proto.Marshal(newRD)
require.NoError(t, err)

// Write the range descriptor the meta range
writeReq, err := rbuilder.NewBatchBuilder().Add(&rfpb.DirectWriteRequest{
Kv: &rfpb.KV{
Key: keys.RangeMetaKey(newRD.GetEnd()),
Value: protoBytes,
},
}).ToProto()
require.NoError(t, err)
writeRsp, err := s1.Sender().SyncPropose(ctx, constants.MetaRangePrefix, writeReq)
require.NoError(t, err)
err = rbuilder.NewBatchResponseFromProto(writeRsp).AnyError()
err := s.UpdateRangeDescriptor(ctx, 2, rd, newRD)
require.NoError(t, err)

for {
Expand Down

0 comments on commit d9cb346

Please sign in to comment.