From b0c5863f647d86f0f9699dca3385ea1f282aecff Mon Sep 17 00:00:00 2001 From: Angelos Valsamis Date: Fri, 6 Sep 2019 21:37:26 +0300 Subject: [PATCH] Introduce OptionFunc to expose Producer.RequiredAcks (#79) --- trace/kafka/option.go | 22 ++++++++++++++++++++++ trace/kafka/option_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/trace/kafka/option.go b/trace/kafka/option.go index f20297ec1..56b954ed3 100644 --- a/trace/kafka/option.go +++ b/trace/kafka/option.go @@ -8,6 +8,19 @@ import ( "github.com/beatlabs/patron/log" ) +// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements +// it must see before responding. +type RequiredAcks int16 + +const ( + // NoResponse doesn't send any response, the TCP ACK is all you get. + NoResponse RequiredAcks = 0 + // WaitForLocal waits for only the local commit to succeed before responding. + WaitForLocal RequiredAcks = 1 + // WaitForAll waits for all in-sync replicas to commit before responding. + WaitForAll RequiredAcks = -1 +) + // OptionFunc definition for configuring the async producer in a functional way. type OptionFunc func(*AsyncProducer) error @@ -38,3 +51,12 @@ func Timeouts(dial time.Duration) OptionFunc { return nil } } + +// RequiredAcksPolicy option for adjusting how many replica acknowledgements +// broker must see before responding. +func RequiredAcksPolicy(ack RequiredAcks) OptionFunc { + return func(ap *AsyncProducer) error { + ap.cfg.Producer.RequiredAcks = sarama.RequiredAcks(ack) + return nil + } +} diff --git a/trace/kafka/option_test.go b/trace/kafka/option_test.go index b2cdbf8e6..4b3db420d 100644 --- a/trace/kafka/option_test.go +++ b/trace/kafka/option_test.go @@ -64,3 +64,29 @@ func TestTimeouts(t *testing.T) { }) } } + +func TestRequiredAcksPolicy(t *testing.T) { + type args struct { + requiredAcks RequiredAcks + } + tests := []struct { + name string + args args + wantErr bool + }{ + {name: "success", args: args{requiredAcks: NoResponse}, wantErr: false}, + {name: "success", args: args{requiredAcks: WaitForAll}, wantErr: false}, + {name: "success", args: args{requiredAcks: WaitForLocal}, wantErr: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ap := AsyncProducer{cfg: sarama.NewConfig()} + err := RequiredAcksPolicy(tt.args.requiredAcks)(&ap) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}