Skip to content

Commit

Permalink
Issue #443 - Startup fail for missing topic or brokers not reachable (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodrigo Broggi authored Dec 21, 2021
1 parent 6e0b41a commit a750632
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
30 changes: 30 additions & 0 deletions component/kafka/group/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package group

import (
"errors"
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/log"
)
Expand All @@ -26,6 +28,34 @@ func FailureStrategy(fs kafka.FailStrategy) OptionFunc {
}
}

// CheckTopic checks whether the component-configured topics exist in the broker.
func CheckTopic() OptionFunc {
return func(c *Component) error {
saramaConf := sarama.NewConfig()
client, err := sarama.NewClient(c.brokers, saramaConf)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer func() { _ = client.Close() }()
brokerTopics, err := client.Topics()
if err != nil {
return fmt.Errorf("failed to get topics from broker: %w", err)
}

topicsSet := make(map[string]struct{}, len(brokerTopics))
for _, topic := range brokerTopics {
topicsSet[topic] = struct{}{}
}

for _, topic := range c.topics {
if _, ok := topicsSet[topic]; !ok {
return fmt.Errorf("topic %s does not exist in broker", topic)
}
}
return nil
}
}

// Retries sets the number of time a component should retry in case of an error.
// These retries are depleted in these cases:
// * when there are temporary connection issues
Expand Down
37 changes: 36 additions & 1 deletion test/docker/kafka/component_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,40 @@ func TestKafkaComponent_FailOnceAndRetry(t *testing.T) {
assert.Equal(t, expectedMessages, actualMessages)
}

func TestGroupConsume_CheckTopicFailsDueToNonExistingTopic(t *testing.T) {
// Test parameters
processorFunc := func(batch kafka.Batch) error {
return nil
}
invalidTopicName := "invalid-topic-name"
_, err := group.New(
invalidTopicName,
invalidTopicName+"-group",
[]string{fmt.Sprintf("%s:%s", kafkaHost, kafkaPort)},
[]string{invalidTopicName},
processorFunc,
sarama.NewConfig(),
group.CheckTopic())
require.EqualError(t, err, "topic invalid-topic-name does not exist in broker")
}

func TestGroupConsume_CheckTopicFailsDueToNonExistingBroker(t *testing.T) {
// Test parameters
processorFunc := func(batch kafka.Batch) error {
return nil
}
_, err := group.New(
successTopic3,
successTopic3+"-group",
[]string{fmt.Sprintf("%s:%s", kafkaHost, wrongKafkaPort)},
[]string{successTopic3},
processorFunc,
sarama.NewConfig(),
group.CheckTopic())
require.NotNil(t, err)
require.Contains(t, err.Error(), "failed to create client:")
}

func newComponent(t *testing.T, name string, retries uint, batchSize uint, processorFunc kafka.BatchProcessorFunc) *group.Component {
saramaCfg, err := kafka.DefaultConsumerSaramaConfig(name, true)
saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
Expand All @@ -228,7 +262,8 @@ func newComponent(t *testing.T, name string, retries uint, batchSize uint, proce
group.BatchTimeout(100*time.Millisecond),
group.Retries(retries),
group.RetryWait(200*time.Millisecond),
group.CommitSync())
group.CommitSync(),
group.CheckTopic())
require.NoError(t, err)

return cmp
Expand Down
3 changes: 3 additions & 0 deletions test/docker/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
const (
kafkaHost = "localhost"
kafkaPort = "9092"
wrongKafkaPort = "9032"
zookeeperPort = "2181"
simpleTopic1 = "simpleTopic1"
simpleTopic2 = "simpleTopic2"
Expand All @@ -32,6 +33,7 @@ const (
groupTopic2 = "groupTopic2"
successTopic1 = "successTopic1"
successTopic2 = "successTopic2"
successTopic3 = "successTopic3"
failAllRetriesTopic1 = "failAllRetriesTopic1"
failAllRetriesTopic2 = "failAllRetriesTopic2"
failAndRetryTopic1 = "failAndRetryTopic1"
Expand All @@ -53,6 +55,7 @@ func TestMain(m *testing.M) {
getTopic(failAndRetryTopic2),
getTopic(successTopic1),
getTopic(successTopic2),
getTopic(successTopic3),
}
k, err := create(120*time.Second, topics...)
if err != nil {
Expand Down

0 comments on commit a750632

Please sign in to comment.