Skip to content

Commit

Permalink
Merge pull request #401 from spacemeshos/400-stop-worker-when-recover…
Browse files Browse the repository at this point in the history
…ed-round-is-cancelled

Stop worker when ctx is canceled during recovered round execution
  • Loading branch information
poszu authored Sep 20, 2023
2 parents c3507d0 + 5c143be commit 6392954
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
21 changes: 14 additions & 7 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,27 @@ func (s *Service) loop(ctx context.Context, roundToResume *round) error {
unlock := lockOSThread(ctx, roundTidFile)
err := round.RecoverExecution(ctx, end, s.cfg.TreeFileBufferSize)
unlock()
cleanupRound := func(err error) {
eg.Go(func() error {
if err := round.Teardown(ctx, err == nil); err != nil {
logger.Warn("round teardown failed", zap.Error(err), zap.Uint("epoch", round.epoch))
}
return nil
})
}
switch {
case err == nil:
s.onNewProof(ctx, round.epoch, round.execution)
cleanupRound(nil)
case errors.Is(err, context.Canceled):
logger.Info("recovered round execution canceled", zap.Uint("epoch", round.epoch))
cleanupRound(err)
return nil
default:
logger.Error("recovered round execution failed", zap.Error(err), zap.Uint("epoch", round.epoch))
cleanupRound(err)
return err
}
eg.Go(func() error {
if err := round.Teardown(ctx, err == nil); err != nil {
logger.Warn("round teardown failed", zap.Error(err))
}
return nil
})
}

closedRounds := s.registration.RegisterForRoundClosed(ctx)
Expand Down Expand Up @@ -183,7 +190,7 @@ func (s *Service) loop(ctx context.Context, roundToResume *round) error {

case <-ctx.Done():
logger.Info("service shutting down")
return eg.Wait()
return nil
}
}
}
Expand Down
50 changes: 50 additions & 0 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,53 @@ func TestRemoveRecoveredOpenRound(t *testing.T) {
cancel()
req.NoError(eg.Wait())
}

// Test that the service doesn't restart a round afresh if it's recovered execution is canceled.
// This is a regression test for https://github.com/spacemeshos/poet/issues/400
func TestServiceCancelRecoveredRound(t *testing.T) {
req := require.New(t)
tempdir := t.TempDir()
genesis := time.Now()
ctx := logging.NewContext(context.Background(), zaptest.NewLogger(t))

closedRoundsChan := make(chan service.ClosedRound)
registration := mocks.NewMockRegistrationService(gomock.NewController(t))
registration.EXPECT().RegisterForRoundClosed(gomock.Any()).Return(closedRoundsChan)

// Create a new service instance.
s, err := service.New(
ctx,
genesis,
tempdir,
registration,
&server.RoundConfig{EpochDuration: time.Hour},
)
req.NoError(err)

runCtx, cancel := context.WithCancel(ctx)
defer cancel()
var eg errgroup.Group
eg.Go(func() error { return s.Run(runCtx) })

// Start round 0
membershipRoot := []byte{1, 2, 3, 4}
closedRoundsChan <- service.ClosedRound{Epoch: 0, MembershipRoot: membershipRoot}

cancel()
req.NoError(eg.Wait())

// Create a new service instance.
// Should not register for closed rounds as the recovered round will be canceled.
s, err = service.New(
ctx,
genesis,
tempdir,
registration,
&server.RoundConfig{EpochDuration: time.Hour},
)
req.NoError(err)

runCtx, cancel = context.WithCancel(ctx)
cancel()
req.NoError(s.Run(runCtx))
}

0 comments on commit 6392954

Please sign in to comment.