Skip to content

Commit

Permalink
Multi topic consuming (#5)
Browse files Browse the repository at this point in the history
* Prepend topic and partition key to the output if requested by the user

* Multi topic consuming in interactive mode

* make the proto contract name optional

* multi topic consumer stats

* colourized topic, partition key and timestamp in standard output
  • Loading branch information
xitonix authored Jan 19, 2020
1 parent 4104746 commit 2acb10c
Show file tree
Hide file tree
Showing 18 changed files with 516 additions and 191 deletions.
8 changes: 8 additions & 0 deletions commands/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -146,3 +147,10 @@ func getOutputWriters(outputDir string, topics map[string]*kafka.PartitionCheckp

return result, true, nil
}

func filterError(err error) error {
if errors.Is(err, errExitInteractiveMode) {
return nil
}
return err
}
25 changes: 16 additions & 9 deletions commands/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import (
"context"
"io"
"io/ioutil"
"os"
"os/signal"
"regexp"
"syscall"
"time"

"gopkg.in/alecthomas/kingpin.v2"
Expand All @@ -26,15 +23,23 @@ func AddConsumeCommand(app *kingpin.Application, global *GlobalParameters) {

func bindCommonConsumeFlags(command *kingpin.CmdClause,
topic, format, environment, outputDir, logFile, from *string,
includeTimestamp, enableAutoTopicCreation, reverse, interactive, count *bool,
includeTimestamp, includeKey, includeTopicName, enableAutoTopicCreation, reverse, interactive, interactiveWithCustomOffset, count *bool,
searchQuery, topicFilter **regexp.Regexp) {

command.Arg("topic", "The Kafka topic to consume from.").StringVar(topic)

command.Flag("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka.").
Short('T').
Short('S').
BoolVar(includeTimestamp)

command.Flag("include-partition-key", "Prints the partition key before the content.").
Short('K').
BoolVar(includeKey)

command.Flag("include-topic-name", "Prints the topic name before the content.").
Short('T').
BoolVar(includeTopicName)

command.Flag("auto-topic-creation", `Enables automatic topic creation before consuming if it's allowed by the server.`).
BoolVar(enableAutoTopicCreation)

Expand Down Expand Up @@ -68,10 +73,14 @@ func bindCommonConsumeFlags(command *kingpin.CmdClause,
Short('l').
StringVar(logFile)

command.Flag("interactive", "Runs the consumer in interactive mode.").
command.Flag("interactive", "Runs the consumer in interactive mode. Use --interactive-with-offset to set the starting offset for each topic.").
Short('i').
BoolVar(interactive)

command.Flag("interactive-with-offset", "Runs the consumer in interactive mode. In this mode, you will be able to define the starting offset for each topic.").
Short('I').
BoolVar(interactiveWithCustomOffset)

command.Flag("topic-filter", "The optional regular expression to filter the remote topics by (Interactive mode only).").
Short('t').
RegexpVar(topicFilter)
Expand All @@ -89,9 +98,7 @@ func bindCommonConsumeFlags(command *kingpin.CmdClause,
}

func monitorCancellation(prn *internal.SyncPrinter, cancel context.CancelFunc) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Kill, os.Interrupt, syscall.SIGTERM)
<-signals
internal.WaitForCancellationSignal()
prn.Info(internal.Verbose, "Stopping Trubka.")
cancel()
}
Expand Down
51 changes: 30 additions & 21 deletions commands/consume_plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ type consumePlain struct {
logFile string
searchQuery *regexp.Regexp
interactive bool
interactiveWithOffset bool
topicFilter *regexp.Regexp
reverse bool
includeTimestamp bool
includeKey bool
includeTopicName bool
enableAutoTopicCreation bool
from string
count bool
Expand All @@ -48,16 +51,21 @@ func addConsumePlainCommand(parent *kingpin.CmdClause, global *GlobalParameters,
&cmd.logFile,
&cmd.from,
&cmd.includeTimestamp,
&cmd.includeKey,
&cmd.includeTopicName,
&cmd.enableAutoTopicCreation,
&cmd.reverse,
&cmd.interactive,
&cmd.interactiveWithOffset,
&cmd.count,
&cmd.searchQuery,
&cmd.topicFilter)
}

func (c *consumePlain) run(_ *kingpin.ParseContext) error {
if !c.interactive && internal.IsEmpty(c.topic) {

interactive := c.interactive || c.interactiveWithOffset
if !interactive && internal.IsEmpty(c.topic) {
return errors.New("which Kafka topic you would like to consume from? Make sure you provide the topic as the first argument or switch to interactive mode (-i)")
}

Expand All @@ -81,24 +89,20 @@ func (c *consumePlain) run(_ *kingpin.ParseContext) error {

go monitorCancellation(prn, cancel)

if c.interactive {
c.topic, err = askUserForTopic(consumer, c.topicFilter)
if err != nil {
return err
}
}

if internal.IsEmpty(c.topic) {
return nil
}

checkpoints, err := kafka.NewPartitionCheckpoints(c.from)
defaultCheckpoint, err := kafka.NewPartitionCheckpoints(c.from)
if err != nil {
return err
}

topics := map[string]*kafka.PartitionCheckpoints{
c.topic: checkpoints,
topics := make(map[string]*kafka.PartitionCheckpoints)

if interactive {
topics, err = askUserForTopics(consumer, c.topicFilter, c.interactiveWithOffset, defaultCheckpoint)
if err != nil {
return filterError(err)
}
} else {
topics[c.topic] = defaultCheckpoint
}

writers, writeEventsToFile, err := getOutputWriters(c.outputDir, topics)
Expand All @@ -113,11 +117,16 @@ func (c *consumePlain) run(_ *kingpin.ParseContext) error {
wg.Add(1)
consumerCtx, stopConsumer := context.WithCancel(context.Background())
defer stopConsumer()
counter := &internal.Counter{}
counter := internal.NewCounter()

go func() {
defer wg.Done()
marshaller := internal.NewPlainTextMarshaller(c.format, c.includeTimestamp)

marshaller := internal.NewPlainTextMarshaller(c.format,
c.includeTimestamp,
c.includeTopicName && !writeEventsToFile,
c.includeKey,
c.globalParams.EnableColor && !writeEventsToFile)

var cancelled bool
for {
Expand All @@ -141,12 +150,12 @@ func (c *consumePlain) run(_ *kingpin.ParseContext) error {
prn.WriteEvent(event.Topic, output)
consumer.StoreOffset(event)
if c.count {
counter.IncrSuccess()
counter.IncrSuccess(event.Topic)
}
continue
}
if c.count {
counter.IncrFailure()
counter.IncrFailure(event.Topic)
}
prn.Errorf(internal.Forced,
"Failed to process the message at offset %d of partition %d, topic %s: %s",
Expand Down Expand Up @@ -185,14 +194,14 @@ func (c *consumePlain) run(_ *kingpin.ParseContext) error {
prn.Close()

if c.count {
counter.Print(c.globalParams.EnableColor)
counter.PrintAsTable(c.globalParams.EnableColor)
}

return nil
}

func (c *consumePlain) process(event *kafka.Event, marshaller *internal.Marshaller, highlight bool) ([]byte, error) {
output, err := marshaller.Marshal(event.Value, event.Timestamp)
output, err := marshaller.Marshal(event.Value, event.Key, event.Timestamp, event.Topic, event.Partition)
if err != nil {
return nil, fmt.Errorf("invalid '%s' message received from Kafka: %w", c.format, err)
}
Expand Down
52 changes: 36 additions & 16 deletions commands/consume_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ type consumeProto struct {
protoFilter *regexp.Regexp
searchQuery *regexp.Regexp
interactive bool
interactiveWithOffset bool
reverse bool
includeTimestamp bool
includeKey bool
includeTopicName bool
enableAutoTopicCreation bool
count bool
from string
Expand All @@ -52,36 +55,41 @@ func addConsumeProtoCommand(parent *kingpin.CmdClause, global *GlobalParameters,
&cmd.logFile,
&cmd.from,
&cmd.includeTimestamp,
&cmd.includeKey,
&cmd.includeTopicName,
&cmd.enableAutoTopicCreation,
&cmd.reverse,
&cmd.interactive,
&cmd.interactiveWithOffset,
&cmd.count,
&cmd.searchQuery,
&cmd.topicFilter)
cmd.bindCommandFlags(c)
}

func (c *consumeProto) bindCommandFlags(command *kingpin.CmdClause) {

command.Arg("proto", "The fully qualified name of the protocol buffers type, stored in the given topic.").
command.Arg("contract", "The fully qualified name of the protocol buffers type, stored in the given topic. The default value is the same as the topic name.").
StringVar(&c.messageType)
command.Flag("proto-root", "The path to the folder where your *.proto files live.").
Short('r').
Required().
ExistingDirVar(&c.protoRoot)
StringVar(&c.protoRoot)

command.Flag("proto-filter", "The optional regular expression to filter the proto types by (Interactive mode only).").
Short('p').
RegexpVar(&c.protoFilter)
}

func (c *consumeProto) run(_ *kingpin.ParseContext) error {
if !c.interactive {
interactive := c.interactive || c.interactiveWithOffset
var implicitContract bool
if !interactive {
if internal.IsEmpty(c.topic) {
return errors.New("which Kafka topic you would like to consume from? Make sure you provide the topic as the first argument or switch to interactive mode (-i)")
}
if internal.IsEmpty(c.messageType) {
return fmt.Errorf("which message type is stored in %s topic? Make sure you provide the fully qualified proto name as the second argument or switch to interactive mode (-i)", c.topic)
c.messageType = c.topic
implicitContract = true
}
}

Expand Down Expand Up @@ -117,10 +125,10 @@ func (c *consumeProto) run(_ *kingpin.ParseContext) error {
return err
}

if c.interactive {
topics, tm, err = readUserData(consumer, loader, c.topicFilter, c.protoFilter, checkpoints)
if interactive {
topics, tm, err = readUserData(consumer, loader, c.topicFilter, c.protoFilter, c.interactiveWithOffset, checkpoints)
if err != nil {
return err
return filterError(err)
}
} else {
tm[c.topic] = c.messageType
Expand All @@ -130,7 +138,14 @@ func (c *consumeProto) run(_ *kingpin.ParseContext) error {
for _, messageType := range tm {
err := loader.Load(messageType)
if err != nil {
return err
if implicitContract && !interactive {
msg := "Most likely the message type is not exactly the same as the topic name."
msg += "You may need to explicitly specify the fully qualified type name as the second argument to the consume command"
msg += fmt.Sprintf("\nExample: trubka consume proto <flags...> %s <fully qualified type name>", c.topic)
return fmt.Errorf("%w. %s", err, msg)
} else {
return err
}
}
}

Expand All @@ -143,15 +158,21 @@ func (c *consumeProto) run(_ *kingpin.ParseContext) error {

wg := sync.WaitGroup{}

counter := &internal.Counter{}
counter := internal.NewCounter()

if len(tm) > 0 {
wg.Add(1)
consumerCtx, stopConsumer := context.WithCancel(context.Background())
defer stopConsumer()
go func() {
defer wg.Done()
marshaller := protobuf.NewMarshaller(c.format, c.includeTimestamp)

marshaller := protobuf.NewMarshaller(c.format,
c.includeTimestamp,
c.includeTopicName && !writeEventsToFile,
c.includeKey,
c.globalParams.EnableColor && !writeEventsToFile)

var cancelled bool
for {
select {
Expand All @@ -175,13 +196,13 @@ func (c *consumeProto) run(_ *kingpin.ParseContext) error {
prn.WriteEvent(event.Topic, output)
consumer.StoreOffset(event)
if c.count {
counter.IncrSuccess()
counter.IncrSuccess(event.Topic)
}
continue
}

if c.count {
counter.IncrFailure()
counter.IncrFailure(event.Topic)
}
prn.Errorf(internal.Forced,
"Failed to process the message at offset %d of partition %d, topic %s: %s",
Expand Down Expand Up @@ -224,9 +245,8 @@ func (c *consumeProto) run(_ *kingpin.ParseContext) error {
prn.Close()

if c.count {
counter.Print(c.globalParams.EnableColor)
counter.PrintAsTable(c.globalParams.EnableColor)
}

return nil
}

Expand All @@ -246,7 +266,7 @@ func (c *consumeProto) process(messageType string,
return nil, err
}

output, err := marshaller.Marshal(msg, event.Timestamp)
output, err := marshaller.Marshal(msg, event.Key, event.Timestamp, event.Topic, event.Partition)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions commands/delete_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ func (c *deleteGroup) run(_ *kingpin.ParseContext) error {

names := groups.Names()
sort.Strings(names)
index := pickAnIndex("Choose a consumer group ID to delete", "group", names)
if index < 0 {
return nil
indices, err := pickAnIndex("to delete", "consumer group", names, false)
if err != nil {
return filterError(err)
}
toRemove := names[index]
toRemove := names[indices[0]]
return c.delete(manager, toRemove)
}

func (c *deleteGroup) delete(manager *kafka.Manager, group string) error {
if internal.IsEmpty(group) {
return errors.New("Consumer group cannot be empty.")
return errors.New("consumer group cannot be empty")
}
if c.silent || askForConfirmation(fmt.Sprintf("Are you sure you want to delete %s", group)) {
err := manager.DeleteConsumerGroup(group)
Expand Down
8 changes: 5 additions & 3 deletions commands/delete_local_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ func (c *deleteLocalOffsets) run(_ *kingpin.ParseContext) error {

topics[len(files)] = "All"

index := pickAnIndex("Choose the topic to delete the offsets", "topic", topics)
if index < 0 {
return nil
indices, err := pickAnIndex("to delete the offsets", "topic", topics, false)
if err != nil {
return filterError(err)
}

index := indices[0]

removeAll := index == (len(topics) - 1)
var path, msg string
if removeAll {
Expand Down
Loading

0 comments on commit 2acb10c

Please sign in to comment.