-
Notifications
You must be signed in to change notification settings - Fork 2
/
worker.go
78 lines (66 loc) · 1.61 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package fastexec
import (
"fmt"
"os"
"sync"
"time"
"github.com/golang/glog"
"github.com/juju/ratelimit"
)
var outWg sync.WaitGroup
var pending, done chan Job
var tb *ratelimit.Bucket
func Worker(in <-chan Job, out chan<- Job) {
for j := range in {
if Config.ratelimit > 0 {
wait := tb.Take(int64(Config.chunks))
if wait > 0 {
glog.V(4).Infof("ratelimited - sleeping - %v", wait)
time.Sleep(wait)
}
}
err := j.Execute()
if err != nil {
glog.Errorf("Error executing %v", err)
continue
}
outWg.Add(1)
out <- j
}
}
func StateMonitor(in <-chan Job) {
glog.V(2).Infof("--> state monitor")
for j := range in {
fmt.Print(string(j.GetResult()))
fmt.Fprintf(os.Stderr, string(j.GetErr()))
outWg.Done()
}
}
// init channels for workers
// optionally enable ratelimit if enabled
func initWorkers() {
pending, done = make(chan Job), make(chan Job)
// initialize ratelimit control if non-zero value specified
if Config.ratelimit > 0 {
quantum := int64(Config.chunks)
freqency := float64(Config.ratelimit) / float64(Config.chunks)
interval := time.Duration(float64(1) / float64(freqency) * 1e9)
glog.V(2).Infof("--> ratelimit - rate %d quantum %d interval %v", Config.ratelimit, quantum, interval)
tb = ratelimit.NewBucketWithQuantum(interval, Config.ratelimit, quantum)
}
}
func startWorkers(args []string, pending chan Job, done chan Job) {
// launch workers
for i := 0; i < Config.workers; i++ {
go Worker(pending, done)
}
go StateMonitor(done)
}
func StartWorkers() {
initWorkers()
startWorkers(Config.args, pending, done)
}
func WaitToFinish() {
inWg.Wait()
outWg.Wait()
}