Skip to content

Commit

Permalink
Improve workerJamDuration mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
johnteee committed May 19, 2022
1 parent 94ce7e4 commit f5dea61
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
16 changes: 16 additions & 0 deletions worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type DefaultWorkerPool struct {
jobQueue *fpgo.BufferedChannelQueue[func()]

workerCount int
workerBusy int
spawnWorkerCh fpgo.ChannelQueue[int]
lastAliveTime time.Time

Expand Down Expand Up @@ -122,6 +123,7 @@ func (workerPoolSelf *DefaultWorkerPool) trySpawn() {
}
// Avoid Jam if (now - lastAliveTime) is over workerJamDuration
if time.Now().Sub(workerPoolSelf.lastAliveTime) > workerPoolSelf.workerJamDuration &&
workerPoolSelf.workerBusy >= workerPoolSelf.workerCount &&
workerPoolSelf.workerCount >= expectedWorkerCount {
expectedWorkerCount = workerPoolSelf.workerCount + 1
}
Expand Down Expand Up @@ -176,6 +178,7 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
// workerID := time.Now()
workerPoolSelf.lastAliveTime = time.Now()
workerPoolSelf.workerCount++
isBusy := false

go func() {
// Recover & Recycle
Expand All @@ -188,6 +191,9 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)

workerPoolSelf.lock.Lock()
workerPoolSelf.workerCount--
if isBusy {
workerPoolSelf.workerBusy --
}
workerPoolSelf.lock.Unlock()
}()

Expand All @@ -203,7 +209,17 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
select {
case job := <-workerPoolSelf.jobQueue.GetChannel():
if job != nil {
workerPoolSelf.lock.Lock()
isBusy = true
workerPoolSelf.workerBusy ++
workerPoolSelf.lock.Unlock()

job()

workerPoolSelf.lock.Lock()
workerPoolSelf.workerBusy --
isBusy = false
workerPoolSelf.lock.Unlock()
}
case <-time.After(workerPoolSelf.workerExpiryDuration):
workerPoolSelf.lock.RLock()
Expand Down
25 changes: 23 additions & 2 deletions worker/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,47 @@ func TestWorkerJamDuration(t *testing.T) {
v := i
err = workerPool.Schedule(func() {
// Nothing to do
time.Sleep(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
t.Log(v)
anyOneDone = true
})
assert.NoError(t, err)
}
time.Sleep(1 * time.Millisecond)
time.Sleep(3 * time.Millisecond)
// BatchSize: 0, SetWorkerSizeStandBy: 3 -> 3 workers
assert.Equal(t, 3, defaultWorkerPool.workerCount)
time.Sleep(3 * time.Millisecond)
// Though there're blocking jobs, but no newest job goes into the queue
assert.Equal(t, 3, defaultWorkerPool.workerCount)
// There're new jobs going to the queue, and all goroutines are busy
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
time.Sleep(3 * time.Millisecond)
// A new expected goroutine is generated
assert.Equal(t, 4, defaultWorkerPool.workerCount)
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
time.Sleep(3 * time.Millisecond)
// Only non blocking jobs, thus keep the same amount
assert.Equal(t, 4, defaultWorkerPool.workerCount)
// There's a blocking jobs going to the queue
workerPool.Schedule(func(){
time.Sleep(20 * time.Millisecond)
t.Log(3)
anyOneDone = true
})
time.Sleep(3 * time.Millisecond)
// Though there're blocking jobs, but no newest job goes into the queue
assert.Equal(t, 4, defaultWorkerPool.workerCount)
// There're new jobs going to the queue, and all goroutines are busy
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
workerPool.Schedule(func(){})
assert.Equal(t, false, anyOneDone)
time.Sleep(1 * time.Millisecond)
// A new expected goroutine is generated
assert.Equal(t, 5, defaultWorkerPool.workerCount)
}

0 comments on commit f5dea61

Please sign in to comment.