Skip to content

Commit

Permalink
Commands (#1)
Browse files Browse the repository at this point in the history
Consume and administration commands
  • Loading branch information
xitonix authored Sep 12, 2019
1 parent 4091fa9 commit ca830f4
Show file tree
Hide file tree
Showing 42 changed files with 2,351 additions and 666 deletions.
38 changes: 38 additions & 0 deletions application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"os"

"gopkg.in/alecthomas/kingpin.v2"

"github.com/xitonix/trubka/commands"
"github.com/xitonix/trubka/internal"
)

func newApplication() error {
app := kingpin.New("trubka", "A tool to consume protocol buffer events from Kafka.").DefaultEnvars()

global := &commands.GlobalParameters{}
bindAppFlags(app, global)

commands.AddVersionCommand(app, version)
commands.AddConsumeCommand(app, global)
commands.AddBrokerCommand(app, global)
commands.AddTopicCommand(app, global)
commands.AddGroupCommand(app, global)
commands.AddLocalOffsetCommand(app, global)
_, err := app.Parse(os.Args[1:])
return err
}

func bindAppFlags(app *kingpin.Application, global *commands.GlobalParameters) {
var verbosity int
app.Flag("verbose", "The verbosity level of Trubka.").
Short('v').
NoEnvar().
PreAction(func(context *kingpin.ParseContext) error {
global.Verbosity = internal.ToVerbosityLevel(verbosity)
return nil
}).
CounterVar(&verbosity)
}
226 changes: 0 additions & 226 deletions bootstrap.go

This file was deleted.

12 changes: 12 additions & 0 deletions commands/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package commands

import (
"gopkg.in/alecthomas/kingpin.v2"
)

// AddBrokerCommand initialises the broker top level command and adds it to the application.
func AddBrokerCommand(app *kingpin.Application, global *GlobalParameters) {
parent := app.Command("broker", "A command to manage Kafka brokers.")
kafkaParams := bindKafkaFlags(parent)
addListBrokersSubCommand(parent, global, kafkaParams)
}
74 changes: 74 additions & 0 deletions commands/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package commands

import (
"context"
"fmt"
"os"
"os/signal"
"regexp"
"syscall"

"github.com/gookit/color"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/xitonix/trubka/kafka"
)

const (
plainTextFormat = "plain"
tableFormat = "table"
)

var (
yellow = color.Warn.Render
green = color.Info.Render
bold = color.Bold.Render
)

func initKafkaManager(globalParams *GlobalParameters, kafkaParams *kafkaParameters) (*kafka.Manager, context.Context, context.CancelFunc, error) {
manager, err := kafka.NewManager(kafkaParams.brokers,
globalParams.Verbosity,
kafka.WithClusterVersion(kafkaParams.version),
kafka.WithTLS(kafkaParams.tls),
kafka.WithClusterVersion(kafkaParams.version),
kafka.WithSASL(kafkaParams.saslMechanism,
kafkaParams.saslUsername,
kafkaParams.saslPassword))

if err != nil {
return nil, nil, nil, err
}

ctx, cancel := context.WithCancel(context.Background())

go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Kill, os.Interrupt, syscall.SIGTERM)
<-signals
cancel()
}()

return manager, ctx, cancel, nil
}

func highlightLag(input int64) string {
if input > 0 {
return yellow(input)
}
return green(input)
}

func getNotFoundMessage(entity, filterName string, ex *regexp.Regexp) string {
msg := fmt.Sprintf("No %s has been found.", entity)
if ex != nil {
msg += fmt.Sprintf(" You might need to tweak the %s filter (%s).", filterName, ex.String())
}
return msg
}

func addFormatFlag(c *kingpin.CmdClause, format *string) {
c.Flag("format", "Sets the output format.").
Default(tableFormat).
Short('f').
EnumVar(format, plainTextFormat, tableFormat)
}
Loading

0 comments on commit ca830f4

Please sign in to comment.