Commit

feat: Log error messages and retry forever
German Ramos Garcia committed Jun 10, 2016
1 parent 162ded9 commit a18ad68
Showing 3 changed files with 21 additions and 5 deletions.
README.md: 2 additions, 1 deletion
@@ -12,7 +12,8 @@ kafka-console-producer implemented in golang and using [sarama](https://github.c
 - Very easy to configure through environment variables
 - Auto discover kafka peers from DNS name
 - Waits for kafka to be ready
-- Auto reconnect
+- Auto reconnect, and retry in case of error
+- Log faulty messages to stderr
 
 ## Usage
 
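The feature list mentions auto-discovering kafka peers from a DNS name. A minimal sketch of how such a lookup can be done in Go, assuming one A record per broker and the default Kafka port 9092; the helper name discoverBrokers and the port are illustrative assumptions, while kafkaService matches the parameter name visible in producer.go below.

package main

import (
	"fmt"
	"log"
	"net"
)

// discoverBrokers resolves a DNS name to its address records and builds
// a broker list in host:port form from the results.
func discoverBrokers(kafkaService string) ([]string, error) {
	ips, err := net.LookupIP(kafkaService)
	if err != nil {
		return nil, err
	}
	brokerList := make([]string, 0, len(ips))
	for _, ip := range ips {
		brokerList = append(brokerList, fmt.Sprintf("%s:9092", ip)) // assumed default Kafka port
	}
	return brokerList, nil
}

func main() {
	brokers, err := discoverBrokers("kafka") // assumed service name
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(brokers)
}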
gopath/src/service/producer.go: 18 additions, 3 deletions
@@ -46,9 +46,13 @@ func producer(kafkaService string,
 
 	//producer
 	config := sarama.NewConfig()
-	config.Producer.RequiredAcks = sarama.WaitForLocal
-	config.Producer.Retry.Max = 5 // Retry up to 5 times to produce the message
-
+	config.Producer.RequiredAcks = sarama.WaitForLocal  // The level of acknowledgement reliability needed from the broker
+	config.Producer.Timeout = 1000 * time.Millisecond   // The maximum duration the broker will wait for the receipt of the number of RequiredAcks
+	config.Producer.Return.Errors = true                // If enabled, messages that failed to deliver will be returned on the Errors channel
+	config.Producer.Retry.Max = 5                       // Retry up to 5 times to produce the message
+	config.Metadata.Retry.Max = 5000                    // The total number of times to retry a metadata request when the cluster is in the middle of a leader election
+	config.Metadata.Retry.Backoff = 1 * time.Second     // How long to wait for leader election to occur before retrying (default 250ms)
+	config.Metadata.RefreshFrequency = 30 * time.Second // How frequently to refresh the cluster metadata in the background (default 10 minutes)
 	for producer == nil {
 		producer, err = sarama.NewAsyncProducer(brokerList, config)
 		if err != nil {
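The hunk is cut off just after the error check. For context, the connect-and-retry pattern this loop implements could look roughly like the sketch below; only the loop shape and the sarama calls come from the diff, while the function name newProducerWithRetry, the log wording, and the 5-second pause are illustrative assumptions.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// newProducerWithRetry keeps calling NewAsyncProducer until it succeeds,
// mirroring the `for producer == nil` loop in the hunk above.
func newProducerWithRetry(brokerList []string, config *sarama.Config) sarama.AsyncProducer {
	var producer sarama.AsyncProducer
	var err error
	for producer == nil {
		producer, err = sarama.NewAsyncProducer(brokerList, config)
		if err != nil {
			log.Printf("Error connecting to kafka: %s. Retrying...", err)
			time.Sleep(5 * time.Second) // assumed pause; the visible diff ends before this point
		}
	}
	return producer
}

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	producer := newProducerWithRetry([]string{"kafka:9092"}, config) // assumed broker address
	defer producer.Close()
}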
@@ -58,6 +62,17 @@
 	}
 
 	scanner := bufio.NewScanner(os.Stdin)
+
+	// log failed messages
+	go func() {
+		for {
+			var msg *sarama.ProducerError
+			msg = <-producer.Errors()
+			log.Printf("Error producing message: %s", msg.Msg.Value)
+		}
+	}()
+
+	// Read stdin forever and publish messages
 	for scanner.Scan() {
 		msg := &sarama.ProducerMessage{
 			Topic: topic,
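A note on the error-logging goroutine: the separate var declaration and receive can be collapsed, and ranging over Errors() lets the goroutine exit cleanly once the producer is closed. A drop-in alternative, not part of this commit, that also logs the underlying error (which the committed version drops):

	go func() {
		for perr := range producer.Errors() {
			// perr.Msg is the failed message, perr.Err the reason it failed
			log.Printf("Error producing message %s: %v", perr.Msg.Value, perr.Err)
		}
	}()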
version: 1 addition, 1 deletion
@@ -1 +1 @@
-0.3.0
+0.4.0
