-
Notifications
You must be signed in to change notification settings - Fork 0
/
workerpool.go
162 lines (142 loc) · 3.59 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package goworker
import (
"sync"
)
// WorkerPoolInput carries the settings used to build a worker pool.
type WorkerPoolInput struct {
	// WorkerCount is the number of workers launched by the pool.
	// When left at zero the default of 1 is used.
	WorkerCount int

	// Buffer is the number of tasks that can be queued before adding
	// another task becomes a blocking call.
	// When left at zero the default of 100 is used.
	Buffer int
}
// Fallback settings applied when the corresponding WorkerPoolInput
// field is left unset.
const (
	// _defaultWorkerCount is the number of workers used by default.
	_defaultWorkerCount = 1

	// _defaultBufferSize is the task queue capacity used by default.
	_defaultBufferSize = 100
)
// WorkerPool is the set of operations offered by a worker pool
// implementation.
type WorkerPool interface {
	// Add hands a task to the worker pool and returns a Future that
	// holds the response of its execution. The input is a Task, so at
	// present only an error response is supported.
	Add(task *Task) *Future

	// Start begins executing the tasks in the worker pool.
	Start()

	// Done tells the worker pool that no more tasks will be added.
	// If Done is never called, the worker pool keeps waiting for
	// tasks forever.
	Done()

	// Abort tells the worker pool to abandon all remaining tasks in
	// the pool.
	Abort()

	// WaitForCompletion blocks until the worker pool has finished all
	// of its tasks.
	WaitForCompletion()
}
// taskWrapper is the unit placed on the pool's task queue: a task
// together with the Future through which its outcome is reported.
type taskWrapper struct {
	// task is the work to run.
	task *Task

	// response receives the result and error produced for task.
	response *Future
}
// NewWorkerPool builds a WorkerPool from the given specs.
//
// Unset fields of request are filled in with the package defaults
// (request itself is updated by populateDefaults). The abort channel
// is buffered with one slot per worker and the task queue with
// request.Buffer slots. No worker runs until Start is called.
func NewWorkerPool(request *WorkerPoolInput) WorkerPool {
	cfg := populateDefaults(request)

	pool := new(workerpool)
	pool.workerCount = cfg.WorkerCount
	pool.abortChan = make(chan struct{}, cfg.WorkerCount)
	pool.taskChan = make(chan taskWrapper, cfg.Buffer)
	return pool
}
// populateDefaults fills unset fields of request with the package
// defaults and returns the request.
//
// A nil request is treated as an empty one, so the returned pointer is
// never nil. A non-nil request is updated in place and returned as is.
// Values that are zero or negative count as unset: a negative size
// would otherwise make the channel allocations in NewWorkerPool panic.
func populateDefaults(request *WorkerPoolInput) *WorkerPoolInput {
	if request == nil {
		request = &WorkerPoolInput{}
	}
	if request.WorkerCount <= 0 {
		request.WorkerCount = _defaultWorkerCount
	}
	if request.Buffer <= 0 {
		request.Buffer = _defaultBufferSize
	}
	return request
}
// workerpool is the WorkerPool implementation returned by
// NewWorkerPool.
type workerpool struct {
	// The embedded lock is taken by an aborting worker while it moves
	// taskChan to residueTaskChan.
	sync.RWMutex

	// wg counts the worker goroutines that are still running.
	wg sync.WaitGroup

	// workerCount is the number of goroutines launched by Start.
	workerCount int

	// abortChan carries one abort token per worker.
	abortChan chan struct{}

	// taskChan is the queue of pending tasks. It is set to nil once a
	// worker has handled an abort token.
	taskChan chan taskWrapper

	// flushResidueMutex serialises the flushing of residueTaskChan in
	// Abort.
	flushResidueMutex sync.Mutex

	// residueTaskChan holds the former taskChan after an abort, so the
	// tasks still queued on it can be reported as aborted.
	residueTaskChan chan taskWrapper
}
// Start launches workerCount goroutines that execute queued tasks.
//
// Each worker loops until one of two things happens:
//   - the task queue is closed and drained, or
//   - it receives a token from abortChan.
//
// For every task it runs, the worker reports the result and then the
// error on the task's Future. On abort, the first worker to get there
// moves taskChan to residueTaskChan and sets taskChan to nil, which
// marks the pool as inactive.
func (w *workerpool) Start() {
	w.wg.Add(w.workerCount)
	for i := 0; i < w.workerCount; i++ {
		go func() {
			defer w.wg.Done()
			for {
				// Read the queue under the read lock: an aborting worker
				// sets the field to nil under the write lock, so an
				// unguarded read here would be a data race.
				w.RLock()
				queue := w.taskChan
				w.RUnlock()

				// A nil queue never becomes ready, so after an abort only
				// the abortChan case can fire.
				select {
				case unit, ok := <-queue:
					if !ok {
						return
					}
					res, err := unit.task.Run()
					unit.response.NotifyResult(res)
					unit.response.NotifyError(err)
				case <-w.abortChan:
					// Park the queue as residue unless another worker has
					// already done so.
					w.Lock()
					if w.taskChan != nil {
						w.residueTaskChan = w.taskChan
						w.taskChan = nil
					}
					w.Unlock()
					return
				}
			}
		}()
	}
}
// Add queues task for execution and returns the Future on which its
// outcome is reported.
//
// If the pool is inactive (taskChan is nil, which is the case once a
// worker has handled an abort), nothing is queued and the returned
// Future is notified with a nil result and ErrorInactiveWorkerPool.
// Otherwise the call blocks while the task queue is full.
//
// Add must not be called after Done: sending on the closed queue
// panics.
func (w *workerpool) Add(task *Task) *Future {
	res := NewFuture()

	// Read the queue once, under the read lock. Checking the field for
	// nil and then reading it again for the send would let an abort
	// slip in between, leaving this call blocked forever on a nil
	// channel.
	w.RLock()
	queue := w.taskChan
	w.RUnlock()

	// return if workerpool is in an inactive state
	if queue == nil {
		res.NotifyResult(nil)
		res.NotifyError(ErrorInactiveWorkerPool)
		return res
	}

	queue <- taskWrapper{
		task:     task,
		response: res,
	}
	return res
}
// Done closes the task queue to signal that no more tasks will be
// added. Workers return once they find the queue closed and drained.
//
// Done is a no-op when the pool is inactive (taskChan is nil). It must
// be called at most once on an active pool: closing the queue a second
// time panics.
func (w *workerpool) Done() {
	// Read the field under the read lock; an aborting worker sets it to
	// nil under the write lock.
	w.RLock()
	queue := w.taskChan
	w.RUnlock()

	if queue != nil {
		close(queue)
	}
}
// Abort stops the workers and reports every task still queued as
// aborted.
//
// It is a no-op when the pool is already inactive (taskChan is nil).
// Otherwise it hands one abort token to each worker, waits for all
// workers to return, and then notifies the Future of every task left
// on residueTaskChan with a nil result and ErrorWorkerPoolAborted.
func (w *workerpool) Abort() {
	// Read the field under the read lock; an aborting worker sets it to
	// nil under the write lock.
	w.RLock()
	active := w.taskChan != nil
	w.RUnlock()
	if !active {
		return
	}

	// One token per worker. The send is non-blocking: abortChan has one
	// slot per worker, so a full buffer already holds enough tokens for
	// every worker, and a blocking send would hang forever when the
	// workers have already returned without consuming earlier tokens.
	for i := 0; i < w.workerCount; i++ {
		select {
		case w.abortChan <- struct{}{}:
		default:
		}
	}
	w.wg.Wait()

	w.flushResidueMutex.Lock()
	defer w.flushResidueMutex.Unlock()
	if w.residueTaskChan == nil {
		return
	}

	// Drain without closing the channel: it is the former task queue,
	// which Done may already have closed, and closing it again would
	// panic. The loop ends when the channel is empty or reports closed.
	for {
		select {
		case unit, ok := <-w.residueTaskChan:
			if !ok {
				w.residueTaskChan = nil
				return
			}
			unit.response.NotifyResult(nil)
			unit.response.NotifyError(ErrorWorkerPoolAborted)
		default:
			w.residueTaskChan = nil
			return
		}
	}
}
// WaitForCompletion blocks until every worker goroutine launched by
// Start has returned.
func (w *workerpool) WaitForCompletion() {
	w.wg.Wait()
}