Skip to content

Commit

Permalink
Group topics (#2)
Browse files Browse the repository at this point in the history
* Consumer group topic assignments

* Fixed offset lag caclulation logic
  • Loading branch information
xitonix authored Oct 3, 2019
1 parent 72f1d13 commit 6eb8c3f
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 36 deletions.
17 changes: 14 additions & 3 deletions commands/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"syscall"

"github.com/dustin/go-humanize"
"github.com/gookit/color"
"gopkg.in/alecthomas/kingpin.v2"

Expand All @@ -26,7 +28,8 @@ var (
)

func initKafkaManager(globalParams *GlobalParameters, kafkaParams *kafkaParameters) (*kafka.Manager, context.Context, context.CancelFunc, error) {
manager, err := kafka.NewManager(kafkaParams.brokers,
brokers := getBrokers(kafkaParams.brokers)
manager, err := kafka.NewManager(brokers,
globalParams.Verbosity,
kafka.WithClusterVersion(kafkaParams.version),
kafka.WithTLS(kafkaParams.tls),
Expand All @@ -53,9 +56,9 @@ func initKafkaManager(globalParams *GlobalParameters, kafkaParams *kafkaParamete

func highlightLag(input int64) string {
if input > 0 {
return yellow(input)
return yellow(humanize.Comma(input))
}
return green(input)
return green(humanize.Comma(input))
}

func getNotFoundMessage(entity, filterName string, ex *regexp.Regexp) string {
Expand All @@ -72,3 +75,11 @@ func addFormatFlag(c *kingpin.CmdClause, format *string) {
Short('f').
EnumVar(format, plainTextFormat, tableFormat)
}

func getBrokers(commaSeparated string) []string {
brokers := strings.Split(commaSeparated, ",")
for i := 0; i < len(brokers); i++ {
brokers[i] = strings.TrimSpace(brokers[i])
}
return brokers
}
3 changes: 2 additions & 1 deletion commands/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (c *consume) run(_ *kingpin.ParseContext) error {
saramaLogWriter = logFile
}

brokers := getBrokers(c.kafkaParams.brokers)
consumer, err := kafka.NewConsumer(
c.kafkaParams.brokers, prn,
brokers, prn,
c.environment,
c.enableAutoTopicCreation,
kafka.WithClusterVersion(c.kafkaParams.version),
Expand Down
1 change: 1 addition & 0 deletions commands/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ func AddGroupCommand(app *kingpin.Application, global *GlobalParameters) {
kafkaParams := bindKafkaFlags(parent)
addListGroupsSubCommand(parent, global, kafkaParams)
addDeleteGroupSubCommand(parent, global, kafkaParams)
addListGroupTopicsSubCommand(parent, global, kafkaParams)
}
4 changes: 2 additions & 2 deletions commands/kafka_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type kafkaParameters struct {
brokers []string
brokers string
version string
tls *tls.Config
saslMechanism string
Expand All @@ -32,7 +32,7 @@ func bindKafkaFlags(cmd *kingpin.CmdClause) *kafkaParameters {
params := &kafkaParameters{}
cmd.Flag("brokers", "The comma separated list of Kafka brokers in server:port format.").
Short('b').
StringsVar(&params.brokers)
StringVar(&params.brokers)
cmd.Flag("kafka-version", "Kafka cluster version.").
Default(kafka.DefaultClusterVersion).
StringVar(&params.version)
Expand Down
128 changes: 128 additions & 0 deletions commands/list_group_topics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package commands

import (
"bytes"
"context"
"fmt"
"os"
"regexp"
"strconv"

"github.com/dustin/go-humanize"
"github.com/olekukonko/tablewriter"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/xitonix/trubka/kafka"
)

type listGroupTopics struct {
globalParams *GlobalParameters
kafkaParams *kafkaParameters
includeOffsets bool
topicFilter *regexp.Regexp
group string
format string
}

func addListGroupTopicsSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) {
cmd := &listGroupTopics{
globalParams: global,
kafkaParams: kafkaParams,
}
c := parent.Command("topics", "Lists the topics a consumer group is subscribed to.").Action(cmd.run)
c.Arg("group", "The consumer group ID to fetch the topics for.").
Required().
StringVar(&cmd.group)

c.Flag("topic-filter", "An optional regular expression to filter the topics by.").
Short('t').
RegexpVar(&cmd.topicFilter)

c.Flag("offsets", "Enables fetching the offsets for the topics.").
Short('o').
BoolVar(&cmd.includeOffsets)

addFormatFlag(c, &cmd.format)
}

func (c *listGroupTopics) run(_ *kingpin.ParseContext) error {
manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams)

if err != nil {
return err
}

defer func() {
manager.Close()
cancel()
}()

return c.listTopics(ctx, manager)
}

func (c *listGroupTopics) listTopics(ctx context.Context, manager *kafka.Manager) error {
topics, err := manager.GetGroupTopics(ctx, c.group, c.includeOffsets, c.topicFilter)
if err != nil {
return err
}

if len(topics) == 0 {
fmt.Println(getNotFoundMessage("topic", "topic", c.topicFilter))
return nil
}

switch c.format {
case plainTextFormat:
c.printPlainTextOutput(topics)
case tableFormat:
c.printTableOutput(topics)
}
return nil
}

func (c *listGroupTopics) printTableOutput(topics kafka.TopicPartitionOffset) {
for topic, partitionOffsets := range topics {
parentTable := tablewriter.NewWriter(os.Stdout)
parentTable.SetAutoWrapText(false)
parentTable.SetAutoFormatHeaders(false)
parentTable.SetHeader([]string{"Topic: " + topic})
parentTable.SetColMinWidth(0, 80)
if c.includeOffsets && len(partitionOffsets) > 0 {
buff := bytes.Buffer{}
table := tablewriter.NewWriter(&buff)
table.SetHeader([]string{"Partition", "Latest", "Current", "Lag"})
table.SetColMinWidth(0, 20)
table.SetColMinWidth(1, 20)
table.SetColMinWidth(2, 20)
table.SetColMinWidth(3, 20)
table.SetAlignment(tablewriter.ALIGN_CENTER)
partitions := partitionOffsets.SortPartitions()
for _, partition := range partitions {
offsets := partitionOffsets[int32(partition)]
latest := humanize.Comma(offsets.Latest)
current := humanize.Comma(offsets.Current)
part := strconv.FormatInt(int64(partition), 10)
table.Append([]string{part, latest, current, highlightLag(offsets.Lag())})
}
table.Render()
parentTable.Append([]string{buff.String()})
}
parentTable.SetHeaderLine(false)
parentTable.Render()
}
}

func (c *listGroupTopics) printPlainTextOutput(topics kafka.TopicPartitionOffset) {
for topic, partitionOffsets := range topics {
fmt.Printf("%s\n", bold(topic))
if c.includeOffsets && len(partitionOffsets) > 0 {
fmt.Printf("\n\n")
partitions := partitionOffsets.SortPartitions()
for _, partition := range partitions {
offsets := partitionOffsets[int32(partition)]
fmt.Printf(" Partition %2d: %d out of %d (Lag: %s) \n", partition, offsets.Current, offsets.Latest, highlightLag(offsets.Lag()))
}
fmt.Println()
}
}
}
51 changes: 27 additions & 24 deletions commands/list_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"os"
"regexp"
"strconv"
"strings"

"github.com/dustin/go-humanize"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
Expand All @@ -21,7 +23,7 @@ type listGroups struct {
includeMembers bool
memberFilter *regexp.Regexp
groupFilter *regexp.Regexp
topics []string
topics string
format string
}

Expand All @@ -34,9 +36,9 @@ func addListGroupsSubCommand(parent *kingpin.CmdClause, global *GlobalParameters
c.Flag("members", "Enables fetching consumer group members.").
Short('m').
BoolVar(&cmd.includeMembers)
c.Flag("topics", "The list of topics to retrieve the latest and the group offsets for.").
c.Flag("topics", "Comma separate list of the topics to retrieve the latest and the group offsets for.").
Short('t').
StringsVar(&cmd.topics)
StringVar(&cmd.topics)
c.Flag("member-filter", "An optional regular expression to filter the member ID/Client/Host by.").
Short('r').
RegexpVar(&cmd.memberFilter)
Expand All @@ -62,7 +64,8 @@ func (c *listGroups) run(_ *kingpin.ParseContext) error {
}

func (c *listGroups) listGroups(ctx context.Context, manager *kafka.Manager) error {
groups, err := manager.GetConsumerGroups(ctx, c.includeMembers, c.memberFilter, c.groupFilter, c.topics)
topics := strings.Split(c.topics, ",")
groups, err := manager.GetConsumerGroups(ctx, c.includeMembers, c.memberFilter, c.groupFilter, topics)
if err != nil {
return errors.Wrap(err, "Failed to list the brokers.")
}
Expand Down Expand Up @@ -92,37 +95,36 @@ func (*listGroups) printTableOutput(groups kafka.ConsumerGroups) {
buff := bytes.Buffer{}
buff.WriteString(fmt.Sprintf("\nMembers:\n"))
table := tablewriter.NewWriter(&buff)
table.SetHeader([]string{"Name", "Client ID", "Host"})
for _, member := range group.Members {
table.Append([]string{member.ID, member.ClientID, member.Host})
table.SetHeader([]string{"", "ID", "Host"})
for i, member := range group.Members {
table.Append([]string{strconv.Itoa(i + 1), member.ID, member.Host})
}
table.Render()
groupTable.Append([]string{buff.String()})
}

if len(group.TopicOffsets) > 0 {
buff := bytes.Buffer{}
buff.WriteString(fmt.Sprintf("\nGroup Offsets:\n"))
table := tablewriter.NewWriter(&buff)
table.SetHeader([]string{"Partition", "Latest", "Current", "Lag"})
table.SetColMinWidth(0, 20)
table.SetColMinWidth(1, 20)
table.SetColMinWidth(2, 20)
table.SetColMinWidth(3, 20)
table.SetAlignment(tablewriter.ALIGN_CENTER)

for _, partitionOffsets := range group.TopicOffsets {
for topic, partitionOffsets := range group.TopicOffsets {
buff := bytes.Buffer{}
buff.WriteString(fmt.Sprintf("\nTopic: %s\n", topic))
table := tablewriter.NewWriter(&buff)
table.SetHeader([]string{"Partition", "Latest", "Current", "Lag"})
table.SetColMinWidth(0, 20)
table.SetColMinWidth(1, 20)
table.SetColMinWidth(2, 20)
table.SetColMinWidth(3, 20)
table.SetAlignment(tablewriter.ALIGN_CENTER)
partitions := partitionOffsets.SortPartitions()
for _, partition := range partitions {
offsets := partitionOffsets[int32(partition)]
latest := strconv.FormatInt(offsets.Latest, 10)
current := strconv.FormatInt(offsets.Current, 10)
latest := humanize.Comma(offsets.Latest)
current := humanize.Comma(offsets.Current)
part := strconv.FormatInt(int64(partition), 10)
table.Append([]string{part, latest, current, highlightLag(offsets.Lag())})
}
table.Render()
groupTable.Append([]string{buff.String()})
}
table.Render()
groupTable.Append([]string{buff.String()})
}
groupTable.SetHeaderLine(false)
groupTable.Render()
Expand All @@ -142,11 +144,12 @@ func (*listGroups) printPlainTextOutput(groups kafka.ConsumerGroups) {

if len(group.TopicOffsets) > 0 {
fmt.Printf("\n%s\n\n", green("Partition Offsets:"))
for _, partitionOffsets := range group.TopicOffsets {
for topic, partitionOffsets := range group.TopicOffsets {
partitions := partitionOffsets.SortPartitions()
fmt.Printf("\n %s\n\n", green("Topic: "+topic))
for _, partition := range partitions {
offsets := partitionOffsets[int32(partition)]
fmt.Printf(" Partition %2d: %d out of %d (Lag: %s) \n", partition, offsets.Current, offsets.Latest, highlightLag(offsets.Lag()))
fmt.Printf(" Partition %2d: %d out of %d (Lag: %s) \n", partition, offsets.Current, offsets.Latest, highlightLag(offsets.Lag()))
}
}
fmt.Println()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Shopify/sarama v1.23.0
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type GroupMember struct {
}

func (g GroupMember) String() string {
return fmt.Sprintf("%s/%s(%s)", g.ID, g.ClientID, g.Host)
return fmt.Sprintf("%s [%s]", g.ID, g.Host)
}

// ConsumerGroup represents a consumer group.
Expand Down
Loading

0 comments on commit 6eb8c3f

Please sign in to comment.