-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispatcher_test.go
126 lines (100 loc) · 2.46 KB
/
dispatcher_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package goreqdispatcher
import (
"errors"
"runtime"
"testing"
"time"
)
// TestDispatcherOnNew asserts that constructing a dispatcher raises the
// process-wide goroutine count by exactly the requested number of workers.
// It then stops the dispatcher and hands over to checkThatGoRoutinesAreClosed
// (defined elsewhere in the package; presumably verifies the workers exited).
func TestDispatcherOnNew(t *testing.T) {
	// The processor is never expected to do real work in this test.
	noop := func(_ []interface{}) error {
		return nil
	}
	batchSize, workers := 1, 2

	// Snapshot taken immediately before construction so that the delta
	// measured below is attributable to NewDispatcher alone.
	before := runtime.NumGoroutine()
	disp := NewDispatcher(batchSize, workers, noop)

	if after := runtime.NumGoroutine(); after != before+workers {
		t.Fatal("no new worker start")
	}

	disp.Stop()
	checkThatGoRoutinesAreClosed(t)
}
// TestDispatcherProcessWithError feeds four *int values through a dispatcher
// (batchStep 1, two workers) whose processor fails on the value 5. It expects
// Process to return that exact sentinel error, and both the in and out
// channels of the dispatcher to report closed afterwards.
func TestDispatcherProcessWithError(t *testing.T) {
	var experr = errors.New("test")
	var proc = func(v []interface{}) error {
		p, ok := v[0].(*int)
		if !ok {
			// A failed assertion leaves p nil; return before dereferencing it
			// instead of panicking inside a worker.
			return nil
		}
		if *p == 5 {
			return experr
		}
		*p++
		return nil
	}
	var batchStep, nbWorkers = 1, 2
	var d = NewDispatcher(batchStep, nbWorkers, proc)
	var v1, v2, v3, v4 = 1, 5, 8, 10
	var err = d.Process([]interface{}{&v1, &v2, &v3, &v4})
	if err != experr {
		// %v, not %s: err may be nil on this path, and %s would render a nil
		// error as "%!s(<nil>)".
		t.Fatalf("unexpected error: %v", err)
	}
	// A receive that reports ok == true means the channel still delivered a
	// value, i.e. it was not closed and drained.
	var opened bool
	if _, opened = <-d.in; opened {
		t.Fatal("in channel not closed")
	}
	if _, opened = <-d.out; opened {
		t.Fatal("out channel not closed")
	}
	checkThatGoRoutinesAreClosed(t)
}
// TestDispatcherProcessWithOnlyError runs Process with a processor that fails
// for every batch after a short delay. It expects Process to return the
// sentinel error, the in channel to be closed, and the out channel to be
// closed once any remaining results have been drained from it.
func TestDispatcherProcessWithOnlyError(t *testing.T) {
	var experr = errors.New("test")
	var proc = func(v []interface{}) error {
		// NOTE(review): the delay presumably lets several batches be in flight
		// before the first error is observed — confirm against the dispatcher.
		time.Sleep(100 * time.Millisecond)
		return experr
	}
	var batchStep, nbWorkers = 1, 2
	var d = NewDispatcher(batchStep, nbWorkers, proc)
	var v1, v2, v3, v4 = 1, 5, 8, 10
	var err = d.Process([]interface{}{&v1, &v2, &v3, &v4})
	if err != experr {
		// %v, not %s: err may be nil on this path, and %s would render a nil
		// error as "%!s(<nil>)".
		t.Fatalf("unexpected error: %v", err)
	}
	var opened bool
	if _, opened = <-d.in; opened {
		t.Fatal("in channel not closed")
	}
	// Discard whatever is still queued on d.out. The loop only terminates
	// once the channel has been closed.
	for range d.out {
	}
	// After the drain above this receive can only report a closed channel;
	// it is kept as an explicit assertion.
	if _, opened = <-d.out; opened {
		t.Fatal("out channel not closed")
	}
	checkThatGoRoutinesAreClosed(t)
}
// TestDispatcherProcess covers the success path: five *int values are
// processed by a dispatcher built with batchStep 2 and two workers, Process
// must return nil, and afterwards both dispatcher channels must be closed.
func TestDispatcherProcess(t *testing.T) {
	// bumpFirst increments the first element of a batch when it is an *int;
	// the remaining elements of the batch are never touched.
	bumpFirst := func(batch []interface{}) error {
		if head, isIntPtr := batch[0].(*int); isIntPtr {
			*head++
		}
		return nil
	}

	batchSize, workers := 2, 2
	disp := NewDispatcher(batchSize, workers, bumpFirst)

	nums := [5]int{1, 5, 8, 10, 13}
	input := []interface{}{&nums[0], &nums[1], &nums[2], &nums[3], &nums[4]}
	if err := disp.Process(input); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	// Only positions 0, 2 and 4 are expected to change — presumably because
	// they are the first elements of the batches of two.
	want := [5]int{2, 5, 9, 10, 14}
	if nums != want {
		t.Fatalf("unexpected value: %d %d %d %d %d", nums[0], nums[1], nums[2], nums[3], nums[4])
	}

	// A receive reporting ok == true means the channel is not closed.
	if _, stillOpen := <-disp.in; stillOpen {
		t.Fatal("in channel not closed")
	}
	if _, stillOpen := <-disp.out; stillOpen {
		t.Fatal("out channel not closed")
	}

	checkThatGoRoutinesAreClosed(t)
}