From d0f153dd102cefd46ec620ef308e99728e36eb2a Mon Sep 17 00:00:00 2001 From: Hugon Sknadaj Date: Thu, 7 Oct 2021 10:31:17 +0200 Subject: [PATCH] Update to Go 1.17, update Shopify/sarama to 1.30, fix tests. (#99) --- .github/workflows/test.yaml | 5 +- example/benchmark/main.go | 2 +- example/branch/main.go | 2 +- example/kafka/main.go | 3 +- example/merge/main.go | 2 +- example/simple/main.go | 2 +- go.mod | 32 +- go.sum | 115 ++++--- kafka/sink_test.go | 7 +- kafka/source.go | 67 +++- kafka/source_test.go | 595 ++++++++++++++++++++---------------- 11 files changed, 492 insertions(+), 340 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 78d024e..c027fd6 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -22,4 +22,7 @@ jobs: id: test uses: ./.github/actions/external/go-test with: - org_token: ${{ secrets.GH_TOKEN }} \ No newline at end of file + org_token: ${{ secrets.GH_TOKEN }} + lint: true + test: true + vet: true \ No newline at end of file diff --git a/example/benchmark/main.go b/example/benchmark/main.go index 23fe02a..2b22b06 100644 --- a/example/benchmark/main.go +++ b/example/benchmark/main.go @@ -112,7 +112,7 @@ func (p *commitProcessor) Close() error { // waitForShutdown blocks until a SIGINT or SIGTERM is received. func waitForShutdown() { - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(quit) diff --git a/example/branch/main.go b/example/branch/main.go index 0f255b3..c48b33a 100644 --- a/example/branch/main.go +++ b/example/branch/main.go @@ -123,7 +123,7 @@ func negativeMapper(msg streams.Message) (streams.Message, error) { // waitForShutdown blocks until a SIGINT or SIGTERM is received. func waitForShutdown() { - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(quit) diff --git a/example/kafka/main.go b/example/kafka/main.go index d8cfcb7..b32a906 100644 --- a/example/kafka/main.go +++ b/example/kafka/main.go @@ -10,6 +10,7 @@ import ( "syscall" "github.com/Shopify/sarama" + "github.com/msales/streams/v6" "github.com/msales/streams/v6/kafka" ) @@ -183,7 +184,7 @@ func (p *commitProcessor) Close() error { // waitForShutdown blocks until a SIGINT or SIGTERM is received. func waitForShutdown() { - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(quit) diff --git a/example/merge/main.go b/example/merge/main.go index e04a244..5655411 100644 --- a/example/merge/main.go +++ b/example/merge/main.go @@ -78,7 +78,7 @@ func addHundredMapper(msg streams.Message) (streams.Message, error) { // waitForShutdown blocks until a SIGINT or SIGTERM is received. func waitForShutdown() { - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(quit) diff --git a/example/simple/main.go b/example/simple/main.go index b725433..4d022ee 100644 --- a/example/simple/main.go +++ b/example/simple/main.go @@ -67,7 +67,7 @@ func doubleMapper(msg streams.Message) (streams.Message, error) { // waitForShutdown blocks until a SIGINT or SIGTERM is received. func waitForShutdown() { - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(quit) diff --git a/go.mod b/go.mod index 5bc73ce..47475a5 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,35 @@ module github.com/msales/streams/v6 -go 1.14 +go 1.17 require ( github.com/DATA-DOG/go-sqlmock v1.4.1 - github.com/Shopify/sarama v1.26.4 + github.com/Shopify/sarama v1.30.0 github.com/msales/pkg/v4 v4.4.0 - github.com/stretchr/testify v1.6.1 - golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 + github.com/stretchr/testify v1.7.0 + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 +) + +require ( + github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/go-redis/redis v6.15.7+incompatible // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/stretchr/objx v0.1.1 // indirect + golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect + golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index f08262d..d9df653 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM= github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8= -github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= -github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= -github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/Shopify/sarama v1.30.0 h1:TOZL6r37xJBDEMLx4yjB77jxbZYXPaDow08TSK6vIL0= +github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fTKCulPVs= +github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae h1:ePgznFqEG1v3AjMklnK8H7BSc++FDSo7xfK9K7Af+0Y= +github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= @@ -22,6 +22,7 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -36,9 +37,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk= -github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -54,36 +54,48 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s= github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= -github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -97,8 +109,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= -github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -111,32 +123,37 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= -github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= -golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 h1:kETrAMYZq6WVGPa8IIixL0CaEcIUNi+1WX7grUoi3y8= +golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -148,8 +165,10 @@ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ= +golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -162,19 +181,28 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -186,27 +214,18 @@ google.golang.org/grpc v1.29.0/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= -gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= -gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI= -gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= -gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= -gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= -gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= -gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/kafka/sink_test.go b/kafka/sink_test.go index 1a523cd..f21261b 100644 --- a/kafka/sink_test.go +++ b/kafka/sink_test.go @@ -5,10 +5,11 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "github.com/msales/streams/v6" "github.com/msales/streams/v6/kafka" "github.com/msales/streams/v6/mocks" - "github.com/stretchr/testify/assert" ) func TestNewSinkConfig(t *testing.T) { @@ -183,7 +184,8 @@ func TestSink_Commit(t *testing.T) { "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("test_topic", 0, broker0.BrokerID()), - "ProduceRequest": sarama.NewMockProduceResponse(t), + "ProduceRequest": sarama.NewMockProduceResponse(t). + SetVersion(2), }) c := kafka.NewSinkConfig() @@ -215,6 +217,7 @@ func TestSink_CommitError(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("test_topic", 0, broker0.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t). + SetVersion(2). SetError("test_topic", 0, sarama.ErrBrokerNotAvailable), }) diff --git a/kafka/source.go b/kafka/source.go index a2e1509..b8440c1 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -2,12 +2,32 @@ package kafka import ( "context" + "runtime" "sync" "time" + "golang.org/x/xerrors" + "github.com/Shopify/sarama" + "github.com/msales/streams/v6" - "golang.org/x/xerrors" +) + +// CommitStrategy represents commit strategy for source commiting. +type CommitStrategy int + +const ( + // CommitAuto represents automatic commit strategy. + // It takes advantage of Shopify/sarama's AutoCommit. + CommitAuto CommitStrategy = 0 + + // CommitManual represents manual commit strategy. Commiting is done on Commit method in Source. + // It turns off Shopify/sarama's AutoCommit by default. + CommitManual CommitStrategy = 1 + + // CommitBoth represents commit strategy that uses both CommitAuto and CommitManual. + // Commiting is done using AutoCommit and on Commit method in Source + CommitBoth CommitStrategy = 2 ) // SourceConfig represents the configuration for a Kafka stream source. @@ -24,6 +44,8 @@ type SourceConfig struct { BufferSize int ErrorsBufferSize int + + CommitStrategy CommitStrategy } // NewSourceConfig creates a new Kafka source configuration. @@ -37,6 +59,7 @@ func NewSourceConfig() *SourceConfig { c.ValueDecoder = ByteDecoder{} c.BufferSize = 1000 c.ErrorsBufferSize = 10 + c.Consumer.Return.Errors = true return c } @@ -62,6 +85,13 @@ func (c *SourceConfig) Validate() error { return nil } +// ModifyConfig modifies config. +func (c *SourceConfig) ModifyConfig() { + if c.CommitStrategy == CommitManual { + c.Config.Consumer.Offsets.AutoCommit.Enable = false + } +} + // Metadata represents an the kafka topic metadata. type Metadata []*PartitionOffset @@ -125,8 +155,9 @@ type PartitionOffset struct { // Source represents a Kafka stream source. type Source struct { - topic string - consumer sarama.ConsumerGroup + topic string + consumer sarama.ConsumerGroup + commitStrategy CommitStrategy ctx context.Context keyDecoder Decoder @@ -147,6 +178,7 @@ type Source struct { // NewSource creates a new Kafka stream source. func NewSource(c *SourceConfig) (*Source, error) { + c.ModifyConfig() if err := c.Validate(); err != nil { return nil, err } @@ -159,15 +191,16 @@ func NewSource(c *SourceConfig) (*Source, error) { ctx, cancel := context.WithCancel(c.Ctx) s := &Source{ - topic: c.Topic, - consumer: consumer, - ctx: ctx, - keyDecoder: c.KeyDecoder, - valueDecoder: c.ValueDecoder, - buf: make(chan *sarama.ConsumerMessage, c.BufferSize), - errs: make(chan error, c.ErrorsBufferSize), - cancelCtx: cancel, - done: make(chan struct{}), + topic: c.Topic, + consumer: consumer, + ctx: ctx, + keyDecoder: c.KeyDecoder, + valueDecoder: c.ValueDecoder, + buf: make(chan *sarama.ConsumerMessage, c.BufferSize), + errs: make(chan error, c.ErrorsBufferSize), + cancelCtx: cancel, + done: make(chan struct{}), + commitStrategy: c.CommitStrategy, } s.sessionWG.Add(1) @@ -224,10 +257,18 @@ func (s *Source) Commit(v interface{}) error { // that the offsets are never committed if the application crashes. This may lead to double-committing // on rare occasions. // The result of the "Commit" method on the Committer should be idempotent whenever possible! + // + // If offsets are needed to be committed immediately, use CommitManual or CommitBoth. s.session.MarkOffset(pos.Topic, pos.Partition, pos.Offset+1, "") } - return nil + // If commit strategy is not CommitAuto, session should perform global, synchronous commit of current marked offsets. + if s.commitStrategy != CommitAuto { + s.session.Commit() + runtime.Gosched() // If any error from consumer side happens after Commiting, it will be read and s.lastErr will be set. + } + + return s.lastErr } // Close closes the Source. diff --git a/kafka/source_test.go b/kafka/source_test.go index c417e45..de9d45c 100644 --- a/kafka/source_test.go +++ b/kafka/source_test.go @@ -4,10 +4,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/Shopify/sarama" + "github.com/msales/streams/v6" "github.com/msales/streams/v6/kafka" - "github.com/stretchr/testify/assert" ) func TestNewSourceConfig(t *testing.T) { @@ -238,70 +241,67 @@ func TestNewSource_Error(t *testing.T) { assert.Error(t, err) } -// TODO -// Fix this test. It's failing with an error: -// "kafka: error decoding packet: correlation ID didn't match, wanted 8, got 9". -// It looks like the mock broker doesn't handle the protocol for the ConsumerGroup correctly, -// or some expectations are incorrect/missing. -// -// func TestSource_Consume(t *testing.T) { -// broker0 := sarama.NewMockBroker(t, 0) -// defer broker0.Close() -// broker0.SetHandlerByMap(map[string]sarama.MockResponse{ -// "MetadataRequest": sarama.NewMockMetadataResponse(t). -// SetBroker(broker0.Addr(), broker0.BrokerID()). -// SetLeader("test_topic", 0, broker0.BrokerID()), -// "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). -// SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), -// "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ -// Version: 1, -// Err: sarama.ErrNoError, -// GroupProtocol: "protocol", -// }), -// "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ -// Err: sarama.ErrNoError, -// MemberAssignment: []byte{ -// 0, 1, // Version -// 0, 0, 0, 1, // Topic array length -// 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one -// 0, 0, 0, 1, // Topic one, partition array length -// 0, 0, 0, 0, // 0 -// 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata -// }, -// }), -// "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). -// SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), -// "OffsetRequest": sarama.NewMockOffsetResponse(t). -// SetOffset("test_topic", 0, sarama.OffsetNewest, 10). -// SetOffset("test_topic", 0, sarama.OffsetOldest, 7), -// "FetchRequest": sarama.NewMockFetchResponse(t, 1). -// SetVersion(1). -// SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). -// SetHighWaterMark("test_topic", 0, 14), -// "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ -// Err: sarama.ErrNoError, -// }), -// }) -// c := kafka.NewSourceConfig() -// c.Brokers = []string{broker0.Addr()} -// c.Topic = "test_topic" -// c.GroupID = "test_group" -// c.Version = sarama.V2_3_0_0 -// -// s, err := kafka.NewSource(c) -// if err != nil { -// panic(err) -// } -// defer s.Close() -// -// time.Sleep(500 * time.Millisecond) -// -// msg, err := s.Consume() -// -// assert.NoError(t, err) -// assert.Equal(t, []byte(nil), msg.Key) -// assert.Equal(t, []byte("foo"), msg.Value) -// } +func TestSource_Consume(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrNoError, + MemberAssignment: []byte{ + 0, 1, // Version + 0, 0, 0, 1, // Topic array length + 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one + 0, 0, 0, 1, // Topic one, partition array length + 0, 0, 0, 0, // 0 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + }, + }), + "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t). + SetError(sarama.ErrNoError), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). + SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test_topic", 0, sarama.OffsetNewest, 10). + SetOffset("test_topic", 0, sarama.OffsetOldest, 7). + SetVersion(1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetVersion(11). + SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). + SetHighWaterMark("test_topic", 0, 14), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + c.Version = sarama.V2_3_0_0 + + s, err := kafka.NewSource(c) + if err != nil { + panic(err) + } + defer s.Close() + + time.Sleep(500 * time.Millisecond) + + msg, err := s.Consume() + + assert.NoError(t, err) + assert.Equal(t, []byte(nil), msg.Key) + assert.Equal(t, []byte("foo"), msg.Value) +} func TestSource_ConsumeError(t *testing.T) { broker0 := sarama.NewMockBroker(t, 0) @@ -344,205 +344,266 @@ func TestSource_ConsumeError(t *testing.T) { assert.Error(t, err) } -// TODO -// Fix this test. It's failing with an error: -// "kafka: error decoding packet: correlation ID didn't match, wanted 8, got 9". -// It looks like the mock broker doesn't handle the protocol for the ConsumerGroup correctly, -// or some expectations are incorrect/missing. -// -// func TestSource_Commit(t *testing.T) { -// broker0 := sarama.NewMockBroker(t, 0) -// defer broker0.Close() -// broker0.SetHandlerByMap(map[string]sarama.MockResponse{ -// "MetadataRequest": sarama.NewMockMetadataResponse(t). -// SetBroker(broker0.Addr(), broker0.BrokerID()). -// SetLeader("test_topic", 0, broker0.BrokerID()), -// "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). -// SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), -// "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ -// Version: 1, -// Err: sarama.ErrNoError, -// GroupProtocol: "protocol", -// }), -// "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ -// Err: sarama.ErrNoError, -// MemberAssignment: []byte{ -// 0, 1, // Version -// 0, 0, 0, 1, // Topic array length -// 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one -// 0, 0, 0, 1, // Topic one, partition array length -// 0, 0, 0, 0, // 0 -// 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata -// }, -// }), -// "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). -// SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), -// "OffsetRequest": sarama.NewMockOffsetResponse(t). -// SetOffset("test_topic", 0, sarama.OffsetNewest, 10). -// SetOffset("test_topic", 0, sarama.OffsetOldest, 7), -// "FetchRequest": sarama.NewMockFetchResponse(t, 1). -// SetVersion(1). -// SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). -// SetHighWaterMark("test_topic", 0, 14), -// "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t), -// "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ -// Err: sarama.ErrNoError, -// }), -// }) -// c := kafka.NewSourceConfig() -// c.Brokers = []string{broker0.Addr()} -// c.Topic = "test_topic" -// c.GroupID = "test_group" -// c.Version = sarama.V2_3_0_0 -// -// s, err := kafka.NewSource(c) -// if err != nil { -// panic(err) -// } -// defer s.Close() -// meta := kafka.Metadata{{Topic: "test_topic", Partition: 0, Offset: 10}} -// -// time.Sleep(100 * time.Millisecond) -// -// _, err = s.Consume() -// require.NoError(t, err) -// -// err = s.Commit(meta) -// -// assert.NoError(t, err) -// } - -// TODO -// Fix this test. It's failing with an error: -// "kafka: error decoding packet: correlation ID didn't match, wanted 8, got 9". -// It looks like the mock broker doesn't handle the protocol for the ConsumerGroup correctly, -// or some expectations are incorrect/missing. -// -// func TestSource_CommitNilMetadata(t *testing.T) { -// broker0 := sarama.NewMockBroker(t, 0) -// defer broker0.Close() -// broker0.SetHandlerByMap(map[string]sarama.MockResponse{ -// "MetadataRequest": sarama.NewMockMetadataResponse(t). -// SetBroker(broker0.Addr(), broker0.BrokerID()). -// SetLeader("test_topic", 0, broker0.BrokerID()), -// "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). -// SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), -// "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ -// Version: 1, -// Err: sarama.ErrNoError, -// GroupProtocol: "protocol", -// }), -// "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ -// Err: sarama.ErrNoError, -// MemberAssignment: []byte{ -// 0, 1, // Version -// 0, 0, 0, 1, // Topic array length -// 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one -// 0, 0, 0, 1, // Topic one, partition array length -// 0, 0, 0, 0, // 0 -// 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata -// }, -// }), -// "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). -// SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), -// "OffsetRequest": sarama.NewMockOffsetResponse(t). -// SetOffset("test_topic", 0, sarama.OffsetNewest, 10). -// SetOffset("test_topic", 0, sarama.OffsetOldest, 7), -// "FetchRequest": sarama.NewMockFetchResponse(t, 1). -// SetVersion(1). -// SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). -// SetHighWaterMark("test_topic", 0, 14), -// "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ -// Err: sarama.ErrNoError, -// }), -// }) -// c := kafka.NewSourceConfig() -// c.Brokers = []string{broker0.Addr()} -// c.Topic = "test_topic" -// c.GroupID = "test_group" -// c.Version = sarama.V2_3_0_0 -// -// s, err := kafka.NewSource(c) -// if err != nil { -// panic(err) -// } -// defer s.Close() -// -// time.Sleep(100 * time.Millisecond) -// -// _, err = s.Consume() -// require.NoError(t, err) -// -// err = s.Commit(nil) -// -// assert.NoError(t, err) -// } - -// TODO -// Fix this test. It's failing with an error: -// "kafka: error decoding packet: correlation ID didn't match, wanted 8, got 9". -// It looks like the mock broker doesn't handle the protocol for the ConsumerGroup correctly, -// or some expectations are incorrect/missing. -// -// func TestSource_CommitReturnError(t *testing.T) { -// broker0 := sarama.NewMockBroker(t, 0) -// defer broker0.Close() -// broker0.SetHandlerByMap(map[string]sarama.MockResponse{ -// "MetadataRequest": sarama.NewMockMetadataResponse(t). -// SetBroker(broker0.Addr(), broker0.BrokerID()). -// SetLeader("test_topic", 0, broker0.BrokerID()), -// "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). -// SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), -// "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ -// Version: 1, -// Err: sarama.ErrNoError, -// GroupProtocol: "protocol", -// }), -// "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ -// Err: sarama.ErrNoError, -// MemberAssignment: []byte{ -// 0, 1, // Version -// 0, 0, 0, 1, // Topic array length -// 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one -// 0, 0, 0, 1, // Topic one, partition array length -// 0, 0, 0, 0, // 0 -// 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata -// }, -// }), -// "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). -// SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), -// "OffsetRequest": sarama.NewMockOffsetResponse(t). -// SetOffset("test_topic", 0, sarama.OffsetNewest, 10). -// SetOffset("test_topic", 0, sarama.OffsetOldest, 7), -// "FetchRequest": sarama.NewMockFetchResponse(t, 1). -// SetVersion(1). -// SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). -// SetHighWaterMark("test_topic", 0, 14), -// "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t). -// SetError("test_group", "test_topic", 0, sarama.ErrBrokerNotAvailable), -// "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ -// Err: sarama.ErrNoError, -// }), -// }) -// c := kafka.NewSourceConfig() -// c.Brokers = []string{broker0.Addr()} -// c.Topic = "test_topic" -// c.GroupID = "test_group" -// c.Version = sarama.V2_3_0_0 -// -// s, err := kafka.NewSource(c) -// if err != nil { -// panic(err) -// } -// defer s.Close() -// meta := kafka.Metadata{{Topic: "test_topic", Partition: 0, Offset: 10}} -// -// time.Sleep(100 * time.Millisecond) -// -// _, err = s.Consume() -// require.NoError(t, err) -// -// err = s.Commit(meta) -// -// assert.Error(t, err) -// } +func TestSource_Commit_Auto(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrNoError, + MemberAssignment: []byte{ + 0, 2, // Version + 0, 0, 0, 1, // Topic array length + 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one + 0, 0, 0, 1, // Topic one, partition array length + 0, 0, 0, 0, // 0 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + }, + }), + "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t). + SetError(sarama.ErrNoError), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). + SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test_topic", 0, sarama.OffsetNewest, 10). + SetOffset("test_topic", 0, sarama.OffsetOldest, 7). + SetVersion(1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetVersion(11). + SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). + SetHighWaterMark("test_topic", 0, 14), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t). + SetError("test_group", "test_topic", 0, sarama.ErrNoError), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + c.Version = sarama.V2_3_0_0 + + s, err := kafka.NewSource(c) + if err != nil { + panic(err) + } + defer s.Close() + meta := kafka.Metadata{{Topic: "test_topic", Partition: 0, Offset: 10}} + + time.Sleep(100 * time.Millisecond) + + _, err = s.Consume() + require.NoError(t, err) + + err = s.Commit(meta) + + assert.NoError(t, err) +} + +func TestSource_Commit_Manual(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrNoError, + MemberAssignment: []byte{ + 0, 2, // Version + 0, 0, 0, 1, // Topic array length + 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one + 0, 0, 0, 1, // Topic one, partition array length + 0, 0, 0, 0, // 0 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + }, + }), + "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t). + SetError(sarama.ErrNoError), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). + SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test_topic", 0, sarama.OffsetNewest, 10). + SetOffset("test_topic", 0, sarama.OffsetOldest, 7). + SetVersion(1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetVersion(11). + SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). + SetHighWaterMark("test_topic", 0, 14), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t). + SetError("test_group", "test_topic", 0, sarama.ErrNoError), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + c.Version = sarama.V2_3_0_0 + c.CommitStrategy = kafka.CommitManual + + s, err := kafka.NewSource(c) + if err != nil { + panic(err) + } + defer s.Close() + meta := kafka.Metadata{{Topic: "test_topic", Partition: 0, Offset: 10}} + + time.Sleep(100 * time.Millisecond) + + _, err = s.Consume() + require.NoError(t, err) + + err = s.Commit(meta) + + assert.NoError(t, err) +} + +func TestSource_Commit_Manual_NilMetadata(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrNoError, + MemberAssignment: []byte{ + 0, 2, // Version + 0, 0, 0, 1, // Topic array length + 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one + 0, 0, 0, 1, // Topic one, partition array length + 0, 0, 0, 0, // 0 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + }, + }), + "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t). + SetError(sarama.ErrNoError), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). + SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test_topic", 0, sarama.OffsetNewest, 10). + SetOffset("test_topic", 0, sarama.OffsetOldest, 7). + SetVersion(1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetVersion(11). + SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). + SetHighWaterMark("test_topic", 0, 14), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + c.Version = sarama.V2_3_0_0 + c.CommitStrategy = kafka.CommitManual + + s, err := kafka.NewSource(c) + if err != nil { + panic(err) + } + defer s.Close() + + time.Sleep(100 * time.Millisecond) + + _, err = s.Consume() + require.NoError(t, err) + + err = s.Commit(nil) + + assert.NoError(t, err) +} + +func TestSource_Commit_Manual_ReturnError(t *testing.T) { + broker0 := sarama.NewMockBroker(t, 0) + defer broker0.Close() + broker0.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("test_topic", 0, broker0.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0), + "JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{ + Version: 1, + Err: sarama.ErrNoError, + GroupProtocol: "protocol", + }), + "SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{ + Err: sarama.ErrNoError, + MemberAssignment: []byte{ + 0, 1, // Version + 0, 0, 0, 1, // Topic array length + 0, 10, 't', 'e', 's', 't', '_', 't', 'o', 'p', 'i', 'c', // Topic one + 0, 0, 0, 1, // Topic one, partition array length + 0, 0, 0, 0, // 0 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + }, + }), + "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t). + SetError(sarama.ErrNoError), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t). + SetOffset("test_group", "test_topic", 0, 10, "", sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test_topic", 0, sarama.OffsetNewest, 10). + SetOffset("test_topic", 0, sarama.OffsetOldest, 7). + SetVersion(1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetVersion(11). + SetMessage("test_topic", 0, 10, sarama.StringEncoder("foo")). + SetHighWaterMark("test_topic", 0, 14), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(t). + SetError("test_group", "test_topic", 0, sarama.ErrBrokerNotAvailable), + "LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{ + Err: sarama.ErrNoError, + }), + }) + c := kafka.NewSourceConfig() + c.Brokers = []string{broker0.Addr()} + c.Topic = "test_topic" + c.GroupID = "test_group" + c.Version = sarama.V2_3_0_0 + c.CommitStrategy = kafka.CommitManual + + s, err := kafka.NewSource(c) + if err != nil { + panic(err) + } + defer s.Close() + meta := kafka.Metadata{{Topic: "test_topic", Partition: 0, Offset: 10}} + + time.Sleep(100 * time.Millisecond) + + _, err = s.Consume() + require.NoError(t, err) + + err = s.Commit(meta) + + assert.Error(t, err) +}