Skip to content

Commit

Permalink
Merge pull request #106 from blendle/retry-config
Browse files Browse the repository at this point in the history
Tweak numbers related to retries, and message delivery guarantee
  • Loading branch information
JeanMertz authored Dec 27, 2018
2 parents b9e7e6a + a157657 commit 1e17c97
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
14 changes: 10 additions & 4 deletions streamconfig/kafkaconfig/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ type Producer struct {
IgnoreErrors []kafka.ErrorCode

// MaxDeliveryRetries dictates how many times to retry sending a failing
// MessageSet. Note: retrying may cause reordering. Defaults to 2 retries. Use
// `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery.
// MessageSet. Defaults to 5 retries.
MaxDeliveryRetries int `kafka:"message.send.max.retries" split_words:"true"`

// MaxInFlightRequests dictates the maximum number of in-flight requests per
// broker connection. This is a generic property applied to all broker
// communication, however it is primarily relevant to produce requests. In
// particular, note that other mechanisms limit the number of outstanding
// consumer fetch request per broker to one.
//
// Note: having more than one in flight request may cause reordering. Use
// `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery.
MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll

// MaxQueueBufferDuration is the delay to wait for messages in the producer
Expand Down Expand Up @@ -92,6 +94,9 @@ type Producer struct {
// Defaults to `AckLeader`.
RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"`

// RetryBackoff sets the backoff time before retrying a protocol request.
RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"`

// SecurityProtocol is the protocol used to communicate with brokers.
SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"`

Expand Down Expand Up @@ -164,12 +169,13 @@ var ProducerDefaults = Producer{
kafka.ErrNotEnoughReplicasAfterAppend,
kafka.ErrUnknownMemberID,
},
MaxDeliveryRetries: 2,
MaxDeliveryRetries: 5,
MaxInFlightRequests: 1000000,
MaxQueueBufferDuration: 10 * time.Millisecond,
MaxQueueSizeKBytes: 2097151,
MaxQueueSizeMessages: 1000000,
RequiredAcks: AckLeader,
RequiredAcks: AckAll,
RetryBackoff: 15 * time.Second,
SecurityProtocol: ProtocolPlaintext,
SessionTimeout: 30 * time.Second,
SSL: SSL{},
Expand Down
14 changes: 11 additions & 3 deletions streamconfig/kafkaconfig/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var producerOmitempties = []string{
"{topic}.request.required.acks",
"message.send.max.retries",
"statistics.interval.ms",
"retry.backoff.ms",
}

func TestProducer(t *testing.T) {
Expand All @@ -37,7 +38,8 @@ func TestProducer(t *testing.T) {
MaxQueueBufferDuration: time.Duration(0),
MaxQueueSizeKBytes: 0,
MaxQueueSizeMessages: 0,
RequiredAcks: kafkaconfig.AckAll,
RequiredAcks: kafkaconfig.AckLeader,
RetryBackoff: 10 * time.Second,
SecurityProtocol: kafkaconfig.ProtocolPlaintext,
SessionTimeout: time.Duration(0),
SSL: kafkaconfig.SSL{KeyPath: ""},
Expand Down Expand Up @@ -74,12 +76,13 @@ func TestProducerDefaults(t *testing.T) {
assert.Equal(t, kafkaconfig.CompressionSnappy, config.CompressionCodec)
assert.Equal(t, 1*time.Second, config.HeartbeatInterval)
assert.Equal(t, errs, config.IgnoreErrors)
assert.Equal(t, 2, config.MaxDeliveryRetries)
assert.Equal(t, 5, config.MaxDeliveryRetries)
assert.Equal(t, 1000000, config.MaxInFlightRequests)
assert.Equal(t, 10*time.Millisecond, config.MaxQueueBufferDuration)
assert.Equal(t, 2097151, config.MaxQueueSizeKBytes)
assert.Equal(t, 1000000, config.MaxQueueSizeMessages)
assert.EqualValues(t, kafkaconfig.AckLeader, config.RequiredAcks)
assert.EqualValues(t, kafkaconfig.AckAll, config.RequiredAcks)
assert.EqualValues(t, 15*time.Second, config.RetryBackoff)
assert.Equal(t, kafkaconfig.ProtocolPlaintext, config.SecurityProtocol)
assert.Equal(t, 30*time.Second, config.SessionTimeout)
assert.Equal(t, kafkaconfig.SSL{}, config.SSL)
Expand Down Expand Up @@ -233,6 +236,11 @@ func TestProducer_ConfigMap(t *testing.T) {
&kafka.ConfigMap{"default.topic.config": kafka.ConfigMap{"request.required.acks": -1}},
},

"RetryBackoff": {
&kafkaconfig.Producer{RetryBackoff: 1 * time.Second},
&kafka.ConfigMap{"retry.backoff.ms": 1000},
},

"securityProtocol (plaintext)": {
&kafkaconfig.Producer{SecurityProtocol: kafkaconfig.ProtocolPlaintext},
&kafka.ConfigMap{"security.protocol": "plaintext"},
Expand Down
11 changes: 11 additions & 0 deletions streamconfig/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ func KafkaRequireAllAck() Option {
})
}

// KafkaRetryBackoff configures the producer to use the configured retry
// backoff before retrying a connection failure. See `KafkaMaxDeliveryRetries`
// to configure the amount of retries to execute before returning an error.
//
// This option has no effect when applied to a consumer.
func KafkaRetryBackoff(d time.Duration) Option {
return optionFunc(func(_ *Consumer, p *Producer) {
p.Kafka.RetryBackoff = d
})
}

// KafkaSecurityProtocol configures the producer or consumer to use the
// specified security protocol.
func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option {
Expand Down
10 changes: 10 additions & 0 deletions streamconfig/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ func TestOptions(t *testing.T) {
},
},

"KafkaRetryBackoff": {
[]streamconfig.Option{streamconfig.KafkaRetryBackoff(1 * time.Minute)},
streamconfig.Consumer{
Kafka: kafkaconfig.Consumer{},
},
streamconfig.Producer{
Kafka: kafkaconfig.Producer{RetryBackoff: 1 * time.Minute},
},
},

"KafkaSecurityProtocol": {
[]streamconfig.Option{streamconfig.KafkaSecurityProtocol(kafkaconfig.ProtocolSSL)},
streamconfig.Consumer{
Expand Down

0 comments on commit 1e17c97

Please sign in to comment.