Skip to content

Commit

Permalink
add option to Kafka producer to configure retry backoff
Browse files Browse the repository at this point in the history
Defaults to 15 seconds, to give the broker time to fix itself when an
error is returned.
  • Loading branch information
JeanMertz committed Dec 27, 2018
1 parent 40f3f0e commit a157657
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 0 deletions.
4 changes: 4 additions & 0 deletions streamconfig/kafkaconfig/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type Producer struct {
// Defaults to `AckLeader`.
RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"`

// RetryBackoff sets the backoff time before retrying a protocol request.
RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"`

// SecurityProtocol is the protocol used to communicate with brokers.
SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"`

Expand Down Expand Up @@ -172,6 +175,7 @@ var ProducerDefaults = Producer{
MaxQueueSizeKBytes: 2097151,
MaxQueueSizeMessages: 1000000,
RequiredAcks: AckAll,
RetryBackoff: 15 * time.Second,
SecurityProtocol: ProtocolPlaintext,
SessionTimeout: 30 * time.Second,
SSL: SSL{},
Expand Down
8 changes: 8 additions & 0 deletions streamconfig/kafkaconfig/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var producerOmitempties = []string{
"{topic}.request.required.acks",
"message.send.max.retries",
"statistics.interval.ms",
"retry.backoff.ms",
}

func TestProducer(t *testing.T) {
Expand All @@ -38,6 +39,7 @@ func TestProducer(t *testing.T) {
MaxQueueSizeKBytes: 0,
MaxQueueSizeMessages: 0,
RequiredAcks: kafkaconfig.AckLeader,
RetryBackoff: 10 * time.Second,
SecurityProtocol: kafkaconfig.ProtocolPlaintext,
SessionTimeout: time.Duration(0),
SSL: kafkaconfig.SSL{KeyPath: ""},
Expand Down Expand Up @@ -80,6 +82,7 @@ func TestProducerDefaults(t *testing.T) {
assert.Equal(t, 2097151, config.MaxQueueSizeKBytes)
assert.Equal(t, 1000000, config.MaxQueueSizeMessages)
assert.EqualValues(t, kafkaconfig.AckAll, config.RequiredAcks)
assert.EqualValues(t, 15*time.Second, config.RetryBackoff)
assert.Equal(t, kafkaconfig.ProtocolPlaintext, config.SecurityProtocol)
assert.Equal(t, 30*time.Second, config.SessionTimeout)
assert.Equal(t, kafkaconfig.SSL{}, config.SSL)
Expand Down Expand Up @@ -233,6 +236,11 @@ func TestProducer_ConfigMap(t *testing.T) {
&kafka.ConfigMap{"default.topic.config": kafka.ConfigMap{"request.required.acks": -1}},
},

"RetryBackoff": {
&kafkaconfig.Producer{RetryBackoff: 1 * time.Second},
&kafka.ConfigMap{"retry.backoff.ms": 1000},
},

"securityProtocol (plaintext)": {
&kafkaconfig.Producer{SecurityProtocol: kafkaconfig.ProtocolPlaintext},
&kafka.ConfigMap{"security.protocol": "plaintext"},
Expand Down
11 changes: 11 additions & 0 deletions streamconfig/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ func KafkaRequireAllAck() Option {
})
}

// KafkaRetryBackoff configures the producer to use the configured retry
// backoff before retrying a connection failure. See `KafkaMaxDeliveryRetries`
// to configure the amount of retries to execute before returning an error.
//
// This option has no effect when applied to a consumer.
func KafkaRetryBackoff(d time.Duration) Option {
return optionFunc(func(_ *Consumer, p *Producer) {
p.Kafka.RetryBackoff = d
})
}

// KafkaSecurityProtocol configures the producer or consumer to use the
// specified security protocol.
func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option {
Expand Down
10 changes: 10 additions & 0 deletions streamconfig/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ func TestOptions(t *testing.T) {
},
},

"KafkaRetryBackoff": {
[]streamconfig.Option{streamconfig.KafkaRetryBackoff(1 * time.Minute)},
streamconfig.Consumer{
Kafka: kafkaconfig.Consumer{},
},
streamconfig.Producer{
Kafka: kafkaconfig.Producer{RetryBackoff: 1 * time.Minute},
},
},

"KafkaSecurityProtocol": {
[]streamconfig.Option{streamconfig.KafkaSecurityProtocol(kafkaconfig.ProtocolSSL)},
streamconfig.Consumer{
Expand Down

0 comments on commit a157657

Please sign in to comment.