[raft] move remove data out of removeReplica (#8043)
Also fix the RemoveNodeFromCluster test. Sometimes the test fails with
shardNotFound when we call removeReplica on the store where c2n4 resides.

Separate RemoveData from RemoveReplica so that we can try calling removeReplica
on different replicas (in follow-up PRs).
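
In outline, replica removal is now two decoupled steps: the RemoveReplica membership change on the store, followed by a best-effort RemoveData call against the replica that was just removed. Below is a minimal sketch of that flow under the types shown in the diffs further down; removeReplicaAndData and the replicaRemover interface are hypothetical names introduced only for illustration, and the rfpb import path is an assumption.

// Sketch only: removeReplicaAndData is a hypothetical helper, and
// replicaRemover is an assumed subset of the store API the driver uses.
// rfpb is assumed to alias BuildBuddy's raft proto package.
package example

import (
	"context"

	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/client"

	rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft"
)

type replicaRemover interface {
	RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaRequest) (*rfpb.RemoveReplicaResponse, error)
}

func removeReplicaAndData(ctx context.Context, store replicaRemover, apiClient *client.APIClient, req *rfpb.RemoveReplicaRequest) error {
	// Step 1: the raft membership change. A failure here is returned to the
	// caller so the change can be retried (possibly on a different replica).
	if _, err := store.RemoveReplica(ctx, req); err != nil {
		return err
	}

	// Step 2: best-effort data removal on the now-stopped replica. The node
	// may already be dead, in which case we cannot reach it and the data is
	// left for later cleanup.
	replicaDesc := &rfpb.ReplicaDescriptor{
		RangeId:   req.GetRange().GetRangeId(),
		ReplicaId: req.GetReplicaId(),
	}
	c, err := apiClient.GetForReplica(ctx, replicaDesc)
	if err != nil {
		return nil // unreachable node; not a failure of the removal itself
	}
	if _, err := c.RemoveData(ctx, &rfpb.RemoveDataRequest{
		RangeId:   replicaDesc.GetRangeId(),
		ReplicaId: replicaDesc.GetReplicaId(),
		Start:     req.GetRange().GetStart(),
		End:       req.GetRange().GetEnd(),
	}); err != nil {
		return nil // best effort only; the real driver just logs a warning
	}
	return nil
}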

buildbuddy-io/buildbuddy-internal#4220
luluz66 authored Dec 11, 2024
1 parent bfbff1f commit a6255c5
Showing 4 changed files with 51 additions and 28 deletions.
1 change: 1 addition & 0 deletions enterprise/server/raft/driver/BUILD
@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/driver",
visibility = ["//visibility:public"],
deps = [
"//enterprise/server/raft/client",
"//enterprise/server/raft/config",
"//enterprise/server/raft/constants",
"//enterprise/server/raft/header",
31 changes: 28 additions & 3 deletions enterprise/server/raft/driver/driver.go
@@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/client"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/config"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/constants"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/header"
@@ -300,15 +301,16 @@ type Queue struct {
pq *priorityQueue
pqItemMap map[uint64]*pqItem

clock clockwork.Clock
log log.Logger
clock clockwork.Clock
log log.Logger
apiClient *client.APIClient

eg *errgroup.Group
egCtx context.Context
egCancel context.CancelFunc
}

func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, clock clockwork.Clock) *Queue {
func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, apiClient *client.APIClient, clock clockwork.Clock) *Queue {
storeMap := storemap.New(gossipManager, clock)
ctx, cancelFunc := context.WithCancel(context.Background())
eg, gctx := errgroup.WithContext(ctx)
@@ -320,6 +322,7 @@ func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Lo
maxSize: 100,
clock: clock,
log: nhlog,
apiClient: apiClient,

eg: eg,
egCtx: gctx,
@@ -1043,6 +1046,28 @@ func (rq *Queue) applyChange(ctx context.Context, change *change) error {
return err
}
rq.log.Infof("RemoveReplicaRequest finished: %+v", change.removeOp)

replicaDesc := &rfpb.ReplicaDescriptor{RangeId: change.removeOp.GetRange().GetRangeId(), ReplicaId: change.removeOp.GetReplicaId()}
// Remove the data from the now stopped node. This is best-effort only,
// because we can remove the replica when the node is dead; and in this case,
// we won't be able to connect to the node.
c, err := rq.apiClient.GetForReplica(ctx, replicaDesc)
if err != nil {
rq.log.Warningf("RemoveReplica unable to remove data on c%dn%d, err getting api client: %s", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), err)
return nil
}
_, err = c.RemoveData(ctx, &rfpb.RemoveDataRequest{
RangeId: replicaDesc.GetRangeId(),
ReplicaId: replicaDesc.GetReplicaId(),
Start: change.removeOp.GetRange().GetStart(),
End: change.removeOp.GetRange().GetEnd(),
})
if err != nil {
rq.log.Warningf("RemoveReplica unable to remove data err: %s", err)
return nil
}

rq.log.Infof("Removed shard: c%dn%d", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId())
}
if change.transferLeadershipOp != nil {
_, err := rq.store.TransferLeadership(ctx, change.transferLeadershipOp)
32 changes: 8 additions & 24 deletions enterprise/server/raft/store/store.go
@@ -267,7 +267,7 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
usages, err := usagetracker.New(s.sender, s.leaser, gossipManager, s.NodeDescriptor(), partitions, clock)

if *enableDriver {
s.driverQueue = driver.NewQueue(s, gossipManager, nhLog, clock)
s.driverQueue = driver.NewQueue(s, gossipManager, nhLog, apiClient, clock)
}
s.deleteSessionWorker = newDeleteSessionsWorker(clock, s)

@@ -2283,12 +2283,15 @@ func (s *Store) RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaReques
var replicaDesc *rfpb.ReplicaDescriptor
for _, replica := range req.GetRange().GetReplicas() {
if replica.GetReplicaId() == req.GetReplicaId() {
if replica.GetNhid() == s.NHID() {
return nil, status.InvalidArgumentErrorf("c%dn%d is on the node %s: cannot remove", req.GetRange().GetRangeId(), req.GetReplicaId(), s.NHID())
}
replicaDesc = replica
break
}
}
if replicaDesc == nil {
return nil, status.FailedPreconditionErrorf("No node with id %d found in range: %+v", req.GetReplicaId(), req.GetRange())
return nil, status.FailedPreconditionErrorf("No replica with replica_id %d found in range: %+v", req.GetReplicaId(), req.GetRange())
}

// First, update the range descriptor information to reflect the
@@ -2298,35 +2301,16 @@ func (s *Store) RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaReques
return nil, err
}

if err = s.syncRequestDeleteReplica(ctx, replicaDesc.GetRangeId(), replicaDesc.GetReplicaId()); err != nil {
return nil, err
if err = s.syncRequestDeleteReplica(ctx, req.GetRange().GetRangeId(), req.GetReplicaId()); err != nil {
return nil, status.InternalErrorf("nodehost.SyncRequestDeleteReplica failed for c%dn%d: %s", req.GetRange().GetRangeId(), req.GetReplicaId(), err)
}

rsp := &rfpb.RemoveReplicaResponse{
Range: rd,
}

// Remove the data from the now stopped node. This is best-effort only,
// because we can remove the replica when the node is dead; and in this case,
// we won't be able to connect to the node.
c, err := s.apiClient.GetForReplica(ctx, replicaDesc)
if err != nil {
s.log.Warningf("RemoveReplica unable to remove data on c%dn%d, err getting api client: %s", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), err)
return rsp, nil
}
_, err = c.RemoveData(ctx, &rfpb.RemoveDataRequest{
RangeId: replicaDesc.GetRangeId(),
ReplicaId: replicaDesc.GetReplicaId(),
Start: rd.GetStart(),
End: rd.GetEnd(),
})
if err != nil {
s.log.Warningf("RemoveReplica unable to remove data err: %s", err)
return rsp, nil
}

s.log.Infof("Removed shard: c%dn%d", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId())
return rsp, nil

}

func (s *Store) reserveReplicaIDs(ctx context.Context, n int) ([]uint64, error) {
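Store.RemoveReplica now has a tighter contract: it rejects removing the replica hosted on the node serving the request, and it no longer deletes the removed replica's data itself. Callers therefore pick a replica on another node, as the updated test below does inline. A minimal sketch of that selection step follows; pickRemoteReplica is a hypothetical helper, and rfpb.RangeDescriptor is assumed to be the type returned by GetRange.

// Hypothetical helper mirroring the selection loop in the updated test below:
// return the ID of any replica in the range that is not hosted on the node
// identified by nhid (callers pass their own store's NHID()).
func pickRemoteReplica(nhid string, rd *rfpb.RangeDescriptor) (replicaID uint64, ok bool) {
	for _, repl := range rd.GetReplicas() {
		if repl.GetNhid() != nhid {
			return repl.GetReplicaId(), true
		}
	}
	return 0, false
}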
15 changes: 14 additions & 1 deletion enterprise/server/raft/store/store_test.go
@@ -267,6 +267,10 @@ func TestAddNodeToCluster(t *testing.T) {
}

func TestRemoveNodeFromCluster(t *testing.T) {
// Disable txn cleanup and zombie scan, because advancing the fake clock can
// prematurely trigger txn cleanup and zombie cleanup.
flags.Set(t, "cache.raft.enable_txn_cleanup", false)
flags.Set(t, "cache.raft.zombie_node_scan_interval", 0)
sf := testutil.NewStoreFactory(t)
s1 := sf.NewStore(t)
s2 := sf.NewStore(t)
@@ -277,10 +281,19 @@ func TestRemoveNodeFromCluster(t *testing.T) {

s := testutil.GetStoreWithRangeLease(t, ctx, stores, 2)

// RemoveReplica can't remove the replica on its own machine.
rd := s.GetRange(2)
replicaIdToRemove := uint64(0)
for _, repl := range rd.GetReplicas() {
if repl.GetNhid() != s.NHID() {
replicaIdToRemove = repl.GetReplicaId()
break
}
}
log.Infof("remove replica c%dn%d", rd.GetRangeId(), replicaIdToRemove)
_, err := s.RemoveReplica(ctx, &rfpb.RemoveReplicaRequest{
Range: rd,
ReplicaId: 4,
ReplicaId: replicaIdToRemove,
})
require.NoError(t, err)

