Skip to content

Commit

Permalink
[raft] refactor driver start and stop code (#7990)
Browse files Browse the repository at this point in the history
To prepare the driver for rebalancing lease counts, the driver needs to
be
stopped at the start of the store.Stop() method before we try to drop
all the
leases held.
  • Loading branch information
luluz66 authored Dec 2, 2024
1 parent 4c5fc19 commit dc46533
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions enterprise/server/raft/driver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//server/util/status",
"@com_github_jonboulle_clockwork//:clockwork",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_x_sync//errgroup",
],
)

Expand Down
38 changes: 34 additions & 4 deletions enterprise/server/raft/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft"
)
Expand Down Expand Up @@ -264,10 +265,16 @@ type Queue struct {

clock clockwork.Clock
log log.Logger

eg *errgroup.Group
egCtx context.Context
egCancel context.CancelFunc
}

func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, clock clockwork.Clock) *Queue {
storeMap := storemap.New(gossipManager, clock)
ctx, cancelFunc := context.WithCancel(context.Background())
eg, gctx := errgroup.WithContext(ctx)
return &Queue{
storeMap: storeMap,
pq: &priorityQueue{},
Expand All @@ -276,6 +283,10 @@ func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Lo
maxSize: 100,
clock: clock,
log: nhlog,

eg: eg,
egCtx: gctx,
egCancel: cancelFunc,
}
}

Expand Down Expand Up @@ -399,12 +410,30 @@ func (rq *Queue) pop() IReplica {
return repl
}

func (rq *Queue) Start(ctx context.Context) {
func (rq *Queue) Start() {
rq.eg.Go(func() error {
rq.processQueue()
return nil
})
}

func (rq *Queue) Stop() {
rq.log.Infof("Driver shutdown started")
now := time.Now()
defer func() {
rq.log.Infof("Driver shutdown finished in %s", time.Since(now))
}()

rq.egCancel()
rq.eg.Wait()
}

func (rq *Queue) processQueue() {
queueDelay := rq.clock.NewTicker(queueWaitDuration)
defer queueDelay.Stop()
for {
select {
case <-ctx.Done():
case <-rq.egCtx.Done():
return
case <-queueDelay.Chan():
}
Expand All @@ -416,15 +445,16 @@ func (rq *Queue) Start(ctx context.Context) {
continue
}

requeue, err := rq.processReplica(ctx, repl)
requeue, err := rq.processReplica(rq.egCtx, repl)
if err != nil {
// TODO: check if err can be retried.
rq.log.Errorf("failed to process replica: %s", err)
} else {
rq.log.Debugf("successfully processed replica: %d", repl.RangeID())
}
rq.postProcess(ctx, repl, requeue)
rq.postProcess(rq.egCtx, repl, requeue)
}

}

// findDeadReplica finds a dead replica to be removed.
Expand Down
12 changes: 6 additions & 6 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ func (s *Store) AddEventListener() <-chan events.Event {
// ranges on this node.
func (s *Store) Start() error {
s.usages.Start()
if s.driverQueue != nil {
s.driverQueue.Start()
}
s.eg.Go(func() error {
s.handleEvents(s.egCtx)
return nil
Expand Down Expand Up @@ -551,12 +554,6 @@ func (s *Store) Start() error {
s.scanReplicas(s.egCtx)
return nil
})
s.eg.Go(func() error {
if s.driverQueue != nil {
s.driverQueue.Start(s.egCtx)
}
return nil
})
s.eg.Go(func() error {
s.deleteSessionWorker.Start(s.egCtx)
return nil
Expand All @@ -567,6 +564,9 @@ func (s *Store) Start() error {

func (s *Store) Stop(ctx context.Context) error {
s.log.Info("Store: started to shut down")
if s.driverQueue != nil {
s.driverQueue.Stop()
}
s.dropLeadershipForShutdown()
now := time.Now()
defer func() {
Expand Down

0 comments on commit dc46533

Please sign in to comment.