Skip to content

Commit

Permalink
feat: Add KEY support
Browse files Browse the repository at this point in the history
  • Loading branch information
German Ramos Garcia committed Oct 3, 2016
1 parent ed5c43c commit 94e9930
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ gopath/src/golang.org/x/tools

#Final binaries
/service
/kafka-console-producer
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Use environment variables
- KAFKA_PORT, "9092", Port to connect to input Kafka peers
- TOPIC, "mytopic", The topic to consume
- VERBOSE, "false, Set to `true` if you want verbose output
- KEY, "", The key of produced messages. Default (empty) will produce in every topic
- FINISH_TIMEOUT", "1", Number of seconds to wait for exit after end of line is received

## Download

Expand Down
8 changes: 5 additions & 3 deletions gopath/src/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ import (

func main() {
log.SetOutput(os.Stderr)
log.Print("kafka-console-producer v0.5.1")
log.Print("kafka-console-producer v0.6.0")
var (
kafkaService = getConfig("KAFKA_SERVICE", "kafka") // The DNS name for input Kafka broker service
kafkaPort = getConfig("KAFKA_PORT", "9092") // Port to connect to input Kafka peers
topic = getConfig("TOPIC", "mytopic") // The topic to consume
topic = getConfig("TOPIC", "mytopic") // The topic to produce to
verbose = getConfig("VERBOSE", "false") // Set to `true` if you want to turn on sarama logging
finishTimeout = getConfig("FINISH_TIMEOUT", "1") // Number of seconds to wait for exit after end of line is received
key = getConfig("KEY", "") // The key of produced messages. Default (empty) will produce in every topic
)
messages := make(chan string)
saramaProducer := producer(kafkaService, kafkaPort, topic, messages, verbose == "true")

saramaProducer := producer(kafkaService, kafkaPort, topic, messages, key, verbose == "true")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
event := scanner.Text()
Expand Down
9 changes: 9 additions & 0 deletions gopath/src/service/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func producer(kafkaService string,
kafkaPort string,
topic string,
messages chan string,
key string,
verbose bool) sarama.AsyncProducer {

var (
Expand Down Expand Up @@ -71,11 +72,19 @@ func producer(kafkaService string,
}()

// Read stdin for ever and publish messages
var producerKey sarama.Encoder
if key == "" {
producerKey = nil
} else {
producerKey = sarama.StringEncoder(key)
}

go func() {
for {
msg := <-messages
producerMessage := &sarama.ProducerMessage{
Topic: topic,
Key: producerKey,
Value: sarama.StringEncoder(msg),
}
producer.Input() <- producerMessage
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.1
0.6.0

0 comments on commit 94e9930

Please sign in to comment.