diff --git a/enterprise/server/raft/driver/BUILD b/enterprise/server/raft/driver/BUILD index 6a5266a2f8a..d33d07bffac 100644 --- a/enterprise/server/raft/driver/BUILD +++ b/enterprise/server/raft/driver/BUILD @@ -19,6 +19,7 @@ go_library( "//server/util/status", "@com_github_jonboulle_clockwork//:clockwork", "@com_github_prometheus_client_golang//prometheus", + "@org_golang_x_sync//errgroup", ], ) diff --git a/enterprise/server/raft/driver/driver.go b/enterprise/server/raft/driver/driver.go index c1f5735eafe..939b72bb559 100644 --- a/enterprise/server/raft/driver/driver.go +++ b/enterprise/server/raft/driver/driver.go @@ -22,6 +22,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/status" "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft" ) @@ -264,10 +265,16 @@ type Queue struct { clock clockwork.Clock log log.Logger + + eg *errgroup.Group + egCtx context.Context + egCancel context.CancelFunc } func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, clock clockwork.Clock) *Queue { storeMap := storemap.New(gossipManager, clock) + ctx, cancelFunc := context.WithCancel(context.Background()) + eg, gctx := errgroup.WithContext(ctx) return &Queue{ storeMap: storeMap, pq: &priorityQueue{}, @@ -276,6 +283,10 @@ func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Lo maxSize: 100, clock: clock, log: nhlog, + + eg: eg, + egCtx: gctx, + egCancel: cancelFunc, } } @@ -399,12 +410,30 @@ func (rq *Queue) pop() IReplica { return repl } -func (rq *Queue) Start(ctx context.Context) { +func (rq *Queue) Start() { + rq.eg.Go(func() error { + rq.processQueue() + return nil + }) +} + +func (rq *Queue) Stop() { + rq.log.Infof("Driver shutdown started") + now := time.Now() + defer func() { + rq.log.Infof("Driver shutdown finished in %s", time.Since(now)) + }() + + rq.egCancel() + rq.eg.Wait() +} + +func (rq *Queue) processQueue() { queueDelay := rq.clock.NewTicker(queueWaitDuration) defer queueDelay.Stop() for { select { - case <-ctx.Done(): + case <-rq.egCtx.Done(): return case <-queueDelay.Chan(): } @@ -416,15 +445,16 @@ func (rq *Queue) Start(ctx context.Context) { continue } - requeue, err := rq.processReplica(ctx, repl) + requeue, err := rq.processReplica(rq.egCtx, repl) if err != nil { // TODO: check if err can be retried. rq.log.Errorf("failed to process replica: %s", err) } else { rq.log.Debugf("successfully processed replica: %d", repl.RangeID()) } - rq.postProcess(ctx, repl, requeue) + rq.postProcess(rq.egCtx, repl, requeue) } + } // findDeadReplica finds a dead replica to be removed. diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go index e6e798a4660..f730cfa158b 100644 --- a/enterprise/server/raft/store/store.go +++ b/enterprise/server/raft/store/store.go @@ -517,6 +517,9 @@ func (s *Store) AddEventListener() <-chan events.Event { // ranges on this node. func (s *Store) Start() error { s.usages.Start() + if s.driverQueue != nil { + s.driverQueue.Start() + } s.eg.Go(func() error { s.handleEvents(s.egCtx) return nil @@ -551,12 +554,6 @@ func (s *Store) Start() error { s.scanReplicas(s.egCtx) return nil }) - s.eg.Go(func() error { - if s.driverQueue != nil { - s.driverQueue.Start(s.egCtx) - } - return nil - }) s.eg.Go(func() error { s.deleteSessionWorker.Start(s.egCtx) return nil @@ -567,6 +564,9 @@ func (s *Store) Start() error { func (s *Store) Stop(ctx context.Context) error { s.log.Info("Store: started to shut down") + if s.driverQueue != nil { + s.driverQueue.Stop() + } s.dropLeadershipForShutdown() now := time.Now() defer func() {