Skip to content

Commit

Permalink
Minor fixes (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored May 29, 2019
1 parent 1e39a76 commit 93c06e5
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 51 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ sudo: false

language: go
go:
- "1.11"
- "1.11.x"
- "1.12.x"
- "1.x"
env:
- GO111MODULE=on
Expand Down
34 changes: 16 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
module github.com/msales/streams/v3

require (
github.com/DataDog/zstd v1.3.5 // indirect
github.com/Shopify/sarama v1.20.1
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/DATA-DOG/go-sqlmock v1.3.3
github.com/DataDog/zstd v1.4.0 // indirect
github.com/Shopify/sarama v1.22.1
github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-redis/redis v6.15.1+incompatible // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/msales/pkg/v3 v3.8.0
github.com/go-redis/redis v6.15.2+incompatible // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/msales/pkg/v3 v3.16.2
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f // indirect
github.com/prometheus/common v0.2.0 // indirect
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/prometheus/client_golang v0.9.3 // indirect
github.com/prometheus/common v0.4.1 // indirect
github.com/prometheus/procfs v0.0.0-20190528151240-3cb620ac02d0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc // indirect
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect
golang.org/x/sys v0.0.0-20190529085034-854af27f14a7 // indirect
golang.org/x/text v0.3.2 // indirect
)
86 changes: 62 additions & 24 deletions go.sum

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,8 @@ func TestSource_Consume(t *testing.T) {
SetOffset("test_topic", 0, sarama.OffsetNewest, 10).
SetOffset("test_topic", 0, sarama.OffsetOldest, 7),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetVersion(1).
SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")).
SetMessage("test_topic", 0, 11, sarama.StringEncoder("bar")).
SetMessage("test_topic", 0, 12, sarama.StringEncoder("baz")).
SetHighWaterMark("test_topic", 0, 14),
"LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{
Err: sarama.ErrNoError,
Expand All @@ -283,7 +282,7 @@ func TestSource_Consume(t *testing.T) {
s, _ := kafka.NewSource(c)
defer s.Close()

time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)

msg, err := s.Consume()

Expand Down Expand Up @@ -359,6 +358,7 @@ func TestSource_Commit(t *testing.T) {
SetOffset("test_topic", 0, sarama.OffsetNewest, 10).
SetOffset("test_topic", 0, sarama.OffsetOldest, 7),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetVersion(1).
SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")).
SetHighWaterMark("test_topic", 0, 14),
"OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t),
Expand Down Expand Up @@ -414,6 +414,7 @@ func TestSource_CommitNilMetadata(t *testing.T) {
SetOffset("test_topic", 0, sarama.OffsetNewest, 10).
SetOffset("test_topic", 0, sarama.OffsetOldest, 7),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetVersion(1).
SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")).
SetHighWaterMark("test_topic", 0, 14),
"LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{
Expand Down Expand Up @@ -467,6 +468,7 @@ func TestSource_CommitReturnError(t *testing.T) {
SetOffset("test_topic", 0, sarama.OffsetNewest, 10).
SetOffset("test_topic", 0, sarama.OffsetOldest, 7),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetVersion(1).
SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")).
SetHighWaterMark("test_topic", 0, 14),
"OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t).
Expand Down
9 changes: 5 additions & 4 deletions mocks/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mocks

import (
"errors"
"reflect"
"testing"

"github.com/msales/streams/v3"
Expand Down Expand Up @@ -80,8 +81,8 @@ func (p *Pipe) Forward(msg streams.Message) error {
record := p.expectForward[0]
p.expectForward = p.expectForward[1:]

if (record.key != Anything && msg.Key != record.key) ||
(record.value != Anything && msg.Value != record.value) {
if (record.key != Anything && !reflect.DeepEqual(msg.Key, record.key)) ||
(record.value != Anything && !reflect.DeepEqual(msg.Value, record.value)) {
p.t.Errorf("streams: mock: Arguments to Forward did not match expectation: wanted %v:%v, got %v:%v", record.key, record.value, msg.Key, msg.Value)
}

Expand All @@ -104,8 +105,8 @@ func (p *Pipe) ForwardToChild(msg streams.Message, index int) error {
record := p.expectForward[0]
p.expectForward = p.expectForward[1:]

if (record.key != Anything && msg.Key != record.key) ||
(record.value != Anything && msg.Value != record.value) ||
if (record.key != Anything && !reflect.DeepEqual(msg.Key, record.key)) ||
(record.value != Anything && !reflect.DeepEqual(msg.Value, record.value)) ||
index != record.index {
p.t.Errorf("streams: mock: Arguments to Forward did not match expectation: wanted %v:%v:%d, got %v:%v:%d", record.key, record.value, record.index, msg.Key, msg.Value, index)
}
Expand Down
13 changes: 13 additions & 0 deletions mocks/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ func TestPipe_MessagesForForward(t *testing.T) {
assert.Exactly(t, msg, msgs[0].Msg)
}

func TestPipe_MessagesForForwardWithSlice(t *testing.T) {
msg := streams.NewMessage("test", []byte{0, 1, 2, 3})
p := mocks.NewPipe(t)
p.ExpectForward("test", []byte{0, 1, 2, 3})

p.Forward(msg)

msgs := p.Messages()
assert.Len(t, msgs, 1)
assert.Exactly(t, -1, msgs[0].Index)
assert.Exactly(t, msg, msgs[0].Msg)
}

func TestPipe_MessagesForForwardToChild(t *testing.T) {
msg := streams.NewMessage("test", "test")
p := mocks.NewPipe(t)
Expand Down
2 changes: 1 addition & 1 deletion sql/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"errors"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/msales/streams/v3"
"github.com/msales/streams/v3/mocks"
sqlx "github.com/msales/streams/v3/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gopkg.in/DATA-DOG/go-sqlmock.v1"
)

func newDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
Expand Down

0 comments on commit 93c06e5

Please sign in to comment.