-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher.go
99 lines (83 loc) · 1.73 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package dispatcher
import (
"sync"
)
// A Dispatcher dispatches tasks to workers.
type Dispatcher struct {
pool chan *worker
queue chan Tasker
workers []*worker
wg sync.WaitGroup
quit chan struct{}
}
// New creates and initializes a new Dispatcher.
//
// count is the number of workers and size is the capacity of the task queue.
// A count below 1 is raised to 1, because a Dispatcher without workers could
// never run an enqueued task and Wait would then block forever. A negative
// size is treated as 0, which gives an unbuffered queue. Without these guards
// a negative value would panic inside make.
//
// New starts no goroutines; call Start to begin dispatching.
func New(count, size int) *Dispatcher {
	if count < 1 {
		count = 1
	}
	if size < 0 {
		size = 0
	}

	d := &Dispatcher{
		pool:    make(chan *worker, count),
		queue:   make(chan Tasker, size),
		quit:    make(chan struct{}),
		workers: make([]*worker, count),
	}
	for i := range d.workers {
		d.workers[i] = &worker{
			dispatcher: d,
			// Unbuffered: a task is handed over only once the worker is
			// ready to receive it.
			task: make(chan Tasker),
			quit: make(chan struct{}),
		}
	}
	return d
}
// Start launches every worker and then a background goroutine that forwards
// queued tasks to idle workers.
//
// The forwarding goroutine takes one task from the queue, waits for a worker
// to appear in the pool, and hands the task to that worker. It returns once
// the quit channel becomes readable.
func (d *Dispatcher) Start() {
	for i := range d.workers {
		d.workers[i].start()
	}

	go func() {
		for {
			select {
			case <-d.quit:
				return
			case task := <-d.queue:
				// Blocks until some worker has registered itself as idle.
				idle := <-d.pool
				idle.task <- task
			}
		}
	}()
}
// Enqueue submits t to the dispatcher's queue.
//
// The pending-task counter is incremented before the send, so the task is
// already accounted for by Wait when it enters the queue. The send blocks
// until the queue can accept the task.
func (d *Dispatcher) Enqueue(t Tasker) {
	// Count the task first; the matching Done is called by the worker after
	// t.Run returns.
	d.wg.Add(1)
	d.queue <- t
}
// Wait blocks until every task passed to Enqueue has finished running, that
// is, until each Add made by Enqueue has been matched by a Done from a worker.
func (d *Dispatcher) Wait() {
	d.wg.Wait()
}
type worker struct {
dispatcher *Dispatcher
task chan Tasker
quit chan struct{}
}
// start launches the worker's goroutine.
//
// On every iteration the worker registers itself in the dispatcher's pool to
// signal that it is idle, then waits for a task, runs it, and marks it done
// on the dispatcher's WaitGroup.
//
// The goroutine returns when the worker's quit channel is closed, so a worker
// can be shut down instead of running for the lifetime of the process. A
// worker that quits while idle is still listed in the pool, so the dispatch
// loop should be stopped together with its workers.
func (w *worker) start() {
	go func() {
		for {
			// Advertise availability; the dispatch loop takes workers from
			// the pool.
			w.dispatcher.pool <- w

			select {
			case t := <-w.task:
				t.Run()
				w.dispatcher.wg.Done()
			case <-w.quit:
				return
			}
		}
	}()
}
// Tasker is the interface that wraps the Run method.
//
// Run performs the task. A worker calls it once for each task it receives.
type Tasker interface {
	Run()
}
// The TaskerFunc type is an adapter to allow the use of ordinary functions as
// task runners. If f is a function with the appropriate signature,
// TaskerFunc(f) is a Tasker that calls f.
//
// For example, a closure can be submitted with
// d.Enqueue(TaskerFunc(func() { ... })).
type TaskerFunc func()
// Run calls f(). This method is what makes a TaskerFunc satisfy the Tasker
// interface.
func (f TaskerFunc) Run() {
	f()
}