-
Notifications
You must be signed in to change notification settings - Fork 3
/
pipe.go
59 lines (46 loc) · 1.05 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package parapipe
// pipe is one processing stage: messages written to in are handed to a Job
// and the results are emitted on out in the order the messages arrived.
// Instances are created by newPipe, which also starts the goroutines that
// move data between the channels below.
type pipe struct {
// in carries the messages to be processed; newPipe ranges over it until it
// is closed.
in jobChan
// out carries the results, one per incoming message, in arrival order. It is
// closed after in has been closed and every pending result was delivered.
out jobChan
// processErrors selects what happens to messages that implement error: when
// false they bypass the Job and are forwarded unchanged, when true they are
// passed to the Job like any other message.
processErrors bool
// queue holds one result channel per accepted message, in arrival order, so
// results can be emitted in order even if jobs finish out of order. Its
// buffer is sized from the concurrency argument of newPipe.
queue chan queuedJobChan
// NOTE(review): closeInCh is allocated by newPipe but not used in the code
// visible here — presumably it signals that in should be closed; confirm
// against the rest of the package.
closeInCh chan struct{}
}
// Job is the callback a pipe applies to every message: it receives one
// message and returns the value that is emitted in its place.
//
// Each call runs in its own goroutine, so a Job may be invoked concurrently
// for different messages. Messages that implement error are only passed to
// the Job when the pipe was created with processErrors set; otherwise they
// are forwarded untouched.
type Job func(msg interface{}) interface{}
// jobChan is the channel type used for a pipe's input and output streams.
type jobChan chan interface{}

// queuedJobChan is the channel type on which a single message's result is
// handed from the goroutine producing it to the goroutine emitting it.
type queuedJobChan chan interface{}
// newPipe creates a pipe and starts the two goroutines that drive it.
//
// Every message received on p.in is passed to job in its own goroutine and
// the results are delivered on p.out in the order the messages arrived,
// regardless of the order in which the jobs finish. When processErrors is
// false, messages that implement error are not passed to job; they are
// forwarded to p.out unchanged, still in their original position.
//
// concurrency is the maximum number of job calls that run at the same time.
// Values below 1 are treated as 1.
//
// Closing p.in shuts the pipe down: once all pending results have been
// delivered, p.out is closed. Both goroutines block while p.out is not being
// drained, so the consumer must keep reading until p.out is closed.
func newPipe(job Job, concurrency int, processErrors bool) *pipe {
	// A negative size would panic in make, and a zero-sized semaphore could
	// never admit a job, so fall back to sequential processing.
	limit := concurrency
	if limit < 1 {
		limit = 1
	}

	p := &pipe{
		in:            make(jobChan, 1),
		out:           make(jobChan, 1),
		processErrors: processErrors,
		queue:         make(chan queuedJobChan, limit),
		closeInCh:     make(chan struct{}), // not used in this function
	}

	// slots is a counting semaphore bounding the number of running jobs.
	// Without it the bound is not enforced: a worker would be started before
	// any capacity is reserved, allowing limit+2 jobs in flight (one held by
	// the emitter, limit buffered in queue, one blocked on the queue send).
	slots := make(chan struct{}, limit)

	// Dispatcher: starts a worker per message and records, in arrival order,
	// the channel on which that message's result will appear.
	go func() {
		for msg := range p.in {
			// Buffered so the worker (or the pass-through below) never blocks
			// on handing over its single result.
			queued := make(queuedJobChan, 1)
			if _, isError := msg.(error); isError && !p.processErrors {
				queued <- msg
			} else {
				slots <- struct{}{} // wait for a free slot before starting the job
				go func(msg interface{}, queued queuedJobChan) {
					result := job(msg)
					// Release as soon as the job is done, not when the result
					// is emitted, so a slow consumer does not count as load.
					<-slots
					queued <- result
				}(msg, queued)
			}
			p.queue <- queued
		}
		close(p.queue)
	}()

	// Emitter: waits for each result in arrival order and forwards it.
	go func() {
		for processed := range p.queue {
			p.out <- <-processed
		}
		close(p.out)
	}()

	return p
}