forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmockbroker.go
157 lines (139 loc) · 3.93 KB
/
mockbroker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package sarama
import (
"encoding/binary"
"errors"
"io"
"net"
"strconv"
)
// TestState is the small slice of a test harness that the mock broker reports
// through. *testing.T satisfies it, as does any other type that provides these
// four methods.
type TestState interface {
	// Error and Errorf report a problem; they mirror the methods of the same
	// names on testing.T.
	Error(args ...interface{})
	Errorf(format string, args ...interface{})

	// Fatal and Fatalf report a problem during setup; they mirror the methods
	// of the same names on testing.T.
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
// accepts a single connection. For every expectation queued with Returns it reads one Kafka request from
// that connection and answers it with the encoded expectation (if the encoded response has a len of 0,
// the request is consumed but nothing is sent back).
//
// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
// waiting for a request, the test panics.
//
// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
// automatically as a convenience.
type MockBroker struct {
	// brokerID is the value handed back by BrokerID.
	brokerID int32
	// port is the listener's TCP port, parsed from its address at creation time.
	port int32
	// stopper is closed by serverLoop when it returns; Close blocks on it.
	stopper chan bool
	// expectations holds the queued responses; serverLoop consumes one per request.
	expectations chan encoder
	// listener is the listening socket on which the single connection is accepted.
	listener net.Listener
	// t receives every error the broker reports.
	t TestState
}
// BrokerID returns the broker ID stored in the mock broker.
func (b *MockBroker) BrokerID() int32 {
	return b.brokerID
}
// Port returns the port number stored in the mock broker.
func (b *MockBroker) Port() int32 {
	return b.port
}
// Addr returns the address the mock broker's listener is bound to, in the
// string form produced by net.Addr.String.
func (b *MockBroker) Addr() string {
	bound := b.listener.Addr()
	return bound.String()
}
// Close shuts the mock broker down. If expectations are still queued it reports
// that through the TestState, then closes the expectations channel and blocks
// until serverLoop has signalled its exit by closing stopper.
//
// Because it closes the expectations channel, Close must be called at most
// once, and Returns must not be called afterwards (either would panic).
// serverLoop only signals its exit once it returns, so if it is still waiting
// to accept a connection or read a request, Close keeps waiting too.
func (b *MockBroker) Close() {
	// Read the backlog exactly once: serverLoop may still be draining the
	// channel concurrently, and a second len() call could disagree with the
	// value that triggered the report (it could even be 0 by then).
	if pending := len(b.expectations); pending > 0 {
		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), pending)
	}
	close(b.expectations)
	<-b.stopper
}
// serverLoop is the body of the broker goroutine. It accepts one connection and
// then, for each queued expectation, reads one length-prefixed request and
// writes back the encoded expectation preceded by an 8-byte header (total
// length, then the 4 bytes copied from offset 4 of the request body, which the
// type documentation describes as the correlation ID).
//
// It returns true only if every expectation was served and both the connection
// and the listener closed cleanly. On any failure the error is reported to the
// TestState, the sockets are closed and false is returned. stopper is closed on
// every exit path so that Close never waits on a goroutine that is already gone.
func (b *MockBroker) serverLoop() (ok bool) {
	defer close(b.stopper)

	conn, err := b.listener.Accept()
	if err != nil {
		return b.serverError(err, conn)
	}

	reqHeader := make([]byte, 4)
	resHeader := make([]byte, 8)

	for expectation := range b.expectations {
		// Each request starts with a 4-byte big-endian length prefix.
		if _, err = io.ReadFull(conn, reqHeader); err != nil {
			return b.serverError(err, conn)
		}

		// Check the advertised size before allocating a buffer for it.
		size := binary.BigEndian.Uint32(reqHeader)
		if size < 10 {
			return b.serverError(errors.New("Kafka request too short."), conn)
		}
		body := make([]byte, size)
		if _, err = io.ReadFull(conn, body); err != nil {
			return b.serverError(err, conn)
		}

		// An encoding failure is reported and cleaned up like every other
		// failure instead of silently leaving the sockets open.
		response, encErr := encode(expectation)
		if encErr != nil {
			return b.serverError(encErr, conn)
		}

		// An empty response means "consume the request, send nothing".
		if len(response) == 0 {
			continue
		}

		// The length field covers the payload plus the 4 echoed bytes.
		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
		if _, err = conn.Write(resHeader); err != nil {
			return b.serverError(err, conn)
		}
		if _, err = conn.Write(response); err != nil {
			return b.serverError(err, conn)
		}
	}

	// The expectations channel was closed and drained: shut down cleanly.
	if err = conn.Close(); err != nil {
		return b.serverError(err, nil)
	}
	if err = b.listener.Close(); err != nil {
		b.t.Error(err)
		return false
	}
	return true
}
// serverError reports err to the TestState and then tears the broker's sockets
// down: conn is closed when it is non-nil, and the listener is always closed.
// Any error from those Close calls is reported as well. The result is always
// false, so callers can write `return b.serverError(err, conn)`.
func (b *MockBroker) serverError(err error, conn net.Conn) bool {
	b.t.Error(err)

	// reportIfSet forwards a cleanup failure to the TestState, ignoring nil.
	reportIfSet := func(cleanupErr error) {
		if cleanupErr != nil {
			b.t.Error(cleanupErr)
		}
	}

	if conn != nil {
		reportIfSet(conn.Close())
	}
	reportIfSet(b.listener.Close())

	return false
}
// NewMockBroker launches a fake Kafka broker listening on a kernel-selected
// localhost TCP port. It takes a TestState (e.g. *testing.T) as provided by the
// test framework and the broker ID that BrokerID should return. Responses are
// queued afterwards with Returns. If an error occurs while serving it is simply
// logged to the TestState and the broker exits.
//
// If setup itself fails, the listener (if one was opened) is closed and the
// failure is reported with t.Fatal. Should the TestState's Fatal return instead
// of stopping the caller, NewMockBroker returns nil.
func NewMockBroker(t TestState, brokerID int32) *MockBroker {
	broker := &MockBroker{
		stopper:      make(chan bool),
		t:            t,
		brokerID:     brokerID,
		expectations: make(chan encoder, 512),
	}

	// abort releases the listener before reporting, because a Fatal that
	// stops the goroutine (as testing.T's does) would skip any later cleanup.
	abort := func(cause error) *MockBroker {
		if broker.listener != nil {
			if closeErr := broker.listener.Close(); closeErr != nil {
				t.Error(closeErr)
			}
		}
		t.Fatal(cause)
		return nil
	}

	var err error
	broker.listener, err = net.Listen("tcp", "localhost:0")
	if err != nil {
		return abort(err)
	}

	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
	if err != nil {
		return abort(err)
	}
	port, err := strconv.ParseInt(portStr, 10, 32)
	if err != nil {
		return abort(err)
	}
	broker.port = int32(port)

	go broker.serverLoop()

	return broker
}
// Returns queues e as the response to a future request. The queue is a buffered
// channel (NewMockBroker sizes it at 512), so Returns blocks while it is full.
// It must not be called after Close, which closes that channel.
func (b *MockBroker) Returns(e encoder) {
	b.expectations <- e
}