Skip to content

Commit

Permalink
Update to Go 1.17, update Shopify/sarama to 1.30, fix tests. (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
Skandalik authored Oct 7, 2021
1 parent 64fbe5f commit d0f153d
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 340 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ jobs:
id: test
uses: ./.github/actions/external/go-test
with:
org_token: ${{ secrets.GH_TOKEN }}
org_token: ${{ secrets.GH_TOKEN }}
lint: true
test: true
vet: true
2 changes: 1 addition & 1 deletion example/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (p *commitProcessor) Close() error {

// waitForShutdown blocks until a SIGINT or SIGTERM is received.
func waitForShutdown() {
quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)

signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(quit)
Expand Down
2 changes: 1 addition & 1 deletion example/branch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func negativeMapper(msg streams.Message) (streams.Message, error) {

// waitForShutdown blocks until a SIGINT or SIGTERM is received.
func waitForShutdown() {
quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)

signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(quit)
Expand Down
3 changes: 2 additions & 1 deletion example/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"

"github.com/Shopify/sarama"

"github.com/msales/streams/v6"
"github.com/msales/streams/v6/kafka"
)
Expand Down Expand Up @@ -183,7 +184,7 @@ func (p *commitProcessor) Close() error {

// waitForShutdown blocks until a SIGINT or SIGTERM is received.
func waitForShutdown() {
quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)

signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(quit)
Expand Down
2 changes: 1 addition & 1 deletion example/merge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func addHundredMapper(msg streams.Message) (streams.Message, error) {

// waitForShutdown blocks until a SIGINT or SIGTERM is received.
func waitForShutdown() {
quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)

signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(quit)
Expand Down
2 changes: 1 addition & 1 deletion example/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func doubleMapper(msg streams.Message) (streams.Message, error) {

// waitForShutdown blocks until a SIGINT or SIGTERM is received.
func waitForShutdown() {
quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)

signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(quit)
Expand Down
32 changes: 28 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@
module github.com/msales/streams/v6

go 1.14
go 1.17

require (
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/Shopify/sarama v1.26.4
github.com/Shopify/sarama v1.30.0
github.com/msales/pkg/v4 v4.4.0
github.com/stretchr/testify v1.6.1
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
github.com/stretchr/testify v1.7.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
)

require (
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/stretchr/objx v0.1.1 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
115 changes: 67 additions & 48 deletions go.sum

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"

"github.com/msales/streams/v6"
"github.com/msales/streams/v6/kafka"
"github.com/msales/streams/v6/mocks"
"github.com/stretchr/testify/assert"
)

func TestNewSinkConfig(t *testing.T) {
Expand Down Expand Up @@ -183,7 +184,8 @@ func TestSink_Commit(t *testing.T) {
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("test_topic", 0, broker0.BrokerID()),
"ProduceRequest": sarama.NewMockProduceResponse(t),
"ProduceRequest": sarama.NewMockProduceResponse(t).
SetVersion(2),
})

c := kafka.NewSinkConfig()
Expand Down Expand Up @@ -215,6 +217,7 @@ func TestSink_CommitError(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("test_topic", 0, broker0.BrokerID()),
"ProduceRequest": sarama.NewMockProduceResponse(t).
SetVersion(2).
SetError("test_topic", 0, sarama.ErrBrokerNotAvailable),
})

Expand Down
67 changes: 54 additions & 13 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,32 @@ package kafka

import (
"context"
"runtime"
"sync"
"time"

"golang.org/x/xerrors"

"github.com/Shopify/sarama"

"github.com/msales/streams/v6"
"golang.org/x/xerrors"
)

// CommitStrategy represents commit strategy for source commiting.
type CommitStrategy int

const (
// CommitAuto represents automatic commit strategy.
// It takes advantage of Shopify/sarama's AutoCommit.
CommitAuto CommitStrategy = 0

// CommitManual represents manual commit strategy. Commiting is done on Commit method in Source.
// It turns off Shopify/sarama's AutoCommit by default.
CommitManual CommitStrategy = 1

// CommitBoth represents commit strategy that uses both CommitAuto and CommitManual.
// Commiting is done using AutoCommit and on Commit method in Source
CommitBoth CommitStrategy = 2
)

// SourceConfig represents the configuration for a Kafka stream source.
Expand All @@ -24,6 +44,8 @@ type SourceConfig struct {

BufferSize int
ErrorsBufferSize int

CommitStrategy CommitStrategy
}

// NewSourceConfig creates a new Kafka source configuration.
Expand All @@ -37,6 +59,7 @@ func NewSourceConfig() *SourceConfig {
c.ValueDecoder = ByteDecoder{}
c.BufferSize = 1000
c.ErrorsBufferSize = 10
c.Consumer.Return.Errors = true

return c
}
Expand All @@ -62,6 +85,13 @@ func (c *SourceConfig) Validate() error {
return nil
}

// ModifyConfig modifies config.
func (c *SourceConfig) ModifyConfig() {
if c.CommitStrategy == CommitManual {
c.Config.Consumer.Offsets.AutoCommit.Enable = false
}
}

// Metadata represents an the kafka topic metadata.
type Metadata []*PartitionOffset

Expand Down Expand Up @@ -125,8 +155,9 @@ type PartitionOffset struct {

// Source represents a Kafka stream source.
type Source struct {
topic string
consumer sarama.ConsumerGroup
topic string
consumer sarama.ConsumerGroup
commitStrategy CommitStrategy

ctx context.Context
keyDecoder Decoder
Expand All @@ -147,6 +178,7 @@ type Source struct {

// NewSource creates a new Kafka stream source.
func NewSource(c *SourceConfig) (*Source, error) {
c.ModifyConfig()
if err := c.Validate(); err != nil {
return nil, err
}
Expand All @@ -159,15 +191,16 @@ func NewSource(c *SourceConfig) (*Source, error) {
ctx, cancel := context.WithCancel(c.Ctx)

s := &Source{
topic: c.Topic,
consumer: consumer,
ctx: ctx,
keyDecoder: c.KeyDecoder,
valueDecoder: c.ValueDecoder,
buf: make(chan *sarama.ConsumerMessage, c.BufferSize),
errs: make(chan error, c.ErrorsBufferSize),
cancelCtx: cancel,
done: make(chan struct{}),
topic: c.Topic,
consumer: consumer,
ctx: ctx,
keyDecoder: c.KeyDecoder,
valueDecoder: c.ValueDecoder,
buf: make(chan *sarama.ConsumerMessage, c.BufferSize),
errs: make(chan error, c.ErrorsBufferSize),
cancelCtx: cancel,
done: make(chan struct{}),
commitStrategy: c.CommitStrategy,
}
s.sessionWG.Add(1)

Expand Down Expand Up @@ -224,10 +257,18 @@ func (s *Source) Commit(v interface{}) error {
// that the offsets are never committed if the application crashes. This may lead to double-committing
// on rare occasions.
// The result of the "Commit" method on the Committer should be idempotent whenever possible!
//
// If offsets are needed to be committed immediately, use CommitManual or CommitBoth.
s.session.MarkOffset(pos.Topic, pos.Partition, pos.Offset+1, "")
}

return nil
// If commit strategy is not CommitAuto, session should perform global, synchronous commit of current marked offsets.
if s.commitStrategy != CommitAuto {
s.session.Commit()
runtime.Gosched() // If any error from consumer side happens after Commiting, it will be read and s.lastErr will be set.
}

return s.lastErr
}

// Close closes the Source.
Expand Down
Loading

0 comments on commit d0f153d

Please sign in to comment.