From c6d0bbde72f74bb048b08c553f04c453901c3f2f Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Mon, 5 Nov 2018 17:34:59 +0100 Subject: [PATCH 1/3] update default Kafka error retries from 2 to 5 --- streamconfig/kafkaconfig/producer.go | 8 +++++--- streamconfig/kafkaconfig/producer_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/streamconfig/kafkaconfig/producer.go b/streamconfig/kafkaconfig/producer.go index e647bec..8c00311 100644 --- a/streamconfig/kafkaconfig/producer.go +++ b/streamconfig/kafkaconfig/producer.go @@ -54,8 +54,7 @@ type Producer struct { IgnoreErrors []kafka.ErrorCode // MaxDeliveryRetries dictates how many times to retry sending a failing - // MessageSet. Note: retrying may cause reordering. Defaults to 2 retries. Use - // `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery. + // MessageSet. Defaults to 5 retries. MaxDeliveryRetries int `kafka:"message.send.max.retries" split_words:"true"` // MaxInFlightRequests dictates the maximum number of in-flight requests per @@ -63,6 +62,9 @@ type Producer struct { // communication, however it is primarily relevant to produce requests. In // particular, note that other mechanisms limit the number of outstanding // consumer fetch request per broker to one. + // + // Note: having more than one in flight request may cause reordering. Use + // `streamconfig.KafkaOrderedDelivery()` to guarantee ordered delivery. 
MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll // MaxQueueBufferDuration is the delay to wait for messages in the producer @@ -164,7 +166,7 @@ var ProducerDefaults = Producer{ kafka.ErrNotEnoughReplicasAfterAppend, kafka.ErrUnknownMemberID, }, - MaxDeliveryRetries: 2, + MaxDeliveryRetries: 5, MaxInFlightRequests: 1000000, MaxQueueBufferDuration: 10 * time.Millisecond, MaxQueueSizeKBytes: 2097151, diff --git a/streamconfig/kafkaconfig/producer_test.go b/streamconfig/kafkaconfig/producer_test.go index de9ad17..c8ab944 100644 --- a/streamconfig/kafkaconfig/producer_test.go +++ b/streamconfig/kafkaconfig/producer_test.go @@ -74,7 +74,7 @@ func TestProducerDefaults(t *testing.T) { assert.Equal(t, kafkaconfig.CompressionSnappy, config.CompressionCodec) assert.Equal(t, 1*time.Second, config.HeartbeatInterval) assert.Equal(t, errs, config.IgnoreErrors) - assert.Equal(t, 2, config.MaxDeliveryRetries) + assert.Equal(t, 5, config.MaxDeliveryRetries) assert.Equal(t, 1000000, config.MaxInFlightRequests) assert.Equal(t, 10*time.Millisecond, config.MaxQueueBufferDuration) assert.Equal(t, 2097151, config.MaxQueueSizeKBytes) From 40f3f0e9beedaadd50cf88d5604f28a499847ea9 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Mon, 5 Nov 2018 17:38:24 +0100 Subject: [PATCH 2/3] default Kafka producer to wait for acks from all brokers This improves message delivery guarantees. The producer already has `batch.num.messages` set to 10.000, so it won't wait for acks on each message. If more performance is needed, at the cost of reliability, this can be tweaked using `KafkaRequireLeaderAck`. 
--- streamconfig/kafkaconfig/producer.go | 2 +- streamconfig/kafkaconfig/producer_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streamconfig/kafkaconfig/producer.go b/streamconfig/kafkaconfig/producer.go index 8c00311..742d36a 100644 --- a/streamconfig/kafkaconfig/producer.go +++ b/streamconfig/kafkaconfig/producer.go @@ -171,7 +171,7 @@ var ProducerDefaults = Producer{ MaxQueueBufferDuration: 10 * time.Millisecond, MaxQueueSizeKBytes: 2097151, MaxQueueSizeMessages: 1000000, - RequiredAcks: AckLeader, + RequiredAcks: AckAll, SecurityProtocol: ProtocolPlaintext, SessionTimeout: 30 * time.Second, SSL: SSL{}, diff --git a/streamconfig/kafkaconfig/producer_test.go b/streamconfig/kafkaconfig/producer_test.go index c8ab944..85d9ccb 100644 --- a/streamconfig/kafkaconfig/producer_test.go +++ b/streamconfig/kafkaconfig/producer_test.go @@ -37,7 +37,7 @@ func TestProducer(t *testing.T) { MaxQueueBufferDuration: time.Duration(0), MaxQueueSizeKBytes: 0, MaxQueueSizeMessages: 0, - RequiredAcks: kafkaconfig.AckAll, + RequiredAcks: kafkaconfig.AckLeader, SecurityProtocol: kafkaconfig.ProtocolPlaintext, SessionTimeout: time.Duration(0), SSL: kafkaconfig.SSL{KeyPath: ""}, @@ -79,7 +79,7 @@ func TestProducerDefaults(t *testing.T) { assert.Equal(t, 10*time.Millisecond, config.MaxQueueBufferDuration) assert.Equal(t, 2097151, config.MaxQueueSizeKBytes) assert.Equal(t, 1000000, config.MaxQueueSizeMessages) - assert.EqualValues(t, kafkaconfig.AckLeader, config.RequiredAcks) + assert.EqualValues(t, kafkaconfig.AckAll, config.RequiredAcks) assert.Equal(t, kafkaconfig.ProtocolPlaintext, config.SecurityProtocol) assert.Equal(t, 30*time.Second, config.SessionTimeout) assert.Equal(t, kafkaconfig.SSL{}, config.SSL) From a1576575babaf04c12f7c79c6a7201a3492e28d3 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Mon, 5 Nov 2018 17:39:29 +0100 Subject: [PATCH 3/3] add option to Kafka producer to configure retry backoff Defaults to 15 seconds, to give the broker time 
to fix itself when an error is returned. --- streamconfig/kafkaconfig/producer.go | 4 ++++ streamconfig/kafkaconfig/producer_test.go | 8 ++++++++ streamconfig/option.go | 11 +++++++++++ streamconfig/option_test.go | 10 ++++++++++ 4 files changed, 33 insertions(+) diff --git a/streamconfig/kafkaconfig/producer.go b/streamconfig/kafkaconfig/producer.go index 742d36a..7b0a181 100644 --- a/streamconfig/kafkaconfig/producer.go +++ b/streamconfig/kafkaconfig/producer.go @@ -94,6 +94,9 @@ type Producer struct { // Defaults to `AckLeader`. RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"` + // RetryBackoff sets the retry backoff for protocol requests (default 15s). + RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"` + // SecurityProtocol is the protocol used to communicate with brokers. SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"` @@ -172,6 +175,7 @@ var ProducerDefaults = Producer{ MaxQueueSizeKBytes: 2097151, MaxQueueSizeMessages: 1000000, RequiredAcks: AckAll, + RetryBackoff: 15 * time.Second, SecurityProtocol: ProtocolPlaintext, SessionTimeout: 30 * time.Second, SSL: SSL{}, diff --git a/streamconfig/kafkaconfig/producer_test.go b/streamconfig/kafkaconfig/producer_test.go index 85d9ccb..6eec574 100644 --- a/streamconfig/kafkaconfig/producer_test.go +++ b/streamconfig/kafkaconfig/producer_test.go @@ -19,6 +19,7 @@ var producerOmitempties = []string{ "{topic}.request.required.acks", "message.send.max.retries", "statistics.interval.ms", + "retry.backoff.ms", } func TestProducer(t *testing.T) { @@ -38,6 +39,7 @@ func TestProducer(t *testing.T) { MaxQueueSizeKBytes: 0, MaxQueueSizeMessages: 0, RequiredAcks: kafkaconfig.AckLeader, + RetryBackoff: 10 * time.Second, SecurityProtocol: kafkaconfig.ProtocolPlaintext, SessionTimeout: time.Duration(0), SSL: kafkaconfig.SSL{KeyPath: ""}, @@ -80,6 +82,7 @@ func TestProducerDefaults(t *testing.T) { assert.Equal(t, 2097151, config.MaxQueueSizeKBytes) 
assert.Equal(t, 1000000, config.MaxQueueSizeMessages) assert.EqualValues(t, kafkaconfig.AckAll, config.RequiredAcks) + assert.EqualValues(t, 15*time.Second, config.RetryBackoff) assert.Equal(t, kafkaconfig.ProtocolPlaintext, config.SecurityProtocol) assert.Equal(t, 30*time.Second, config.SessionTimeout) assert.Equal(t, kafkaconfig.SSL{}, config.SSL) @@ -233,6 +236,11 @@ func TestProducer_ConfigMap(t *testing.T) { &kafka.ConfigMap{"default.topic.config": kafka.ConfigMap{"request.required.acks": -1}}, }, + "RetryBackoff": { + &kafkaconfig.Producer{RetryBackoff: 1 * time.Second}, + &kafka.ConfigMap{"retry.backoff.ms": 1000}, + }, + "securityProtocol (plaintext)": { &kafkaconfig.Producer{SecurityProtocol: kafkaconfig.ProtocolPlaintext}, &kafka.ConfigMap{"security.protocol": "plaintext"}, diff --git a/streamconfig/option.go b/streamconfig/option.go index 85dee38..7e474e6 100644 --- a/streamconfig/option.go +++ b/streamconfig/option.go @@ -324,6 +324,17 @@ func KafkaRequireAllAck() Option { }) } +// KafkaRetryBackoff configures the producer to use the given retry backoff +// before retrying a failed protocol request. See `KafkaMaxDeliveryRetries` +// to configure the number of retries to execute before returning an error. +// +// This option has no effect when applied to a consumer. +func KafkaRetryBackoff(d time.Duration) Option { + return optionFunc(func(_ *Consumer, p *Producer) { + p.Kafka.RetryBackoff = d + }) +} + // KafkaSecurityProtocol configures the producer or consumer to use the // specified security protocol. 
func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option { diff --git a/streamconfig/option_test.go b/streamconfig/option_test.go index 7af1752..07b2cdc 100644 --- a/streamconfig/option_test.go +++ b/streamconfig/option_test.go @@ -330,6 +330,16 @@ func TestOptions(t *testing.T) { }, }, + "KafkaRetryBackoff": { + []streamconfig.Option{streamconfig.KafkaRetryBackoff(1 * time.Minute)}, + streamconfig.Consumer{ + Kafka: kafkaconfig.Consumer{}, + }, + streamconfig.Producer{ + Kafka: kafkaconfig.Producer{RetryBackoff: 1 * time.Minute}, + }, + }, + "KafkaSecurityProtocol": { []streamconfig.Option{streamconfig.KafkaSecurityProtocol(kafkaconfig.ProtocolSSL)}, streamconfig.Consumer{