diff --git a/core/engine/engine.go b/core/engine/engine.go index 6a03b5ed8..957b5c749 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -291,10 +291,6 @@ func (ah *runAwaitHandle) awaitRun() { if errutil.IsNotCtxError(ah.runCtx, err) { ah.onErrAwaited(errors.WithMessage(err, "provider failed")) } - if err == nil && !ah.isStartFinished() { - ah.log.Debug("Canceling instance start because out of ammo") - ah.instanceStartCancel() - } case err := <-ah.aggregatorErr: ah.aggregatorErr = nil ah.toWait-- @@ -316,7 +312,13 @@ func (ah *runAwaitHandle) awaitRun() { if ent := ah.log.Check(zap.DebugLevel, "Instance run awaited"); ent != nil { ent.Write(zap.Int("id", res.ID), zap.Int("awaited", ah.awaitedInstances), zap.Error(res.Err)) } - if errutil.IsNotCtxError(ah.runCtx, res.Err) { + + if res.Err == outOfAmmoErr { + if !ah.isStartFinished() { + ah.log.Debug("Canceling instance start because out of ammo") + ah.instanceStartCancel() + } + } else if errutil.IsNotCtxError(ah.runCtx, res.Err) { ah.onErrAwaited(errors.WithMessage(res.Err, fmt.Sprintf("instance %q run failed", res.ID))) } ah.checkAllInstancesAreFinished() diff --git a/core/engine/engine_test.go b/core/engine/engine_test.go index 187434156..062018a27 100644 --- a/core/engine/engine_test.go +++ b/core/engine/engine_test.go @@ -213,6 +213,21 @@ var _ = Describe("multiple instance", func() { Expect(pool.metrics.InstanceStart.Get()).To(BeNumerically("<=", 3)) }, 1) + It("when provider run done it does not mean out of ammo; instance start is not canceled", func() { + conf, _ := newTestPoolConf() + conf.Provider = provider.NewNumBuffered(3) + conf.NewRPSSchedule = func() (core.Schedule, error) { + return schedule.NewOnce(1), nil + } + conf.StartupSchedule = schedule.NewOnce(3) + pool := newPool(ginkgoutil.NewLogger(), newTestMetrics(), nil, conf) + ctx := context.Background() + + err := pool.Run(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(pool.metrics.InstanceStart.Get()).To(BeNumerically("==", 3)) + }, 1) + It("out of RPS - instance start is canceled", func() { conf, _ := newTestPoolConf() conf.NewRPSSchedule = func() (core.Schedule, error) { diff --git a/core/engine/instance.go b/core/engine/instance.go index 0fe81d261..42f8c882b 100644 --- a/core/engine/instance.go +++ b/core/engine/instance.go @@ -93,7 +93,7 @@ func (i *instance) shoot(ctx context.Context) (err error) { ammo, ok := i.provider.Acquire() if !ok { i.log.Debug("Out of ammo") - break + return outOfAmmoErr } if tag.Debug { i.log.Debug("Ammo acquired", zap.Any("ammo", ammo)) @@ -124,3 +124,5 @@ func (i *instance) Close() error { i.log.Debug("Gun closed") return err } + +var outOfAmmoErr = errors.New("Out of ammo") diff --git a/core/provider/num.go b/core/provider/num.go index 401ca41af..772153b76 100644 --- a/core/provider/num.go +++ b/core/provider/num.go @@ -20,6 +20,13 @@ func NewNum(limit int) core.Provider { } } +func NewNumBuffered(limit int) core.Provider { + return &num{ + limit: limit, + sink: make(chan core.Ammo, limit), + } +} + type NumConfig struct { Limit int }