Skip to content

Commit

Permalink
Merge pull request #152 from rapthead/out-of-ammo
Browse files Browse the repository at this point in the history
When provider run is done it does not mean out of ammo
  • Loading branch information
trueival authored Apr 27, 2022
2 parents 7b292f9 + a0f387c commit 6ac8e04
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
12 changes: 7 additions & 5 deletions core/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@ func (ah *runAwaitHandle) awaitRun() {
if errutil.IsNotCtxError(ah.runCtx, err) {
ah.onErrAwaited(errors.WithMessage(err, "provider failed"))
}
if err == nil && !ah.isStartFinished() {
ah.log.Debug("Canceling instance start because out of ammo")
ah.instanceStartCancel()
}
case err := <-ah.aggregatorErr:
ah.aggregatorErr = nil
ah.toWait--
Expand All @@ -316,7 +312,13 @@ func (ah *runAwaitHandle) awaitRun() {
if ent := ah.log.Check(zap.DebugLevel, "Instance run awaited"); ent != nil {
ent.Write(zap.Int("id", res.ID), zap.Int("awaited", ah.awaitedInstances), zap.Error(res.Err))
}
if errutil.IsNotCtxError(ah.runCtx, res.Err) {

if res.Err == outOfAmmoErr {
if !ah.isStartFinished() {
ah.log.Debug("Canceling instance start because out of ammo")
ah.instanceStartCancel()
}
} else if errutil.IsNotCtxError(ah.runCtx, res.Err) {
ah.onErrAwaited(errors.WithMessage(res.Err, fmt.Sprintf("instance %q run failed", res.ID)))
}
ah.checkAllInstancesAreFinished()
Expand Down
15 changes: 15 additions & 0 deletions core/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ var _ = Describe("multiple instance", func() {
Expect(pool.metrics.InstanceStart.Get()).To(BeNumerically("<=", 3))
}, 1)

It("when provider run done it does not mean out of ammo; instance start is not canceled", func() {
conf, _ := newTestPoolConf()
conf.Provider = provider.NewNumBuffered(3)
conf.NewRPSSchedule = func() (core.Schedule, error) {
return schedule.NewOnce(1), nil
}
conf.StartupSchedule = schedule.NewOnce(3)
pool := newPool(ginkgoutil.NewLogger(), newTestMetrics(), nil, conf)
ctx := context.Background()

err := pool.Run(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(pool.metrics.InstanceStart.Get()).To(BeNumerically("==", 3))
}, 1)

It("out of RPS - instance start is canceled", func() {
conf, _ := newTestPoolConf()
conf.NewRPSSchedule = func() (core.Schedule, error) {
Expand Down
4 changes: 3 additions & 1 deletion core/engine/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (i *instance) shoot(ctx context.Context) (err error) {
ammo, ok := i.provider.Acquire()
if !ok {
i.log.Debug("Out of ammo")
break
return outOfAmmoErr
}
if tag.Debug {
i.log.Debug("Ammo acquired", zap.Any("ammo", ammo))
Expand Down Expand Up @@ -124,3 +124,5 @@ func (i *instance) Close() error {
i.log.Debug("Gun closed")
return err
}

var outOfAmmoErr = errors.New("Out of ammo")
7 changes: 7 additions & 0 deletions core/provider/num.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func NewNum(limit int) core.Provider {
}
}

func NewNumBuffered(limit int) core.Provider {
return &num{
limit: limit,
sink: make(chan core.Ammo, limit),
}
}

type NumConfig struct {
Limit int
}
Expand Down

0 comments on commit 6ac8e04

Please sign in to comment.