diff --git a/application.go b/application.go new file mode 100644 index 0000000..80d1306 --- /dev/null +++ b/application.go @@ -0,0 +1,38 @@ +package main + +import ( + "os" + + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/commands" + "github.com/xitonix/trubka/internal" +) + +func newApplication() error { + app := kingpin.New("trubka", "A tool to consume protocol buffer events from Kafka.").DefaultEnvars() + + global := &commands.GlobalParameters{} + bindAppFlags(app, global) + + commands.AddVersionCommand(app, version) + commands.AddConsumeCommand(app, global) + commands.AddBrokerCommand(app, global) + commands.AddTopicCommand(app, global) + commands.AddGroupCommand(app, global) + commands.AddLocalOffsetCommand(app, global) + _, err := app.Parse(os.Args[1:]) + return err +} + +func bindAppFlags(app *kingpin.Application, global *commands.GlobalParameters) { + var verbosity int + app.Flag("verbose", "The verbosity level of Trubka."). + Short('v'). + NoEnvar(). + PreAction(func(context *kingpin.ParseContext) error { + global.Verbosity = internal.ToVerbosityLevel(verbosity) + return nil + }). + CounterVar(&verbosity) +} diff --git a/bootstrap.go b/bootstrap.go deleted file mode 100644 index 9fdd47a..0000000 --- a/bootstrap.go +++ /dev/null @@ -1,226 +0,0 @@ -package main - -import ( - "crypto/tls" - "crypto/x509" - "io" - "io/ioutil" - "os" - "path/filepath" - - "github.com/gookit/color" - "github.com/pkg/errors" - "github.com/xitonix/flags" - "github.com/xitonix/flags/core" - - "github.com/xitonix/trubka/internal" - "github.com/xitonix/trubka/kafka" - "github.com/xitonix/trubka/protobuf" -) - -var ( - profilingMode *core.StringFlag - protoDir *core.StringFlag - logFilePath *core.StringFlag - outputDir *core.StringFlag - topic *core.StringFlag - messageType *core.StringFlag - format *core.StringFlag - kafkaVersion *core.StringFlag - environment *core.StringFlag - topicFilter *core.StringFlag - typeFilter *core.StringFlag - searchQuery *core.StringFlag - saslUsername *core.StringFlag - saslPassword *core.StringFlag - saslMechanism *core.StringFlag - tlsCACert *core.StringFlag - tlsClientKey *core.StringFlag - tlsClientCert *core.StringFlag - terminalMode *core.StringFlag - brokers *core.StringSliceFlag - protoFiles *core.StringSliceFlag - interactive *core.BoolFlag - reverse *core.BoolFlag - includeTimeStamp *core.BoolFlag - rewind *core.BoolFlag - enableTLS *core.BoolFlag - enableAutoTopicCreation *core.BoolFlag - versionRequest *core.BoolFlag - timeCheckpoint *core.TimeFlag - offsetCheckpoint *core.Int64Flag - verbosity *core.CounterFlag -) - -func initFlags() { - flags.EnableAutoKeyGeneration() - flags.SetKeyPrefix("TBK") - profilingMode = flags.String("profile", "Enables profiling.").WithValidRange(true, "cpu", "mem", "block", "mutex", "thread").Hide() - - brokers = flags.StringSlice("brokers", "The comma separated list of Kafka brokers in server:port format.").WithShort("b") - protoDir = flags.String("proto-root", "The path to the folder where your *.proto files live.").WithShort("R") - topic = flags.String("topic", `The Kafka topic to consume from.`).WithShort("t") - messageType = flags.String("proto", `The fully qualified name of the protobuf type, stored in the given topic.`).WithShort("p") - format = flags.String("format", "The format in which the Kafka messages will be written to the output."). - WithValidRange(true, protobuf.Json, protobuf.JsonIndent, protobuf.Text, protobuf.TextIndent, protobuf.Hex, protobuf.HexIndent). 
- WithDefault(protobuf.JsonIndent) - protoFiles = flags.StringSlice("proto-files", `An optional list of the proto files to load. If not specified all the files in --proto-root will be processed.`) - - interactive = flags.Bool("interactive", "Runs the tool in interactive mode.").WithShort("i") - - logFilePath = flags.String("log-file", "The `file` to write the logs to. Set to '' to discard (Default: stdout).").WithShort("l") - outputDir = flags.String("output-dir", "The `directory` to write the Kafka messages to. Set to '' to discard (Default: Stdout).").WithShort("d") - - kafkaVersion = flags.String("kafka-version", "Kafka cluster version.").WithDefault(kafka.DefaultClusterVersion) - rewind = flags.Bool("rewind", `Starts consuming from the beginning of the stream.`).WithShort("w") - timeCheckpoint = flags.Time("from", `Starts consuming from the most recent available offset at the given time. This will override --rewind.`).WithShort("f") - offsetCheckpoint = flags.Int64("from-offset", `Starts consuming from the specified offset (if applicable). This will override --rewind and --from. - If the most recent offset value of a partition is less than the specified value, this flag will be ignored.`).WithShort("o") - environment = flags.String("environment", `To store the offsets on the disk in environment specific paths. It's only required - if you use Trubka to consume from different Kafka clusters on the same machine (eg. dev/prod).`).WithShort("E").WithDefault("offsets") - topicFilter = flags.String("topic-filter", "The optional regular expression to filter the remote topics by (Interactive mode only).").WithShort("n") - typeFilter = flags.String("type-filter", "The optional regular expression to filter the proto types with (Interactive mode only).").WithShort("m") - reverse = flags.Bool("reverse", "If set, the messages which match the --search-query will be filtered out.") - searchQuery = flags.String("search-query", "The optional regular expression to filter the message content by.").WithShort("q") - includeTimeStamp = flags.Bool("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka.").WithShort("T") - enableAutoTopicCreation = flags.Bool("auto-topic-creation", `Enables automatic Kafka topic creation before consuming (if it is allowed on the server). - Enabling this option in production is not recommended since it may pollute the environment with unwanted topics.`) - saslMechanism = flags.String("sasl-mechanism", "SASL authentication mechanism."). - WithValidRange(true, kafka.SASLMechanismNone, kafka.SASLMechanismPlain, kafka.SASLMechanismSCRAM256, kafka.SASLMechanismSCRAM512). - WithDefault(kafka.SASLMechanismNone) - - saslUsername = flags.String("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none.").WithShort("U") - saslPassword = flags.String("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none.").WithShort("P") - - enableTLS = flags.Bool("tls", "Enables TLS (Unverified by default). Mutual authentication can also be enabled by providing client key and certificate.") - tlsCACert = flags.String("ca-cert", `Trusted root certificates for verifying the server. If not set, Trubka will skip server certificate and domain verification.`) - tlsClientCert = flags.String("client-cert", `Client certification file to enable mutual TLS authentication. 
Client key must also be provided.`) - tlsClientKey = flags.String("client-key", `Client private key file to enable mutual TLS authentication. Client certificate must also be provided.`) - terminalMode = flags.String("terminal-mode", `Sets the color mode of your terminal to adjust colors and highlights. Set to none to disable colors.`). - WithValidRange(true, internal.NoTheme, internal.DarkTheme, internal.LightTheme). - WithDefault(internal.DarkTheme) - verbosity = flags.Verbosity("The verbosity level of the tool.").WithKey("-") - versionRequest = flags.Bool("version", "Prints the current version of Trubka.").WithKey("-") - - flags.Parse() -} - -func getLogWriter(f *core.StringFlag) (io.Writer, bool, error) { - file := f.Get() - if internal.IsEmpty(file) { - if f.IsSet() { - return ioutil.Discard, false, nil - } - return os.Stdout, false, nil - } - lf, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) - if err != nil { - return nil, false, errors.Wrapf(err, "Failed to create: %s", file) - } - return lf, true, nil -} - -func getOutputWriters(dir *core.StringFlag, topics map[string]*kafka.Checkpoint) (map[string]io.Writer, bool, error) { - root := dir.Get() - result := make(map[string]io.Writer) - - if internal.IsEmpty(root) { - discard := dir.IsSet() - for topic := range topics { - if discard { - result[topic] = ioutil.Discard - continue - } - result[topic] = os.Stdout - } - return result, false, nil - } - - err := os.MkdirAll(root, 0755) - if err != nil { - return nil, false, errors.Wrap(err, "Failed to create the output directory") - } - - for topic := range topics { - file := filepath.Join(root, topic) - lf, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) - if err != nil { - return nil, false, errors.Wrapf(err, "Failed to create: %s", file) - } - result[topic] = lf - } - - return result, true, nil -} - -func configureTLS() (*tls.Config, error) { - var tlsConf tls.Config - - clientCert := tlsClientCert.Get() - clientKey := tlsClientKey.Get() - - // Mutual authentication is enabled. Both client key and certificate are needed. - if !internal.IsEmpty(clientCert) { - if internal.IsEmpty(clientKey) { - return nil, errors.New("TLS client key is missing. Mutual authentication cannot be used") - } - certificate, err := tls.LoadX509KeyPair(clientCert, clientKey) - if err != nil { - return nil, errors.Wrap(err, "Failed to load the client TLS key pair") - } - tlsConf.Certificates = []tls.Certificate{certificate} - } - - caCert := tlsCACert.Get() - - if internal.IsEmpty(caCert) { - // Server cert verification will be disabled. - // Only standard trusted certificates are used to verify the server certs. 
-	tlsConf.InsecureSkipVerify = true
-	return &tlsConf, nil
-	}
-	certPool := x509.NewCertPool()
-	ca, err := ioutil.ReadFile(caCert)
-	if err != nil {
-	return nil, errors.Wrap(err, "Failed to read the CA certificate")
-	}
-
-	if ok := certPool.AppendCertsFromPEM(ca); !ok {
-	return nil, errors.New("failed to append the CA certificate to the pool")
-	}
-
-	tlsConf.RootCAs = certPool
-
-	return &tlsConf, nil
-}
-
-func getSearchColor(mode string) color.Style {
-	switch mode {
-	case internal.NoTheme:
-	return nil
-	case internal.DarkTheme:
-	return color.New(color.FgYellow, color.Bold)
-	case internal.LightTheme:
-	return color.New(color.FgBlue, color.Bold)
-	default:
-	return nil
-	}
-}
-
-func getColorTheme(mode string, toFile bool) internal.ColorTheme {
-	theme := internal.ColorTheme{}
-	if toFile {
-	return theme
-	}
-	switch mode {
-	case internal.DarkTheme:
-	theme.Error = color.New(color.LightRed)
-	theme.Info = color.New(color.LightGreen)
-	theme.Warning = color.New(color.LightYellow)
-	case internal.LightTheme:
-	theme.Error = color.New(color.FgRed)
-	theme.Info = color.New(color.FgGreen)
-	theme.Warning = color.New(color.FgYellow)
-	}
-	return theme
-}
diff --git a/commands/broker.go b/commands/broker.go
new file mode 100644
index 0000000..14906ab
--- /dev/null
+++ b/commands/broker.go
@@ -0,0 +1,12 @@
+package commands
+
+import (
+	"gopkg.in/alecthomas/kingpin.v2"
+)
+
+// AddBrokerCommand initialises the broker top level command and adds it to the application.
+func AddBrokerCommand(app *kingpin.Application, global *GlobalParameters) {
+	parent := app.Command("broker", "A command to manage Kafka brokers.")
+	kafkaParams := bindKafkaFlags(parent)
+	addListBrokersSubCommand(parent, global, kafkaParams)
+}
diff --git a/commands/common.go b/commands/common.go
new file mode 100644
index 0000000..462d939
--- /dev/null
+++ b/commands/common.go
@@ -0,0 +1,74 @@
+package commands
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+	"regexp"
+	"syscall"
+
+	"github.com/gookit/color"
+	"gopkg.in/alecthomas/kingpin.v2"
+
+	"github.com/xitonix/trubka/kafka"
+)
+
+const (
+	plainTextFormat = "plain"
+	tableFormat     = "table"
+)
+
+var (
+	yellow = color.Warn.Render
+	green  = color.Info.Render
+	bold   = color.Bold.Render
+)
+
+func initKafkaManager(globalParams *GlobalParameters, kafkaParams *kafkaParameters) (*kafka.Manager, context.Context, context.CancelFunc, error) {
+	manager, err := kafka.NewManager(kafkaParams.brokers,
+		globalParams.Verbosity,
+		kafka.WithClusterVersion(kafkaParams.version),
+		kafka.WithTLS(kafkaParams.tls),
+		kafka.WithSASL(kafkaParams.saslMechanism,
+			kafkaParams.saslUsername,
+			kafkaParams.saslPassword))
+
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+		<-signals
+		cancel()
+	}()
+
+	return manager, ctx, cancel, nil
+}
+
+func highlightLag(input int64) string {
+	if input > 0 {
+		return yellow(input)
+	}
+	return green(input)
+}
+
+func getNotFoundMessage(entity, filterName string, ex *regexp.Regexp) string {
+	msg := fmt.Sprintf("No %s has been found.", entity)
+	if ex != nil {
+		msg += fmt.Sprintf(" You might need to tweak the %s filter (%s).", filterName, ex.String())
+	}
+	return msg
+}
+
+func addFormatFlag(c *kingpin.CmdClause, format *string) {
+	c.Flag("format", "Sets the output format.").
+		Default(tableFormat).
+		Short('f').
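+		// EnumVar restricts --format to the listed values; kingpin rejects anything else at parse time.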
+ EnumVar(format, plainTextFormat, tableFormat) +} diff --git a/commands/consume.go b/commands/consume.go new file mode 100644 index 0000000..633cf00 --- /dev/null +++ b/commands/consume.go @@ -0,0 +1,395 @@ +package commands + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "os" + "os/signal" + "path/filepath" + "regexp" + "strings" + "sync" + "syscall" + "time" + + "github.com/gookit/color" + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/internal" + "github.com/xitonix/trubka/kafka" + "github.com/xitonix/trubka/protobuf" +) + +type consume struct { + globalParams *GlobalParameters + kafkaParams *kafkaParameters + + protoRoot string + topic string + messageType string + format string + outputDir string + environment string + logFile string + topicFilter *regexp.Regexp + protoFilter *regexp.Regexp + searchQuery *regexp.Regexp + interactive bool + rewind bool + reverse bool + includeTimestamp bool + enableAutoTopicCreation bool + timeCheckpoint time.Time + offsetCheckpoint int64 +} + +// AddConsumeCommand initialises the consume command and adds it to the application. +func AddConsumeCommand(app *kingpin.Application, global *GlobalParameters) { + cmd := &consume{ + globalParams: global, + } + c := app.Command("consume", "Starts consuming from the given Kafka topic.").Action(cmd.run) + cmd.kafkaParams = bindKafkaFlags(c) + cmd.bindCommandFlags(c) +} + +func (c *consume) bindCommandFlags(command *kingpin.CmdClause) { + command.Arg("topic", "The Kafka topic to consume from.").StringVar(&c.topic) + command.Arg("proto", "The fully qualified name of the protocol buffers type, stored in the given topic."). + StringVar(&c.messageType) + command.Flag("proto-root", "The path to the folder where your *.proto files live."). + Short('r'). + Required(). + ExistingDirVar(&c.protoRoot) + command.Flag("rewind", "Starts consuming from the beginning of the stream.").Short('w').BoolVar(&c.rewind) + var timeCheckpoint string + command.Flag("from", "Starts consuming from the most recent available offset at the given time. This will override --rewind."). + Short('f'). + Action(func(context *kingpin.ParseContext) error { + t, err := parseTime(timeCheckpoint) + if err != nil { + return err + } + c.timeCheckpoint = t + return nil + }). + StringVar(&timeCheckpoint) + command.Flag("from-offset", `Starts consuming from the specified offset (if applicable). This will override --rewind and --from. + If the most recent offset value of a partition is less than the specified value, this flag will be ignored.`). + Default("-1"). + Short('o'). + Int64Var(&c.offsetCheckpoint) + command.Flag("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka."). + Short('T'). + BoolVar(&c.includeTimestamp) + command.Flag("auto-topic-creation", `Enables automatic Kafka topic creation before consuming (if it is allowed on the server). + Enabling this option in production is not recommended since it may pollute the environment with unwanted topics.`). + BoolVar(&c.enableAutoTopicCreation) + + command.Flag("format", "The format in which the Kafka messages will be written to the output."). + Default(protobuf.JsonIndent). + EnumVar(&c.format, + protobuf.Json, + protobuf.JsonIndent, + protobuf.Text, + protobuf.TextIndent, + protobuf.Hex, + protobuf.HexIndent) + command.Flag("output-dir", "The `directory` to write the Kafka messages to (Default: Stdout)."). + Short('d'). 
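+		// Empty (the default) keeps writing events to stdout; see getOutputWriters.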
+		StringVar(&c.outputDir)
+	command.Flag("environment", `Stores the offsets on disk under environment-specific paths. It's only required
+							if you use Trubka to consume from different Kafka clusters on the same machine (e.g. dev/prod).`).
+		Short('e').
+		Default("local").
+		StringVar(&c.environment)
+
+	command.Flag("reverse", "If set, the messages which match the --search-query will be filtered out.").
+		BoolVar(&c.reverse)
+
+	command.Flag("search-query", "The optional regular expression to filter the message content by.").
+		Short('q').
+		RegexpVar(&c.searchQuery)
+	command.Flag("log-file", "The `file` to write the logs to. Set to 'none' to discard (Default: stdout).").
+		Short('l').
+		StringVar(&c.logFile)
+
+	// Interactive mode flags
+	command.Flag("interactive", "Runs the consumer in interactive mode.").
+		Short('i').
+		BoolVar(&c.interactive)
+
+	command.Flag("topic-filter", "The optional regular expression to filter the remote topics by (Interactive mode only).").
+		Short('t').
+		RegexpVar(&c.topicFilter)
+
+	command.Flag("proto-filter", "The optional regular expression to filter the proto types by (Interactive mode only).").
+		Short('p').
+		RegexpVar(&c.protoFilter)
+}
+
+func (c *consume) run(_ *kingpin.ParseContext) error {
+	if !c.interactive {
+		if internal.IsEmpty(c.topic) {
+			return errors.New("Which Kafka topic would you like to consume from? Make sure you provide the topic as the first argument or switch to interactive mode (-i).")
+		}
+		if internal.IsEmpty(c.messageType) {
+			return errors.Errorf("Which message type is stored in the %s topic? Make sure you provide the fully qualified proto name as the second argument or switch to interactive mode (-i).", c.topic)
+		}
+	}
+	logFile, writeLogToFile, err := c.getLogWriter()
+	if err != nil {
+		return err
+	}
+
+	prn := internal.NewPrinter(c.globalParams.Verbosity, logFile)
+
+	loader, err := protobuf.NewFileLoader(c.protoRoot)
+	if err != nil {
+		return err
+	}
+
+	saramaLogWriter := ioutil.Discard
+	if c.globalParams.Verbosity >= internal.Chatty {
+		saramaLogWriter = logFile
+	}
+
+	consumer, err := kafka.NewConsumer(
+		c.kafkaParams.brokers, prn,
+		c.environment,
+		c.enableAutoTopicCreation,
+		kafka.WithClusterVersion(c.kafkaParams.version),
+		kafka.WithTLS(c.kafkaParams.tls),
+		kafka.WithLogWriter(saramaLogWriter),
+		kafka.WithSASL(c.kafkaParams.saslMechanism,
+			c.kafkaParams.saslUsername,
+			c.kafkaParams.saslPassword))
+
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+		<-signals
+		prn.Info(internal.Verbose, "Stopping Trubka.")
+		cancel()
+	}()
+
+	topics := make(map[string]*kafka.Checkpoint)
+	tm := make(map[string]string)
+	cp := c.getCheckpoint()
+	if c.interactive {
+		topics, tm, err = readUserData(consumer, loader, c.topicFilter, c.protoFilter, cp)
+		if err != nil {
+			return err
+		}
+	} else {
+		tm[c.topic] = c.messageType
+		topics = getTopics(tm, cp)
+	}
+
+	for _, messageType := range tm {
+		err := loader.Load(messageType)
+		if err != nil {
+			return err
+		}
+	}
+
+	writers, writeEventsToFile, err := c.getOutputWriters(topics)
+	if err != nil {
+		return err
+	}
+
+	prn.Start(writers)
+
+	wg := sync.WaitGroup{}
+
+	if len(tm) > 0 {
+		wg.Add(1)
+		consumerCtx, stopConsumer := context.WithCancel(context.Background())
+		defer stopConsumer()
+		go func() {
+			defer wg.Done()
+			marshaller := protobuf.NewMarshaller(c.format, c.includeTimestamp)
+ 
var searchColor color.Style
+			if !writeEventsToFile {
+				searchColor = color.Warn.Style
+			}
+			var cancelled bool
+			for {
+				select {
+				case <-ctx.Done():
+					if !cancelled {
+						stopConsumer()
+						cancelled = true
+					}
+				case event, more := <-consumer.Events():
+					if !more {
+						return
+					}
+					if cancelled {
+						// Keep consuming to let the Events channel drain;
+						// otherwise the consumer will deadlock.
+						continue
+					}
+					output, err := c.process(tm[event.Topic], loader, event, marshaller, searchColor)
+					if err == nil {
+						prn.WriteEvent(event.Topic, output)
+						consumer.StoreOffset(event)
+						continue
+					}
+					prn.Errorf(internal.Forced,
+						"Failed to process the message at offset %d of partition %d, topic %s: %s",
+						event.Offset,
+						event.Partition,
+						event.Topic,
+						err)
+				}
+			}
+		}()
+		err = consumer.Start(consumerCtx, topics)
+		if err != nil {
+			prn.Errorf(internal.Forced, "Failed to start the consumer: %s", err)
+		}
+	} else {
+		prn.Warning(internal.Forced, "Nothing to process. Terminating Trubka.")
+	}
+
+	// We still need to explicitly close the underlying Kafka client, in case `consumer.Start` has not been called.
+	// It is safe to close the consumer twice.
+	consumer.Close()
+	wg.Wait()
+
+	if err != nil {
+		return err
+	}
+
+	// Do not write to Printer after this point
+	if writeLogToFile {
+		closeFile(logFile.(*os.File))
+	}
+
+	if writeEventsToFile {
+		for _, w := range writers {
+			closeFile(w.(*os.File))
+		}
+	}
+	prn.Close()
+
+	return nil
+}
+
+func (c *consume) getOutputWriters(topics map[string]*kafka.Checkpoint) (map[string]io.Writer, bool, error) {
+	result := make(map[string]io.Writer)
+
+	if internal.IsEmpty(c.outputDir) {
+		for topic := range topics {
+			result[topic] = os.Stdout
+		}
+		return result, false, nil
+	}
+
+	err := os.MkdirAll(c.outputDir, 0755)
+	if err != nil {
+		return nil, false, errors.Wrap(err, "Failed to create the output directory")
+	}
+
+	for topic := range topics {
+		file := filepath.Join(c.outputDir, topic)
+		lf, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
+		if err != nil {
+			return nil, false, errors.Wrapf(err, "Failed to create: %s", file)
+		}
+		result[topic] = lf
+	}
+
+	return result, true, nil
+}
+
+func (c *consume) process(messageType string,
+	loader *protobuf.FileLoader,
+	event *kafka.Event,
+	marshaller *protobuf.Marshaller,
+	highlightColor color.Style) ([]byte, error) {
+
+	msg, err := loader.Get(messageType)
+	if err != nil {
+		return nil, err
+	}
+
+	err = msg.Unmarshal(event.Value)
+	if err != nil {
+		return nil, err
+	}
+
+	output, err := marshaller.Marshal(msg, event.Timestamp)
+	if err != nil {
+		return nil, err
+	}
+
+	if c.searchQuery != nil {
+		matches := c.searchQuery.FindAll(output, -1)
+		if (matches != nil) == c.reverse {
+			return nil, nil
+		}
+		for _, match := range matches {
+			if highlightColor != nil {
+				output = bytes.ReplaceAll(output, match, []byte(highlightColor.Sprint(string(match))))
+			}
+		}
+	}
+
+	return output, nil
+}
+
+func getTopics(topicMap map[string]string, cp *kafka.Checkpoint) map[string]*kafka.Checkpoint {
+	topics := make(map[string]*kafka.Checkpoint)
+	for topic := range topicMap {
+		topics[topic] = cp
+	}
+	return topics
+}
+
+func (c *consume) getCheckpoint() *kafka.Checkpoint {
+	cp := kafka.NewCheckpoint(c.rewind)
+	switch {
+	case c.offsetCheckpoint != -1:
+		cp.SetOffset(c.offsetCheckpoint)
+	case !c.timeCheckpoint.IsZero():
+		cp.SetTimeOffset(c.timeCheckpoint)
+	}
+	return cp
+}
+
+func (c *consume) getLogWriter() (io.Writer, bool, error) {
+	switch 
strings.TrimSpace(strings.ToLower(c.logFile)) { + case "none": + return ioutil.Discard, false, nil + case "": + return os.Stdout, false, nil + default: + lf, err := os.OpenFile(c.logFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + return nil, false, errors.Wrapf(err, "Failed to create: %s", c.logFile) + } + return lf, true, nil + } +} + +func closeFile(file *os.File) { + err := file.Sync() + if err != nil { + color.Error.Printf("Failed to sync the file: %s\n", err) + } + if err := file.Close(); err != nil { + color.Error.Printf("Failed to close the file: %s\n", err) + } +} diff --git a/commands/delete_group.go b/commands/delete_group.go new file mode 100644 index 0000000..5daad95 --- /dev/null +++ b/commands/delete_group.go @@ -0,0 +1,91 @@ +package commands + +import ( + "fmt" + "regexp" + "sort" + + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/internal" + "github.com/xitonix/trubka/kafka" +) + +type deleteGroup struct { + globalParams *GlobalParameters + kafkaParams *kafkaParameters + group string + interactive bool + groupFilter *regexp.Regexp + silent bool +} + +func addDeleteGroupSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) { + cmd := &deleteGroup{ + globalParams: global, + kafkaParams: kafkaParams, + } + c := parent.Command("delete", "Deletes an empty consumer group.").Action(cmd.run) + c.Flag("group", "The consumer group name to remove."). + Short('G'). + StringVar(&cmd.group) + c.Flag("interactive", "Runs the command in interactive mode. The --group parameter will be ignored in this mode."). + Short('i'). + BoolVar(&cmd.interactive) + c.Flag("group-filter", "An optional regular expression to filter the groups by (interactive mode only)."). + Short('g'). + RegexpVar(&cmd.groupFilter) + c.Flag("silent", "Deletes the consumer group without user confirmation."). + Short('s'). 
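+		// When set, delete() skips the confirmation prompt below.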
+ BoolVar(&cmd.silent) +} + +func (c *deleteGroup) run(_ *kingpin.ParseContext) error { + manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams) + + if err != nil { + return err + } + + defer func() { + cancel() + manager.Close() + }() + + if !c.interactive { + return c.delete(manager, c.group) + } + groups, err := manager.GetConsumerGroups(ctx, false, nil, c.groupFilter, nil) + if err != nil { + return err + } + + if len(groups) == 0 { + fmt.Println(getNotFoundMessage("consumer group", "group", c.groupFilter)) + return nil + } + + names := groups.Names() + sort.Strings(names) + index := pickAnIndex("Choose a consumer group ID to delete", "group", names) + if index < 0 { + return nil + } + toRemove := names[index] + return c.delete(manager, toRemove) +} + +func (c *deleteGroup) delete(manager *kafka.Manager, group string) error { + if internal.IsEmpty(group) { + return errors.New("Consumer group cannot be empty.") + } + if c.silent || askForConfirmation(fmt.Sprintf("Are you sure you want to delete %s", group)) { + err := manager.DeleteConsumerGroup(group) + if err != nil { + return err + } + fmt.Printf("%s consumer group has been deleted successfully.\n", group) + } + return nil +} diff --git a/commands/delete_local_offset.go b/commands/delete_local_offset.go new file mode 100644 index 0000000..3ab75b8 --- /dev/null +++ b/commands/delete_local_offset.go @@ -0,0 +1,84 @@ +package commands + +import ( + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + + "github.com/gookit/color" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/kafka" +) + +type deleteLocalOffsets struct { + globalParams *GlobalParameters + topicsFilter *regexp.Regexp + environment string +} + +func addDeleteLocalOffsetsSubCommand(parent *kingpin.CmdClause, params *GlobalParameters) { + cmd := &deleteLocalOffsets{ + globalParams: params, + } + c := parent.Command("delete", "Deletes the local offsets from the given environment.").Action(cmd.run) + c.Flag("topic", "An optional regular expression to filter the topics by.").Short('t').RegexpVar(&cmd.topicsFilter) + c.Arg("environment", "The environment of which the local offsets will be deleted."). + Required(). + StringVar(&cmd.environment) +} + +func (c *deleteLocalOffsets) run(_ *kingpin.ParseContext) error { + offsetManager := kafka.NewLocalOffsetManager(c.globalParams.Verbosity) + files, err := offsetManager.GetOffsetFiles(c.environment, c.topicsFilter) + if err != nil { + return err + } + + if len(files) == 0 { + color.Warn.Printf("There is no local offset stored for %s environment.\n", c.environment) + return nil + } + + topics := make([]string, len(files)+1) + for i := 0; i < len(files); i++ { + file := filepath.Base(files[i]) + topics[i] = file[:strings.LastIndex(file, ".")] + } + + topics[len(files)] = "All" + + index := pickAnIndex("Choose the topic to delete the offsets", "topic", topics) + if index < 0 { + return nil + } + + removeAll := index == (len(topics) - 1) + var path, msg string + if removeAll { + path = filepath.Dir(files[0]) + msg = fmt.Sprintf("The local offsets of %d topic(s) will be deleted from %s environment. Are you sure?", len(files), c.environment) + } else { + path = files[index] + msg = fmt.Sprintf("The local offsets of %s topic will be deleted. 
Are you sure?", topics[index]) + } + return confirmAndDelete(msg, path, removeAll) +} + +func confirmAndDelete(message, path string, all bool) error { + if askForConfirmation(message) { + var err error + if all { + err = os.RemoveAll(path) + } else { + err = os.Remove(path) + } + if err != nil { + return err + } + color.Info.Println("The local offsets have been removed.") + } + return nil +} diff --git a/commands/delete_topic.go b/commands/delete_topic.go new file mode 100644 index 0000000..336f50e --- /dev/null +++ b/commands/delete_topic.go @@ -0,0 +1,89 @@ +package commands + +import ( + "fmt" + "regexp" + + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/internal" + "github.com/xitonix/trubka/kafka" +) + +type deleteTopic struct { + globalParams *GlobalParameters + kafkaParams *kafkaParameters + topic string + interactive bool + topicFilter *regexp.Regexp + silent bool +} + +func addDeleteTopicSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) { + cmd := &deleteTopic{ + globalParams: global, + kafkaParams: kafkaParams, + } + c := parent.Command("delete", "Deletes a topic.").Action(cmd.run) + c.Flag("topic", "The topic to remove."). + Short('T'). + StringVar(&cmd.topic) + c.Flag("interactive", "Runs the command in interactive mode. The --topic parameter will be ignored in this mode."). + Short('i'). + BoolVar(&cmd.interactive) + c.Flag("topic-filter", "An optional regular expression to filter the topics by (interactive mode only)."). + Short('t'). + RegexpVar(&cmd.topicFilter) + c.Flag("silent", "Deletes the topic without user confirmation."). + Short('s'). + BoolVar(&cmd.silent) +} + +func (c *deleteTopic) run(_ *kingpin.ParseContext) error { + manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams) + + if err != nil { + return err + } + + defer func() { + manager.Close() + cancel() + }() + + if !c.interactive { + return c.delete(manager, c.topic) + } + topics, err := manager.GetTopics(ctx, c.topicFilter, false, "") + if err != nil { + return err + } + + if len(topics) == 0 { + fmt.Println(getNotFoundMessage("topic", "topic", c.topicFilter)) + return nil + } + + names := topics.SortedTopics() + index := pickAnIndex("Choose a topic to delete", "topic", names) + if index < 0 { + return nil + } + toRemove := names[index] + return c.delete(manager, toRemove) +} + +func (c *deleteTopic) delete(manager *kafka.Manager, topic string) error { + if internal.IsEmpty(topic) { + return errors.New("Topic cannot be empty.") + } + if c.silent || askForConfirmation(fmt.Sprintf("Are you sure you want to delete %s", topic)) { + err := manager.DeleteTopic(topic) + if err != nil { + return err + } + fmt.Printf("%s topic has been deleted successfully.\n", topic) + } + return nil +} diff --git a/commands/global_params.go b/commands/global_params.go new file mode 100644 index 0000000..e7776fa --- /dev/null +++ b/commands/global_params.go @@ -0,0 +1,7 @@ +package commands + +import "github.com/xitonix/trubka/internal" + +type GlobalParameters struct { + Verbosity internal.VerbosityLevel +} diff --git a/commands/group.go b/commands/group.go new file mode 100644 index 0000000..8f1b870 --- /dev/null +++ b/commands/group.go @@ -0,0 +1,13 @@ +package commands + +import ( + "gopkg.in/alecthomas/kingpin.v2" +) + +// AddGroupCommand initialises the group top level command and adds it to the application. 
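+// The Kafka connection flags are bound once on the parent command and shared by its list and delete sub-commands.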
+func AddGroupCommand(app *kingpin.Application, global *GlobalParameters) { + parent := app.Command("group", "A command to manage consumer groups.") + kafkaParams := bindKafkaFlags(parent) + addListGroupsSubCommand(parent, global, kafkaParams) + addDeleteGroupSubCommand(parent, global, kafkaParams) +} diff --git a/interactive_mode.go b/commands/interactive_mode.go similarity index 92% rename from interactive_mode.go rename to commands/interactive_mode.go index 2773662..8ba2673 100644 --- a/interactive_mode.go +++ b/commands/interactive_mode.go @@ -1,9 +1,10 @@ -package main +package commands import ( "bufio" "fmt" "os" + "regexp" "sort" "strconv" "strings" @@ -12,7 +13,7 @@ import ( "github.com/xitonix/trubka/protobuf" ) -func readUserData(consumer *kafka.Consumer, loader protobuf.Loader, topicFilter string, typeFilter string, cp *kafka.Checkpoint) (map[string]*kafka.Checkpoint, map[string]string, error) { +func readUserData(consumer *kafka.Consumer, loader protobuf.Loader, topicFilter, typeFilter *regexp.Regexp, cp *kafka.Checkpoint) (map[string]*kafka.Checkpoint, map[string]string, error) { remoteTopic, err := consumer.GetTopics(topicFilter) if err != nil { return nil, nil, err @@ -61,7 +62,7 @@ func readUserData(consumer *kafka.Consumer, loader protobuf.Loader, topicFilter // pickAnIndex returns the index of one of the items within the list func pickAnIndex(message, entryName string, input []string) int { if len(input) == 0 { - fmt.Printf("No %ss found. You may need to tweak the %[1]s filter.\n", entryName) + fmt.Printf("No %s has been found. You may need to tweak the %[1]s filter.\n", entryName) return -1 } for i, t := range input { diff --git a/commands/kafka_params.go b/commands/kafka_params.go new file mode 100644 index 0000000..2f2c4b1 --- /dev/null +++ b/commands/kafka_params.go @@ -0,0 +1,120 @@ +package commands + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/internal" + "github.com/xitonix/trubka/kafka" +) + +type kafkaParameters struct { + brokers []string + version string + tls *tls.Config + saslMechanism string + saslUsername string + saslPassword string +} + +type tlsParameters struct { + enabled bool + caCert string + clientCert string + clientKey string +} + +func bindKafkaFlags(cmd *kingpin.CmdClause) *kafkaParameters { + params := &kafkaParameters{} + cmd.Flag("brokers", "The comma separated list of Kafka brokers in server:port format."). + Short('b'). + StringsVar(¶ms.brokers) + cmd.Flag("kafka-version", "Kafka cluster version."). + Default(kafka.DefaultClusterVersion). + StringVar(¶ms.version) + + bindSASLFlags(cmd, params) + + tlsParams := bindTLSFlags(cmd) + cmd.PreAction(func(ctx *kingpin.ParseContext) error { + if !tlsParams.enabled { + return nil + } + tlsConfig, err := configureTLS(tlsParams) + if err != nil { + return err + } + params.tls = tlsConfig + return nil + }) + return params +} + +func bindTLSFlags(cmd *kingpin.CmdClause) *tlsParameters { + t := &tlsParameters{} + cmd.Flag("tls", "Enables TLS (Unverified by default). Mutual authentication can also be enabled by providing client key and certificate."). + BoolVar(&t.enabled) + cmd.Flag("ca-cert", `Trusted root certificates for verifying the server. If not set, Trubka will skip server certificate and domain verification.`). + ExistingFileVar(&t.caCert) + cmd.Flag("client-cert", `Client certification file to enable mutual TLS authentication. Client key must also be provided.`). 
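+		// ExistingFileVar also validates that the given path exists at parse time.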
+ ExistingFileVar(&t.clientCert) + cmd.Flag("client-key", `Client private key file to enable mutual TLS authentication. Client certificate must also be provided.`). + ExistingFileVar(&t.clientKey) + return t +} + +func bindSASLFlags(cmd *kingpin.CmdClause, params *kafkaParameters) { + cmd.Flag("sasl-mechanism", "SASL authentication mechanism."). + Default(kafka.SASLMechanismNone). + EnumVar(¶ms.saslMechanism, + kafka.SASLMechanismNone, + kafka.SASLMechanismPlain, + kafka.SASLMechanismSCRAM256, + kafka.SASLMechanismSCRAM512) + cmd.Flag("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none."). + Short('U'). + StringVar(¶ms.saslUsername) + cmd.Flag("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none."). + Short('P'). + StringVar(¶ms.saslPassword) +} + +func configureTLS(params *tlsParameters) (*tls.Config, error) { + tlsConf := tls.Config{} + + // Mutual authentication is enabled. Both client key and certificate are needed. + if !internal.IsEmpty(params.clientCert) { + if internal.IsEmpty(params.clientKey) { + return nil, errors.New("TLS client key is missing. Mutual authentication cannot be used") + } + certificate, err := tls.LoadX509KeyPair(params.clientCert, params.clientKey) + if err != nil { + return nil, errors.Wrap(err, "Failed to load the client TLS key pair") + } + tlsConf.Certificates = []tls.Certificate{certificate} + } + + if internal.IsEmpty(params.caCert) { + // Server cert verification will be disabled. + // Only standard trusted certificates are used to verify the server certs. + tlsConf.InsecureSkipVerify = true + return &tlsConf, nil + } + certPool := x509.NewCertPool() + ca, err := ioutil.ReadFile(params.caCert) + if err != nil { + return nil, errors.Wrap(err, "Failed to read the CA certificate") + } + + if ok := certPool.AppendCertsFromPEM(ca); !ok { + return nil, errors.New("failed to append the CA certificate to the pool") + } + + tlsConf.RootCAs = certPool + + return &tlsConf, nil +} diff --git a/commands/list_brokers.go b/commands/list_brokers.go new file mode 100644 index 0000000..ce6cd4f --- /dev/null +++ b/commands/list_brokers.go @@ -0,0 +1,125 @@ +package commands + +import ( + "fmt" + "os" + "regexp" + "strconv" + "strings" + + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/olekukonko/tablewriter" + + "github.com/xitonix/trubka/kafka" +) + +type listBrokers struct { + globalParams *GlobalParameters + kafkaParams *kafkaParameters + includeMetadata bool + topicFilter *regexp.Regexp + + format string +} + +func addListBrokersSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) { + cmd := &listBrokers{ + globalParams: global, + kafkaParams: kafkaParams, + } + c := parent.Command("list", "Lists the brokers in the Kafka cluster.").Action(cmd.run) + c.Flag("meta", "Enables fetching metadata for each broker.").Short('m').BoolVar(&cmd.includeMetadata) + c.Flag("topic-filter", "An optional regular expression to filter the topics by (valid with --meta only)"). + Short('t'). 
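+		// The filter is applied client-side while rendering each broker's metadata.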
+ RegexpVar(&cmd.topicFilter) + addFormatFlag(c, &cmd.format) +} + +func (c *listBrokers) run(_ *kingpin.ParseContext) error { + manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams) + + if err != nil { + return err + } + + defer func() { + manager.Close() + cancel() + }() + + brokers, err := manager.GetBrokers(ctx, c.includeMetadata) + if err != nil { + return errors.Wrap(err, "Failed to list the brokers.") + } + + if len(brokers) == 0 { + return errors.New("No broker found") + } + + switch c.format { + case plainTextFormat: + c.printPlainTextOutput(brokers) + case tableFormat: + c.printTableOutput(brokers) + } + return nil +} + +func (c *listBrokers) printTableOutput(brokers []kafka.Broker) { + + table := tablewriter.NewWriter(os.Stdout) + headers := []string{"ID", "Address"} + if c.includeMetadata { + headers = append(headers, "Version", "Topic (No. of Partitions)") + } + table.SetHeader(headers) + table.SetAlignment(tablewriter.ALIGN_LEFT) + table.SetAutoWrapText(false) + for _, broker := range brokers { + row := []string{strconv.Itoa(broker.ID), broker.Address} + if c.includeMetadata && len(broker.Meta.Topics) > 0 { + topics := make([]string, 0) + for _, topic := range broker.Meta.Topics { + if c.topicFilter != nil && !c.topicFilter.Match([]byte(topic.Name)) { + continue + } + topics = append(topics, fmt.Sprintf("%s (%d)", topic.Name, topic.NumberOdPartitions)) + } + if len(topics) > 0 { + row = append(row, + strconv.Itoa(broker.Meta.Version), + strings.Join(topics, "\n")) + } else { + row = append(row, + strconv.Itoa(broker.Meta.Version), + getNotFoundMessage("topic", "topic", c.topicFilter)) + } + } + table.Append(row) + } + table.Render() +} + +func (c *listBrokers) printPlainTextOutput(brokers []kafka.Broker) { + for _, broker := range brokers { + fmt.Printf("%s: %s\n", bold("Broker"), broker.String()) + if c.includeMetadata && len(broker.Meta.Topics) > 0 { + topics := make([]string, 0) + for _, topic := range broker.Meta.Topics { + if c.topicFilter != nil && !c.topicFilter.Match([]byte(topic.Name)) { + continue + } + topics = append(topics, fmt.Sprintf(" %s (%d)", topic.Name, topic.NumberOdPartitions)) + } + if len(topics) > 0 { + fmt.Printf("%s\n\n", green("TOPICS (No. of Partitions)")) + fmt.Println(strings.Join(topics, "\n")) + } else { + fmt.Println(getNotFoundMessage("topic", "topic", c.topicFilter)) + } + fmt.Println() + } + } +} diff --git a/commands/list_groups.go b/commands/list_groups.go new file mode 100644 index 0000000..3b04b10 --- /dev/null +++ b/commands/list_groups.go @@ -0,0 +1,155 @@ +package commands + +import ( + "bytes" + "context" + "fmt" + "os" + "regexp" + "strconv" + + "github.com/olekukonko/tablewriter" + "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + + "github.com/xitonix/trubka/kafka" +) + +type listGroups struct { + globalParams *GlobalParameters + kafkaParams *kafkaParameters + includeMembers bool + memberFilter *regexp.Regexp + groupFilter *regexp.Regexp + topics []string + format string +} + +func addListGroupsSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) { + cmd := &listGroups{ + globalParams: global, + kafkaParams: kafkaParams, + } + c := parent.Command("list", "Lists the consumer groups.").Action(cmd.run) + c.Flag("members", "Enables fetching consumer group members."). + Short('m'). + BoolVar(&cmd.includeMembers) + c.Flag("topics", "The list of topics to retrieve the latest and the group offsets for."). + Short('t'). 
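+		// Repeatable flag; kingpin accumulates every occurrence into the slice.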
+		StringsVar(&cmd.topics)
+	c.Flag("member-filter", "An optional regular expression to filter the member ID/Client/Host by.").
+		Short('r').
+		RegexpVar(&cmd.memberFilter)
+	c.Flag("group-filter", "An optional regular expression to filter the groups by.").
+		Short('g').
+		RegexpVar(&cmd.groupFilter)
+	addFormatFlag(c, &cmd.format)
+}
+
+func (c *listGroups) run(_ *kingpin.ParseContext) error {
+	manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams)
+
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		manager.Close()
+		cancel()
+	}()
+
+	return c.listGroups(ctx, manager)
+}
+
+func (c *listGroups) listGroups(ctx context.Context, manager *kafka.Manager) error {
+	groups, err := manager.GetConsumerGroups(ctx, c.includeMembers, c.memberFilter, c.groupFilter, c.topics)
+	if err != nil {
+		return errors.Wrap(err, "Failed to list the consumer groups.")
+	}
+
+	if len(groups) == 0 {
+		fmt.Println(getNotFoundMessage("consumer group", "group", c.groupFilter))
+		return nil
+	}
+
+	switch c.format {
+	case plainTextFormat:
+		c.printPlainTextOutput(groups)
+	case tableFormat:
+		c.printTableOutput(groups)
+	}
+	return nil
+}
+
+func (*listGroups) printTableOutput(groups kafka.ConsumerGroups) {
+	for name, group := range groups {
+		groupTable := tablewriter.NewWriter(os.Stdout)
+		groupTable.SetAutoWrapText(false)
+		groupTable.SetAutoFormatHeaders(false)
+		groupTable.SetHeader([]string{"Group: " + name})
+		groupTable.SetColMinWidth(0, 80)
+		if len(group.Members) > 0 {
+			buff := bytes.Buffer{}
+			buff.WriteString("\nMembers:\n")
+			table := tablewriter.NewWriter(&buff)
+			table.SetHeader([]string{"Name", "Client ID", "Host"})
+			for _, member := range group.Members {
+				table.Append([]string{member.ID, member.ClientID, member.Host})
+			}
+			table.Render()
+			groupTable.Append([]string{buff.String()})
+		}
+
+		if len(group.TopicOffsets) > 0 {
+			buff := bytes.Buffer{}
+			buff.WriteString("\nGroup Offsets:\n")
+			table := tablewriter.NewWriter(&buff)
+			table.SetHeader([]string{"Partition", "Latest", "Current", "Lag"})
+			table.SetColMinWidth(0, 20)
+			table.SetColMinWidth(1, 20)
+			table.SetColMinWidth(2, 20)
+			table.SetColMinWidth(3, 20)
+			table.SetAlignment(tablewriter.ALIGN_CENTER)
+
+			for _, partitionOffsets := range group.TopicOffsets {
+				partitions := partitionOffsets.SortPartitions()
+				for _, partition := range partitions {
+					offsets := partitionOffsets[int32(partition)]
+					latest := strconv.FormatInt(offsets.Latest, 10)
+					current := strconv.FormatInt(offsets.Current, 10)
+					part := strconv.FormatInt(int64(partition), 10)
+					table.Append([]string{part, latest, current, highlightLag(offsets.Lag())})
+				}
+			}
+			table.Render()
+			groupTable.Append([]string{buff.String()})
+		}
+		groupTable.SetHeaderLine(false)
+		groupTable.Render()
+	}
+}
+
+func (*listGroups) printPlainTextOutput(groups kafka.ConsumerGroups) {
+	for name, group := range groups {
+		fmt.Printf("%s: %s\n", bold("Group"), name)
+		if len(group.Members) > 0 {
+			fmt.Printf("\n%s\n\n", green("Members:"))
+			for i, member := range group.Members {
+				fmt.Printf(" %2d: %s\n", i+1, member)
+			}
+			fmt.Println()
+		}
+
+		if len(group.TopicOffsets) > 0 {
+			fmt.Printf("\n%s\n\n", green("Partition Offsets:"))
+			for _, partitionOffsets := range group.TopicOffsets {
+				partitions := partitionOffsets.SortPartitions()
+				for _, partition := range partitions {
+					offsets := partitionOffsets[int32(partition)]
+					fmt.Printf("  Partition %2d: %d out of %d (Lag: %s) \n", partition, offsets.Current, offsets.Latest, highlightLag(offsets.Lag()))
+				}
+			}
+			fmt.Println()
+		}
+	}
+}
diff --git a/commands/list_local_offsets.go b/commands/list_local_offsets.go
new file mode 100644
index 0000000..f34d8b3
--- /dev/null
+++ b/commands/list_local_offsets.go
@@ -0,0 +1,87 @@
+package commands
+
+import (
+	"fmt"
+	"os"
+	"regexp"
+	"strconv"
+
+	"github.com/gookit/color"
+	"github.com/olekukonko/tablewriter"
+	"gopkg.in/alecthomas/kingpin.v2"
+
+	"github.com/xitonix/trubka/kafka"
+)
+
+type listLocalOffsets struct {
+	globalParams *GlobalParameters
+	topicsFilter *regexp.Regexp
+	envFilter    *regexp.Regexp
+	short        bool
+}
+
+func addListOffsetsSubCommand(parent *kingpin.CmdClause, params *GlobalParameters) {
+	cmd := &listLocalOffsets{
+		globalParams: params,
+	}
+	c := parent.Command("list", "Lists the local offsets for different environments.").Action(cmd.run)
+	c.Flag("topic", "An optional regular expression to filter the topics by.").Short('t').RegexpVar(&cmd.topicsFilter)
+	c.Flag("environment", "An optional regular expression to filter the environments by.").Short('e').RegexpVar(&cmd.envFilter)
+	c.Flag("short", "Enables short output. Offsets won't be printed in this mode.").Short('s').BoolVar(&cmd.short)
+}
+
+func (c *listLocalOffsets) run(_ *kingpin.ParseContext) error {
+	offsetManager := kafka.NewLocalOffsetManager(c.globalParams.Verbosity)
+	offsetMap, err := offsetManager.ListLocalOffsets(c.topicsFilter, c.envFilter)
+	if err != nil {
+		return err
+	}
+	if len(offsetMap) == 0 {
+		filtered := c.envFilter != nil || c.topicsFilter != nil
+		msg := "No offsets have been stored locally."
+		if filtered {
+			msg += " You might need to tweak the filters."
+		}
+		color.Warn.Println(msg)
+	}
+
+	for environment, topicOffsets := range offsetMap {
+		if len(topicOffsets) == 0 {
+			continue
+		}
+		sortedTopics := topicOffsets.SortedTopics()
+
+		color.Bold.Print("Environment")
+		fmt.Printf(": %s\n", environment)
+		table := tablewriter.NewWriter(os.Stdout)
+		headers := []string{"Topic", "Partition", "Offset"}
+		table.SetHeader(headers)
+		table.SetColumnAlignment([]int{tablewriter.ALIGN_LEFT, tablewriter.ALIGN_CENTER, tablewriter.ALIGN_CENTER})
+		rows := make([][]string, 0)
+		for i, topic := range sortedTopics {
+			if c.short {
+				fmt.Printf(" %d: %s\n", i+1, topic)
+				continue
+			}
+			partitionOffsets := topicOffsets[topic]
+			sortedPartitions := partitionOffsets.SortPartitions()
+			for i, partition := range sortedPartitions {
+				firstCell := topic
+				if i > 0 {
+					firstCell = ""
+				}
+				rows = append(rows, []string{
+					firstCell,
+					strconv.Itoa(partition),
+					strconv.FormatInt(partitionOffsets[int32(partition)].Current, 10),
+				})
+			}
+		}
+		if !c.short {
+			table.AppendBulk(rows)
+			table.Render()
+		}
+		fmt.Println()
+	}
+	return nil
+}
diff --git a/commands/list_topics.go b/commands/list_topics.go
new file mode 100644
index 0000000..40f76cd
--- /dev/null
+++ b/commands/list_topics.go
@@ -0,0 +1,137 @@
+package commands
+
+import (
+	"fmt"
+	"os"
+	"regexp"
+	"strconv"
+
+	"github.com/olekukonko/tablewriter"
+	"gopkg.in/alecthomas/kingpin.v2"
+
+	"github.com/xitonix/trubka/kafka"
+)
+
+type listTopics struct {
+	kafkaParams  *kafkaParameters
+	globalParams *GlobalParameters
+
+	topicFilter    *regexp.Regexp
+	includeOffsets bool
+	environment    string
+	format         string
+}
+
+func addListTopicsSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) {
+	cmd := &listTopics{
+		kafkaParams:  kafkaParams,
+		globalParams: global,
+	}
+	c := parent.Command("list", "Loads the existing topics from the server.").Action(cmd.run)
+	c.Flag("topic-filter", 
"An optional regular expression to filter the topics by.").Short('t').RegexpVar(&cmd.topicFilter) + c.Flag("partitions", "If enabled, the partition offset data will be retrieved too.").Short('p').BoolVar(&cmd.includeOffsets) + c.Flag("environment", "The environment to load the local offsets for (if any).").Short('e').StringVar(&cmd.environment) + addFormatFlag(c, &cmd.format) +} + +func (c *listTopics) run(_ *kingpin.ParseContext) error { + manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams) + + if err != nil { + return err + } + + defer func() { + manager.Close() + cancel() + }() + + topics, err := manager.GetTopics(ctx, c.topicFilter, c.includeOffsets, c.environment) + if err != nil { + return err + } + + if len(topics) == 0 { + fmt.Println(getNotFoundMessage("topic", "topic", c.topicFilter)) + return nil + } + + switch c.format { + case plainTextFormat: + c.printPlainTextOutput(topics) + case tableFormat: + c.printTableOutput(topics) + } + return nil +} + +func (c *listTopics) printPlainTextOutput(tpo kafka.TopicPartitionOffset) { + sortedTopics := tpo.SortedTopics() + for _, topic := range sortedTopics { + fmt.Printf("%s: %s\n", bold("Topic"), topic) + partitions := tpo[topic] + if !c.includeOffsets { + continue + } + keys := partitions.SortPartitions() + fmt.Println() + for _, partition := range keys { + offset := partitions[int32(partition)] + msg := fmt.Sprintf(" Partition %2d: ", partition) + if offset.Current >= 0 { + msg += fmt.Sprintf(" Local Offset %d out of %d", offset.Current, offset.Latest) + lag := offset.Lag() + if lag > 0 { + msg += fmt.Sprintf(" (Lag: %s)", highlightLag(lag)) + } + } else { + msg += fmt.Sprintf("%d", offset.Latest) + } + fmt.Println(msg) + } + fmt.Println() + } +} + +func (c *listTopics) printTableOutput(tpo kafka.TopicPartitionOffset) { + sortedTopics := tpo.SortedTopics() + + table := tablewriter.NewWriter(os.Stdout) + headers := []string{"Topic"} + if c.includeOffsets { + headers = append(headers, "Partition", "Latest Offset", "Local Offset", "Lag") + } + table.SetHeader(headers) + table.SetColumnAlignment([]int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_CENTER, + tablewriter.ALIGN_CENTER, + tablewriter.ALIGN_CENTER, + tablewriter.ALIGN_CENTER, + }) + for _, topic := range sortedTopics { + partitions := tpo[topic] + row := []string{topic} + if !c.includeOffsets { + table.Append(row) + continue + } + keys := partitions.SortPartitions() + rows := make([][]string, 0) + for i, partition := range keys { + firstCell := topic + if i > 0 { + firstCell = "" + } + op := partitions[int32(partition)] + lagStr := "-" + if op.Current >= 0 { + lagStr = highlightLag(op.Lag()) + } + rows = append(rows, []string{firstCell, strconv.Itoa(partition), op.String(true), op.String(false), lagStr}) + } + table.AppendBulk(rows) + + } + table.Render() +} diff --git a/commands/local.go b/commands/local.go new file mode 100644 index 0000000..e754307 --- /dev/null +++ b/commands/local.go @@ -0,0 +1,12 @@ +package commands + +import ( + "gopkg.in/alecthomas/kingpin.v2" +) + +// AddLocalOffsetCommand initialises the top level local offset management command and adds it to the application. 
+func AddLocalOffsetCommand(app *kingpin.Application, global *GlobalParameters) { + parent := app.Command("local", "Manages the locally stored offsets.") + addListOffsetsSubCommand(parent, global) + addDeleteLocalOffsetsSubCommand(parent, global) +} diff --git a/time_parser.go b/commands/time_parser.go similarity index 98% rename from time_parser.go rename to commands/time_parser.go index 1b54c9b..c043ba8 100644 --- a/time_parser.go +++ b/commands/time_parser.go @@ -1,4 +1,4 @@ -package main +package commands import ( "time" diff --git a/commands/topic.go b/commands/topic.go new file mode 100644 index 0000000..92dda0b --- /dev/null +++ b/commands/topic.go @@ -0,0 +1,13 @@ +package commands + +import ( + "gopkg.in/alecthomas/kingpin.v2" +) + +// AddTopicCommand initialises the topic top level command and adds it to the application. +func AddTopicCommand(app *kingpin.Application, global *GlobalParameters) { + parent := app.Command("topic", "A command to manage Kafka topics.") + kafkaParams := bindKafkaFlags(parent) + addListTopicsSubCommand(parent, global, kafkaParams) + addDeleteTopicSubCommand(parent, global, kafkaParams) +} diff --git a/commands/version.go b/commands/version.go new file mode 100644 index 0000000..f01b102 --- /dev/null +++ b/commands/version.go @@ -0,0 +1,28 @@ +package commands + +import ( + "fmt" + + "gopkg.in/alecthomas/kingpin.v2" +) + +type version struct { + version string + app *kingpin.Application +} + +func AddVersionCommand(app *kingpin.Application, appVersion string) { + cmd := &version{ + version: appVersion, + app: app, + } + app.Command("version", "Prints the current version of Trubka.").Action(cmd.run) +} + +func (c *version) run(ctx *kingpin.ParseContext) error { + if c.version == "" { + c.version = "[built from source]" + } + fmt.Printf("%s %s\n", c.app.Name, c.version) + return nil +} diff --git a/go.mod b/go.mod index 8e8cb33..8af461b 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.0 // indirect github.com/Shopify/sarama v1.23.0 + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect @@ -12,9 +14,10 @@ require ( github.com/jcmturner/gofork v1.0.0 // indirect github.com/jhump/protoreflect v1.5.0 github.com/kirsle/configdir v0.0.0-20170128060238-e45d2f54772f + github.com/mattn/go-runewidth v0.0.4 // indirect + github.com/olekukonko/tablewriter v0.0.1 github.com/peterbourgon/diskv v2.0.1+incompatible github.com/pkg/errors v0.8.1 - github.com/pkg/profile v1.3.0 github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xitonix/flags v0.1.1 @@ -22,9 +25,9 @@ require ( golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // indirect golang.org/x/text v0.3.2 // indirect - google.golang.org/appengine v1.4.0 // indirect google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect google.golang.org/grpc v1.22.1 // indirect + gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect ) diff --git a/go.sum b/go.sum index 2326415..81837af 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,10 @@ github.com/Shopify/sarama v1.23.0 h1:slvlbm7bxyp7sKQbUwha5BQdZTqurhRoI+zbKorVigQ 
github.com/Shopify/sarama v1.23.0/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -45,6 +49,10 @@ github.com/jhump/protoreflect v1.5.0 h1:NgpVT+dX71c8hZnxHof2M7QDK7QtohIJ7DYycjnk github.com/jhump/protoreflect v1.5.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74= github.com/kirsle/configdir v0.0.0-20170128060238-e45d2f54772f h1:dKccXx7xA56UNqOcFIbuqFjAWPVtP688j5QMgmo6OHU= github.com/kirsle/configdir v0.0.0-20170128060238-e45d2f54772f/go.mod h1:4rEELDSfUAlBSyUjPG0JnaNGjf13JySHFeRdD/3dLP0= +github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= +github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8urCTFX88= +github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU= @@ -52,8 +60,6 @@ github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0je github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= -github.com/pkg/profile v1.3.0 h1:OQIvuDgm00gWVWGTf4m4mCt6W1/0YqU7Ntg0mySWgaI= -github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= @@ -118,6 +124,8 @@ google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEd google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= 
 gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
diff --git a/internal/logger.go b/internal/logger.go
new file mode 100644
index 0000000..6522578
--- /dev/null
+++ b/internal/logger.go
@@ -0,0 +1,30 @@
+package internal
+
+import (
+	"fmt"
+	"time"
+)
+
+// Logger writes timestamped log entries to standard output.
+type Logger struct {
+	currentLevel VerbosityLevel
+}
+
+// NewLogger creates a new Logger with the given verbosity level.
+func NewLogger(level VerbosityLevel) *Logger {
+	return &Logger{currentLevel: level}
+}
+
+// Log writes the message to standard output if the verbosity level is greater than or equal to the current level.
+func (l *Logger) Log(level VerbosityLevel, message string) {
+	if l.currentLevel < level {
+		return
+	}
+	fmt.Println(time.Now().Format(loggingTimestampLayout) + message)
+}
+
+// Logf formats according to a format specifier and writes the result to standard output,
+// if the verbosity level is greater than or equal to the current level.
+func (l *Logger) Logf(level VerbosityLevel, format string, a ...interface{}) {
+	l.Log(level, fmt.Sprintf(format, a...))
+}
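
For reference, the new Logger is driven like this (a minimal sketch, assuming a caller elsewhere in this module; the level and the messages are illustrative):

package main

import "github.com/xitonix/trubka/internal"

func main() {
	logger := internal.NewLogger(internal.Verbose)
	// Forced entries are always written, regardless of the current level.
	logger.Log(internal.Forced, "starting up")
	// Written, because Verbose does not exceed the logger's current level.
	logger.Logf(internal.Verbose, "loaded %d proto file(s)", 4)
	// Suppressed, because VeryVerbose exceeds the logger's current level.
	logger.Log(internal.VeryVerbose, "this line is discarded")
}
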
diff --git a/internal/printer.go b/internal/printer.go
index 385c124..8c5ae05 100644
--- a/internal/printer.go
+++ b/internal/printer.go
@@ -4,12 +4,12 @@
 import (
 	"fmt"
 	"io"
 	"sync"
-
-	"github.com/gookit/color"
+	"time"
 )
 
 const (
-	loggingWriterKey = "___trubka__logging__writer__key___"
+	loggingWriterKey       = "___trubka__logging__writer__key___"
+	loggingTimestampLayout = "2006/01/02 15:04:05 "
 )
 
 // Printer represents a printer type.
@@ -24,38 +24,26 @@
 	Warning(level VerbosityLevel, msg string)
 	WriteEvent(topic string, bytes []byte)
 	Close()
-}
-
-type ColorTheme struct {
-	Error   color.Style
-	Info    color.Style
-	Warning color.Style
-}
-
-type printable struct {
-	msg   string
-	style color.Style
+	Level() VerbosityLevel
 }
 
 // SyncPrinter is an implementation of Printer interface to synchronously write to specified io.Writer instances.
 type SyncPrinter struct {
 	currentLevel  VerbosityLevel
 	wg            sync.WaitGroup
-	targets       map[string]chan *printable
-	uniqueTargets map[io.Writer]chan *printable
-	theme         ColorTheme
+	targets       map[string]chan string
+	uniqueTargets map[io.Writer]chan string
 }
 
 // NewPrinter creates a new synchronised writer.
-func NewPrinter(currentLevel VerbosityLevel, logOutput io.Writer, theme ColorTheme) *SyncPrinter {
-	logInput := make(chan *printable, 100)
+func NewPrinter(currentLevel VerbosityLevel, logOutput io.Writer) *SyncPrinter {
+	logInput := make(chan string, 100)
 	return &SyncPrinter{
-		theme:        theme,
 		currentLevel: currentLevel,
-		uniqueTargets: map[io.Writer]chan *printable{
+		uniqueTargets: map[io.Writer]chan string{
 			logOutput: logInput,
 		},
-		targets: map[string]chan *printable{
+		targets: map[string]chan string{
 			loggingWriterKey: logInput,
 		},
 	}
@@ -66,7 +54,7 @@ func (p *SyncPrinter) Start(messageOutputs map[string]io.Writer) {
 	for topic, writer := range messageOutputs {
 		input, ok := p.uniqueTargets[writer]
 		if !ok {
-			input = make(chan *printable)
+			input = make(chan string)
 			p.uniqueTargets[writer] = input
 		}
 
@@ -75,15 +63,10 @@
 
 	for w, in := range p.uniqueTargets {
 		p.wg.Add(1)
-		go func(writer io.Writer, input chan *printable) {
+		go func(writer io.Writer, input chan string) {
 			defer p.wg.Done()
-			for p := range input {
-				var err error
-				if p.style == nil {
-					_, err = fmt.Fprintln(writer, p.msg)
-				} else {
-					p.style.Println(p.msg)
-				}
+			for msg := range input {
+				_, err := fmt.Fprintln(writer, msg)
 				if err != nil {
 					fmt.Printf("Failed to write the entry: %s\n", err)
 				}
@@ -104,46 +87,46 @@
 
 // Log writes a new line to the Logging io.Writer synchronously if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Log(level VerbosityLevel, msg string) {
-	p.log(level, msg, nil)
+	p.log(level, msg)
 }
 
 // Logf formats according to a format specifier and writes a new line to the Logging io.Writer synchronously,
 // if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Logf(level VerbosityLevel, format string, a ...interface{}) {
-	p.log(level, fmt.Sprintf(format, a...), nil)
+	p.log(level, fmt.Sprintf(format, a...))
 }
 
 // Info writes a new line to the Logging io.Writer synchronously if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Info(level VerbosityLevel, msg string) {
-	p.log(level, msg, p.theme.Info)
+	p.log(level, msg)
 }
 
 // Infof formats according to a format specifier and writes a new line to the Logging io.Writer synchronously,
 // if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Infof(level VerbosityLevel, format string, a ...interface{}) {
-	p.log(level, fmt.Sprintf(format, a...), p.theme.Info)
+	p.log(level, fmt.Sprintf(format, a...))
 }
 
 // Warning writes a new line to the Logging io.Writer synchronously if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Warning(level VerbosityLevel, msg string) {
-	p.log(level, msg, p.theme.Warning)
+	p.log(level, msg)
 }
 
 // Warningf formats according to a format specifier and writes a new line to the Logging io.Writer synchronously,
 // if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Warningf(level VerbosityLevel, format string, a ...interface{}) {
-	p.log(level, fmt.Sprintf(format, a...), p.theme.Warning)
+	p.log(level, fmt.Sprintf(format, a...))
 }
 
 // Error writes a new line to the Logging io.Writer synchronously if the verbosity level is greater than or equal to the current level.
-func (p *SyncPrinter) Error(level VerbosityLevel, msg string, ) {
-	p.log(level, msg, p.theme.Error)
+func (p *SyncPrinter) Error(level VerbosityLevel, msg string) {
+	p.log(level, msg)
 }
 
 // Errorf formats according to a format specifier and writes a new line to the Logging io.Writer synchronously,
 // if the verbosity level is greater than or equal to the current level.
 func (p *SyncPrinter) Errorf(level VerbosityLevel, format string, a ...interface{}) {
-	p.log(level, fmt.Sprintf(format, a...), p.theme.Error)
+	p.log(level, fmt.Sprintf(format, a...))
 }
 
 // WriteEvent writes the event content to the relevant message io.Writer.
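
For context, a minimal sketch of driving the simplified printer end to end (the topic name and the writer targets are illustrative only):

package main

import (
	"io"
	"os"

	"github.com/xitonix/trubka/internal"
)

func main() {
	prn := internal.NewPrinter(internal.Verbose, os.Stdout)
	// Route the events of an example topic to stdout as well.
	prn.Start(map[string]io.Writer{"payments": os.Stdout})
	prn.Logf(internal.Verbose, "consuming %d topic(s)", 1)
	prn.WriteEvent("payments", []byte(`{"id": 1}`))
	prn.Close()
}
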
@@ -151,17 +134,17 @@ func (p *SyncPrinter) WriteEvent(topic string, bytes []byte) {
 	if len(bytes) == 0 {
 		return
 	}
-	p.targets[topic] <- &printable{
-		msg: string(bytes),
-	}
+	p.targets[topic] <- string(bytes)
 }
 
-func (p *SyncPrinter) log(level VerbosityLevel, msg string, style color.Style) {
+// Level returns the current verbosity level.
+func (p *SyncPrinter) Level() VerbosityLevel {
+	return p.currentLevel
+}
+
+func (p *SyncPrinter) log(level VerbosityLevel, msg string) {
 	if p.currentLevel < level {
 		return
 	}
-	p.targets[loggingWriterKey] <- &printable{
-		msg:   msg,
-		style: style,
-	}
+	p.targets[loggingWriterKey] <- time.Now().Format(loggingTimestampLayout) + msg
 }
diff --git a/internal/terminal_mode.go b/internal/terminal_mode.go
deleted file mode 100644
index 30d4e77..0000000
--- a/internal/terminal_mode.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package internal
-
-const (
-	NoTheme    = "none"
-	DarkTheme  = "dark"
-	LightTheme = "light"
-)
diff --git a/internal/utils.go b/internal/utils.go
index a683738..99d9e9e 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -17,3 +17,11 @@ func FormatTime(t time.Time) string {
 func FormatTimeUTC(t time.Time) string {
 	return FormatTime(t) + " UTC"
 }
+
+// BoolToString converts a boolean value to "Yes" or "No".
+func BoolToString(in bool) string {
+	if in {
+		return "Yes"
+	}
+	return "No"
+}
diff --git a/internal/verbosity_level.go b/internal/verbosity_level.go
index c36bced..f30c7f4 100644
--- a/internal/verbosity_level.go
+++ b/internal/verbosity_level.go
@@ -3,6 +3,14 @@ package internal
 // VerbosityLevel logging verbosity level.
 type VerbosityLevel int8
 
+var verbosityToString = map[VerbosityLevel]string{
+	Forced:       "default",
+	Verbose:      "verbose",
+	VeryVerbose:  "very verbose",
+	SuperVerbose: "super verbose",
+	Chatty:       "chatty",
+}
+
 const (
 	// Forced the lowest logging level. Everything will be printed under this level.
 	Forced VerbosityLevel = iota
@@ -16,7 +24,7 @@
 	Chatty
 )
 
 // ToVerbosityLevel converts an integer to verbosity level.
 func ToVerbosityLevel(counter int) VerbosityLevel {
 	switch {
 	case counter == 1:
diff --git a/kafka/broker.go b/kafka/broker.go
new file mode 100644
index 0000000..0dcad22
--- /dev/null
+++ b/kafka/broker.go
@@ -0,0 +1,14 @@
+package kafka
+
+import "fmt"
+
+// Broker represents a Kafka broker.
+type Broker struct {
+	Address string
+	ID      int
+	Meta    *BrokerMetadata
+}
+
+func (b Broker) String() string {
+	return fmt.Sprintf("%s (ID: %d)", b.Address, b.ID)
+}
diff --git a/kafka/broker_metadata.go b/kafka/broker_metadata.go
new file mode 100644
index 0000000..0872ffd
--- /dev/null
+++ b/kafka/broker_metadata.go
@@ -0,0 +1,7 @@
+package kafka
+
+// BrokerMetadata represents a broker's metadata.
+type BrokerMetadata struct {
+	Version int
+	Topics  []Topic
+}
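
The new Broker stringer renders the address alongside the broker ID; a quick sketch (the values are made up):

package main

import (
	"fmt"

	"github.com/xitonix/trubka/kafka"
)

func main() {
	b := kafka.Broker{Address: "localhost:9092", ID: 1}
	fmt.Println(b) // prints: localhost:9092 (ID: 1)
}
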
diff --git a/kafka/consumer.go b/kafka/consumer.go
index 3894672..2b126cc 100644
--- a/kafka/consumer.go
+++ b/kafka/consumer.go
@@ -42,11 +42,16 @@ func NewConsumer(brokers []string, printer internal.Printer, environment string,
 
 	sarama.Logger = log.New(ops.logWriter, "KAFKA Client: ", log.LstdFlags)
 
-	client, consumer, err := initClient(brokers, ops)
+	client, err := initClient(brokers, ops)
 	if err != nil {
 		return nil, err
 	}
 
+	consumer, err := sarama.NewConsumerFromClient(client)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to initialise the Kafka consumer")
+	}
+
 	store, err := newOffsetStore(printer, environment)
 	if err != nil {
 		return nil, err
@@ -66,18 +71,10 @@ func NewConsumer(brokers []string, printer internal.Printer, environment string,
 }
 
 // GetTopics fetches the topics from the server.
-func (c *Consumer) GetTopics(filter string) ([]string, error) {
+func (c *Consumer) GetTopics(search *regexp.Regexp) ([]string, error) {
 	if c.remoteTopics != nil {
 		return c.remoteTopics, nil
 	}
-	var search *regexp.Regexp
-	if !internal.IsEmpty(filter) {
-		s, err := regexp.Compile(filter)
-		if err != nil {
-			return nil, errors.Wrap(err, "Invalid topic filter regular expression")
-		}
-		search = s
-	}
 
 	topics, err := c.internalConsumer.Topics()
 	if err != nil {
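
Callers are now responsible for compiling the topic filter themselves; a sketch of the new call site ("fmt" and "regexp" imports and a ready *kafka.Consumer are assumed):

func listEventTopics(consumer *kafka.Consumer) error {
	// Pass nil to fetch every topic; a compiled expression narrows the list.
	topics, err := consumer.GetTopics(regexp.MustCompile(`-events$`))
	if err != nil {
		return err
	}
	for _, topic := range topics {
		fmt.Println(topic)
	}
	return nil
}
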
@@ -154,7 +151,7 @@ func (c *Consumer) Close() {
 	})
 }
 
-func (c *Consumer) consumeTopics(ctx context.Context, topicPartitionOffsets map[string]PartitionOffsets) {
+func (c *Consumer) consumeTopics(ctx context.Context, topicPartitionOffsets TopicPartitionOffset) {
 	cn, cancel := context.WithCancel(ctx)
 	defer cancel()
 	var cancelled bool
@@ -178,7 +175,7 @@
 			default:
 				err := c.consumePartition(cn, topic, partition, offset)
 				if err != nil {
-					c.printer.Errorf(internal.Forced, "Failed to start consuming from %s offset of topic %s, partition %d: %s", getOffsetString(offset), topic, partition, err)
+					c.printer.Errorf(internal.Forced, "Failed to start consuming from %s offset of topic %s, partition %d: %s", getOffsetString(offset.Current), topic, partition, err)
 					cancel()
 					cancelled = true
 				}
@@ -190,13 +187,13 @@
 	c.Close()
 }
 
-func (c *Consumer) consumePartition(ctx context.Context, topic string, partition int32, offset int64) error {
+func (c *Consumer) consumePartition(ctx context.Context, topic string, partition int32, offset Offset) error {
 	select {
 	case <-ctx.Done():
 		return nil
 	default:
-		c.printer.Logf(internal.VeryVerbose, "Start consuming from partition %d of topic %s (offset: %v).", partition, topic, getOffsetString(offset))
-		pc, err := c.internalConsumer.ConsumePartition(topic, partition, offset)
+		c.printer.Logf(internal.VeryVerbose, "Start consuming from partition %d of topic %s (offset: %v).", partition, topic, getOffsetString(offset.Current))
+		pc, err := c.internalConsumer.ConsumePartition(topic, partition, offset.Current)
 		if err != nil {
 			return errors.Wrapf(err, "Failed to start consuming partition %d of topic %s", partition, topic)
 		}
@@ -239,12 +236,12 @@
 	}
 }
 
-func (c *Consumer) fetchTopicPartitions(topics map[string]*Checkpoint) (map[string]PartitionOffsets, error) {
+func (c *Consumer) fetchTopicPartitions(topics map[string]*Checkpoint) (TopicPartitionOffset, error) {
 	existing := make(map[string]interface{})
 	if !c.enableAutoTopicCreation {
 		// We need to check if the requested topic(s) exist on the server
 		// That's why we need to get the list of the existing topics from the brokers.
-		remote, err := c.GetTopics("")
+		remote, err := c.GetTopics(nil)
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to fetch the topic list from the broker(s)")
 		}
@@ -253,8 +250,9 @@
 		}
 	}
 
-	topicPartitionOffsets := make(map[string]PartitionOffsets)
+	topicPartitionOffsets := make(TopicPartitionOffset)
 
+	localOffsetManager := NewLocalOffsetManager(c.printer.Level())
 	for topic, cp := range topics {
 		if !c.enableAutoTopicCreation {
 			if _, ok := existing[topic]; !ok {
@@ -262,7 +260,7 @@
 			}
 		}
 		c.printer.Logf(internal.SuperVerbose, "Fetching partitions for topic %s.", topic)
-		offsets, err := c.store.Query(topic)
+		offsets, err := localOffsetManager.ReadLocalTopicOffsets(topic, c.environment)
 		if err != nil {
 			return nil, err
 		}
@@ -310,7 +308,7 @@
 				break
 			}
 			if storedOffset, ok := offsets[partition]; ok {
-				offset = storedOffset
+				offset = storedOffset.Current
 			}
 		}
 		c.printer.Logf(internal.SuperVerbose,
@@ -318,28 +316,17 @@
 			partition,
 			getOffsetString(offset),
 			topic)
-		offsets[partition] = offset
+		offsets[partition] = Offset{Current: offset}
 		}
 		topicPartitionOffsets[topic] = offsets
 	}
 	return topicPartitionOffsets, nil
 }
 
-func getOffsetString(offset int64) interface{} {
-	switch offset {
-	case sarama.OffsetOldest:
-		return "oldest"
-	case sarama.OffsetNewest:
-		return "newest"
-	default:
-		return offset
-	}
-}
-
-func initClient(brokers []string, ops *Options) (sarama.Client, sarama.Consumer, error) {
+func initClient(brokers []string, ops *Options) (sarama.Client, error) {
 	version, err := sarama.ParseKafkaVersion(ops.ClusterVersion)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	config := sarama.NewConfig()
 	config.Version = version
@@ -361,13 +348,19 @@
 
 	client, err := sarama.NewClient(brokers, config)
 	if err != nil {
-		return nil, nil, errors.Wrap(err, "failed to initialise the Kafka client")
+		return nil, errors.Wrap(err, "failed to initialise the Kafka client")
 	}
 
-	consumer, err := sarama.NewConsumerFromClient(client)
-	if err != nil {
-		return nil, nil, errors.Wrap(err, "failed to initialise the Kafka consumer")
-	}
+	return client, nil
+}
 
-	return client, consumer, nil
+func getOffsetString(offset int64) interface{} {
+	switch offset {
+	case sarama.OffsetOldest:
+		return "oldest"
+	case sarama.OffsetNewest:
+		return "newest"
+	default:
+		return offset
+	}
 }
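
From the caller's point of view the constructor is unchanged by the initClient split; a sketch (the broker address, environment name and option list are placeholders):

func newLocalConsumer(prn internal.Printer) (*kafka.Consumer, error) {
	return kafka.NewConsumer(
		[]string{"localhost:9092"}, // placeholder broker
		prn,
		"local", // the environment that keys the on-disk offset store
		false,   // enableAutoTopicCreation
		kafka.WithClusterVersion(kafka.DefaultClusterVersion),
	)
}
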
diff --git a/kafka/consumer_group.go b/kafka/consumer_group.go
new file mode 100644
index 0000000..bef8f1b
--- /dev/null
+++ b/kafka/consumer_group.go
@@ -0,0 +1,61 @@
+package kafka
+
+import (
+	"fmt"
+	"regexp"
+
+	"github.com/Shopify/sarama"
+)
+
+// GroupMember represents a consumer group member.
+type GroupMember struct {
+	// ID the member identifier.
+	ID string
+	// ClientID client ID.
+	ClientID string
+	// Host the host name/IP of the client machine.
+	Host string
+}
+
+func (g GroupMember) String() string {
+	return fmt.Sprintf("%s/%s(%s)", g.ID, g.ClientID, g.Host)
+}
+
+// ConsumerGroup represents a consumer group.
+type ConsumerGroup struct {
+	// Members the clients attached to the consumer groups.
+	Members []GroupMember
+	// TopicOffsets the offsets of each topic belonging to the group.
+	TopicOffsets TopicPartitionOffset
+}
+
+func (c *ConsumerGroup) addMembers(members map[string]*sarama.GroupMemberDescription, memberFilter *regexp.Regexp) {
+	c.Members = make([]GroupMember, 0)
+	for id, m := range members {
+		if memberFilter != nil && !(memberFilter.Match([]byte(id)) ||
+			memberFilter.Match([]byte(m.ClientHost)) ||
+			memberFilter.Match([]byte(m.ClientId))) {
+			continue
+		}
+		member := GroupMember{
+			ID:       id,
+			ClientID: m.ClientId,
+			Host:     m.ClientHost,
+		}
+		c.Members = append(c.Members, member)
+	}
+}
+
+// ConsumerGroups the map of consumer groups keyed by consumer group ID.
+type ConsumerGroups map[string]*ConsumerGroup
+
+// Names returns the names of the consumer groups.
+func (c ConsumerGroups) Names() []string {
+	names := make([]string, len(c))
+	i := 0
+	for name := range c {
+		names[i] = name
+		i++
+	}
+	return names
+}
diff --git a/kafka/errors.go b/kafka/errors.go
new file mode 100644
index 0000000..f94c0d9
--- /dev/null
+++ b/kafka/errors.go
@@ -0,0 +1,11 @@
+package kafka
+
+import "github.com/pkg/errors"
+
+var (
+	// ErrEmptyEnvironment occurs when the provided environment is empty.
+	ErrEmptyEnvironment = errors.New("The environment cannot be empty")
+
+	// ErrEmptyTopic occurs when the provided topic is empty.
+	ErrEmptyTopic = errors.New("The topic cannot be empty")
+)
diff --git a/kafka/local_offset_manager.go b/kafka/local_offset_manager.go
new file mode 100644
index 0000000..b616534
--- /dev/null
+++ b/kafka/local_offset_manager.go
@@ -0,0 +1,169 @@
+package kafka
+
+import (
+	"bytes"
+	"encoding/gob"
+	"os"
+	"path/filepath"
+	"regexp"
+	"strings"
+
+	"github.com/kirsle/configdir"
+	"github.com/peterbourgon/diskv"
+	"github.com/pkg/errors"
+
+	"github.com/xitonix/trubka/internal"
+)
+
+const (
+	localOffsetRoot     = "trubka"
+	offsetFileExtension = ".tpo"
+)
+
+// LocalOffsetManager manages the offsets stored locally on disk.
+type LocalOffsetManager struct {
+	root string
+	db   *diskv.Diskv
+	*internal.Logger
+}
+
+// NewLocalOffsetManager creates a new instance of a local offset manager.
+func NewLocalOffsetManager(level internal.VerbosityLevel) *LocalOffsetManager {
+	root := configdir.LocalConfig(localOffsetRoot)
+	flatTransform := func(s string) []string { return []string{} }
+	return &LocalOffsetManager{
+		Logger: internal.NewLogger(level),
+		root:   root,
+		db: diskv.New(diskv.Options{
+			BasePath:     root,
+			Transform:    flatTransform,
+			CacheSizeMax: 1024 * 1024,
+		}),
+	}
+}
+
+// GetOffsetFiles returns a list of all the offset files for the given environment.
+func (l *LocalOffsetManager) GetOffsetFiles(environment string, topicFilter *regexp.Regexp) ([]string, error) {
+	if internal.IsEmpty(environment) {
+		return nil, ErrEmptyEnvironment
+	}
+	root := configdir.LocalConfig(localOffsetRoot, environment)
+	l.Logf(internal.Verbose, "Looking for local offsets in %s", root)
+
+	files := make([]string, 0)
+	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if info.IsDir() || !strings.HasSuffix(path, offsetFileExtension) {
+			return nil
+		}
+
+		file := filepath.Base(path)
+		if topicFilter != nil && !topicFilter.Match([]byte(file)) {
+			return nil
+		}
+		l.Logf(internal.VeryVerbose, "Local offset file has been found: %s", file)
+		files = append(files, path)
+		return nil
+	})
+
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, errors.Errorf("The local offset directory could not be found at %s", root)
+		}
+		return nil, err
+	}
+
+	return files, nil
+}
+
+// ReadLocalTopicOffsets returns the locally stored offsets of the given topic for the specified environment, if they exist.
+//
+// If there are no local offsets, the method will return an empty partition-offset map.
+func (l *LocalOffsetManager) ReadLocalTopicOffsets(topic string, environment string) (PartitionOffset, error) {
+	file, err := l.setDBPath(topic, environment)
+	if err != nil {
+		return nil, err
+	}
+
+	stored := make(map[int32]int64)
+	l.Logf(internal.VeryVerbose, "Reading the local offsets of %s topic from %s", topic, l.db.BasePath)
+	val, err := l.db.Read(file)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return PartitionOffset{}, nil
+		}
+		return nil, err
+	}
+
+	buff := bytes.NewBuffer(val)
+	dec := gob.NewDecoder(buff)
+	err = dec.Decode(&stored)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to deserialize the value from local offset store for topic %s", topic)
+	}
+
+	return ToPartitionOffset(stored, false), nil
+}
+
+// ListLocalOffsets lists the locally stored offsets for the topics of all the available environments.
+//
+// The returned map is keyed by the environment name.
+func (l *LocalOffsetManager) ListLocalOffsets(topicFilter *regexp.Regexp, envFilter *regexp.Regexp) (map[string]TopicPartitionOffset, error) {
+	result := make(map[string]TopicPartitionOffset)
+	root := configdir.LocalConfig(localOffsetRoot)
+	l.Logf(internal.Verbose, "Searching for local offsets in %s", root)
+	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if info.IsDir() || !strings.HasSuffix(path, offsetFileExtension) {
+			return nil
+		}
+		environment := filepath.Base(filepath.Dir(path))
+		if envFilter != nil && !envFilter.Match([]byte(environment)) {
+			return nil
+		}
+		file := filepath.Base(path)
+		topic := strings.TrimSuffix(file, offsetFileExtension)
+		if topicFilter != nil && !topicFilter.Match([]byte(topic)) {
+			l.Logf(internal.SuperVerbose, "The provided filter (%s) does not match with %s topic", topicFilter.String(), topic)
+			return nil
+		}
+		po, err := l.ReadLocalTopicOffsets(file, environment)
+		if err != nil {
+			return err
+		}
+		if _, ok := result[environment]; !ok {
+			result[environment] = make(TopicPartitionOffset)
+		}
+		result[environment][topic] = po
+		l.Logf(internal.Chatty, "%d partition offset(s) found locally for %s/%s", len(po), environment, topic)
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+func (l *LocalOffsetManager) setDBPath(topic string, environment string) (string, error) {
+	if internal.IsEmpty(environment) {
+		return "", ErrEmptyEnvironment
+	}
+	if internal.IsEmpty(topic) {
+		return "", ErrEmptyTopic
+	}
+
+	l.db.BasePath = filepath.Join(l.root, environment)
+
+	file := topic
+	if !strings.HasSuffix(file, offsetFileExtension) {
+		file += offsetFileExtension
+	}
+
+	return file, nil
+}
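
Reading the stored offsets back through the new manager looks roughly like this (a sketch; the topic and environment names are illustrative, and the "fmt" import plus this module's internal package are assumed):

func printLocalOffsets() error {
	manager := kafka.NewLocalOffsetManager(internal.Verbose)
	offsets, err := manager.ReadLocalTopicOffsets("payments", "local")
	if err != nil {
		return err
	}
	for _, partition := range offsets.SortPartitions() {
		fmt.Printf("P%02d: %s\n", partition, offsets[int32(partition)].String(false))
	}
	return nil
}
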
diff --git a/kafka/manager.go b/kafka/manager.go
new file mode 100644
index 0000000..b2000a5
--- /dev/null
+++ b/kafka/manager.go
@@ -0,0 +1,337 @@
+package kafka
+
+import (
+	"context"
+	"io/ioutil"
+	"log"
+	"os"
+	"regexp"
+	"sort"
+
+	"github.com/Shopify/sarama"
+	"github.com/pkg/errors"
+
+	"github.com/xitonix/trubka/internal"
+)
+
+// Manager a type to query Kafka metadata.
+type Manager struct {
+	config       *Options
+	client       sarama.Client
+	admin        sarama.ClusterAdmin
+	localOffsets *LocalOffsetManager
+	servers      []*sarama.Broker
+	*internal.Logger
+}
+
+// NewManager creates a new instance of the Kafka manager.
+func NewManager(brokers []string, verbosity internal.VerbosityLevel, options ...Option) (*Manager, error) {
+	if len(brokers) == 0 {
+		return nil, errors.New("The brokers list cannot be empty")
+	}
+	ops := NewOptions()
+	for _, option := range options {
+		option(ops)
+	}
+
+	logWriter := ioutil.Discard
+	if verbosity >= internal.Chatty {
+		logWriter = os.Stdout
+	}
+
+	sarama.Logger = log.New(logWriter, "", log.LstdFlags)
+
+	client, err := initClient(brokers, ops)
+	if err != nil {
+		return nil, err
+	}
+
+	servers := client.Brokers()
+	addresses := make([]string, len(servers))
+	for i, broker := range servers {
+		addresses[i] = broker.Addr()
+	}
+
+	admin, err := sarama.NewClusterAdmin(addresses, client.Config())
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to create a new cluster administrator.")
+	}
+
+	return &Manager{
+		config:       ops,
+		client:       client,
+		Logger:       internal.NewLogger(verbosity),
+		localOffsets: NewLocalOffsetManager(verbosity),
+		admin:        admin,
+		servers:      servers,
+	}, nil
+}
+
+// GetTopics loads a list of the available topics from the server.
+func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeOffsets bool, environment string) (TopicPartitionOffset, error) {
+	m.Log(internal.Verbose, "Retrieving topic list from the server")
+	topics, err := m.client.Topics()
+	if err != nil {
+		return nil, err
+	}
+
+	result := make(TopicPartitionOffset)
+	queryLocal := !internal.IsEmpty(environment)
+	for _, topic := range topics {
+		m.Logf(internal.SuperVerbose, "Topic %s has been found on the server", topic)
+		select {
+		case <-ctx.Done():
+			return result, nil
+		default:
+			if filter != nil && !filter.Match([]byte(topic)) {
+				m.Logf(internal.SuperVerbose, "Filtering out %s topic", topic)
+				continue
+			}
+			result[topic] = make(PartitionOffset)
+			if !includeOffsets {
+				continue
+			}
+			m.Logf(internal.VeryVerbose, "Retrieving the partition(s) of %s topic from the server", topic)
+			partitions, err := m.client.Partitions(topic)
+			if err != nil {
+				return nil, err
+			}
+			local := make(PartitionOffset)
+
+			if queryLocal {
+				m.Logf(internal.VeryVerbose, "Reading local offsets of %s topic", topic)
+				local, err = m.localOffsets.ReadLocalTopicOffsets(topic, environment)
+				if err != nil {
+					return nil, err
+				}
+			}
+			for _, partition := range partitions {
+				select {
+				case <-ctx.Done():
+					return result, nil
+				default:
+					offset := newOffset()
+					m.Logf(internal.SuperVerbose, "Reading the latest offset of partition %d for %s topic from the server", partition, topic)
+					latestOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetNewest)
+					if err != nil {
+						return nil, err
+					}
+					offset.Latest = latestOffset
+					lo, ok := local[partition]
+					if !ok && queryLocal {
+						offset.Current = offsetNotFound
+					}
+					if ok && lo.Current >= 0 {
+						offset.Current = lo.Current
+					}
+					result[topic][partition] = offset
+				}
+			}
+		}
+	}
+	return result, nil
+}
+
+// DeleteConsumerGroup deletes the specified consumer group.
+func (m *Manager) DeleteConsumerGroup(group string) error {
+	return m.admin.DeleteConsumerGroup(group)
+}
+
+// DeleteTopic deletes the specified topic.
+func (m *Manager) DeleteTopic(topic string) error {
+	return m.admin.DeleteTopic(topic)
+}
+
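A sketch of querying topics together with their offsets through GetTopics above ("context", "fmt" and "regexp" imports and a ready *kafka.Manager are assumed; the filter and environment are illustrative):

func listPaymentTopics(m *kafka.Manager) error {
	// true asks for offsets too; "local" points at the local offset environment.
	topics, err := m.GetTopics(context.Background(), regexp.MustCompile(`^payments`), true, "local")
	if err != nil {
		return err
	}
	for _, topic := range topics.SortedTopics() {
		fmt.Println(topic)
	}
	return nil
}
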
+// GetConsumerGroups fetches the consumer groups from the server.
+func (m *Manager) GetConsumerGroups(ctx context.Context, includeMembers bool, memberFilter, groupFilter *regexp.Regexp, topics []string) (ConsumerGroups, error) {
+	result := make(ConsumerGroups)
+	select {
+	case <-ctx.Done():
+		return result, nil
+	default:
+
+		m.Log(internal.Verbose, "Retrieving consumer groups from the server")
+		groups, err := m.admin.ListConsumerGroups()
+		if err != nil {
+			return nil, errors.Wrap(err, "Failed to fetch the consumer groups from the server.")
+		}
+		groupNames := make([]string, 0)
+		for group := range groups {
+			select {
+			case <-ctx.Done():
+				return result, nil
+			default:
+				if groupFilter != nil && !groupFilter.Match([]byte(group)) {
+					continue
+				}
+				if includeMembers {
+					groupNames = append(groupNames, group)
+				}
+				result[group] = &ConsumerGroup{}
+			}
+		}
+
+		if len(result) == 0 {
+			return result, nil
+		}
+
+		if includeMembers {
+			m.Log(internal.Verbose, "Retrieving consumer group members from the server")
+			groupsMeta, err := m.admin.DescribeConsumerGroups(groupNames)
+			if err != nil {
+				return nil, errors.Wrap(err, "Failed to retrieve the group members from the server")
+			}
+			for _, gm := range groupsMeta {
+				select {
+				case <-ctx.Done():
+					return result, nil
+				default:
+					m.Logf(internal.VeryVerbose, "Retrieving the members of %s consumer group", gm.GroupId)
+					result[gm.GroupId].addMembers(gm.Members, memberFilter)
+				}
+			}
+		}
+
+		if len(topics) > 0 {
+			topicPartitions := make(map[string][]int32)
+			m.Log(internal.Verbose, "Retrieving topic partitions from the server")
+			for _, topic := range topics {
+				select {
+				case <-ctx.Done():
+					return result, nil
+				default:
+					if internal.IsEmpty(topic) {
+						continue
+					}
+					m.Logf(internal.VeryVerbose, "Retrieving the partition(s) of %s topic from the server", topic)
+					partitions, err := m.client.Partitions(topic)
+					if err != nil {
+						return nil, err
+					}
+					topicPartitions[topic] = partitions
+				}
+			}
+			err = m.setGroupOffsets(ctx, result, topicPartitions)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		return result, nil
+	}
+}
+
+func (m *Manager) setGroupOffsets(ctx context.Context, consumerGroups ConsumerGroups, topicPartitions map[string][]int32) error {
+	for groupID, group := range consumerGroups {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			m.Logf(internal.VeryVerbose, "Retrieving the offsets for %s consumer group", groupID)
+			cgOffsets, err := m.admin.ListConsumerGroupOffsets(groupID, topicPartitions)
+			if err != nil {
+				return errors.Wrap(err, "Failed to retrieve the consumer group offsets")
+			}
+			group.TopicOffsets = make(TopicPartitionOffset)
+			for topic, blocks := range cgOffsets.Blocks {
+				for partition, block := range blocks {
+					select {
+					case <-ctx.Done():
+						return nil
+					default:
+						if block.Offset < 0 {
+							continue
+						}
+						if _, ok := group.TopicOffsets[topic]; !ok {
+							// We add the topic only if there is a group offset for one of its partitions.
+							group.TopicOffsets[topic] = make(PartitionOffset)
+						}
+						m.Logf(internal.SuperVerbose, "Retrieving the latest offset of partition %d of %s topic from the server", partition, topic)
+						latest, err := m.client.GetOffset(topic, partition, sarama.OffsetNewest)
+						if err != nil {
+							return err
+						}
+						group.TopicOffsets[topic][partition] = Offset{
+							Current: block.Offset,
+							Latest:  latest,
+						}
+					}
+				}
+			}
+		}
+	}
+	return nil
+}
+
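With Current holding the group's committed offset and Latest the partition head, per-partition lag falls out of Offset.Lag(); a sketch ("context" and "fmt" imports, a ready *kafka.Manager and the topic name are assumptions):

func printGroupLag(m *kafka.Manager) error {
	groups, err := m.GetConsumerGroups(context.Background(), false, nil, nil, []string{"payments"})
	if err != nil {
		return err
	}
	for name, group := range groups {
		for topic, partitions := range group.TopicOffsets {
			for partition, offset := range partitions {
				fmt.Printf("%s %s/P%d lag: %d\n", name, topic, partition, offset.Lag())
			}
		}
	}
	return nil
}
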
+// GetBrokers returns the current set of active brokers as retrieved from cluster metadata.
+func (m *Manager) GetBrokers(ctx context.Context, includeMetadata bool) ([]Broker, error) {
+	m.Log(internal.Verbose, "Retrieving broker list from the server")
+	result := make([]Broker, 0)
+	for _, broker := range m.servers {
+		select {
+		case <-ctx.Done():
+			return result, nil
+		default:
+			b := Broker{
+				ID:      int(broker.ID()),
+				Address: broker.Addr(),
+			}
+			if includeMetadata {
+				m, err := m.getMetadata(broker)
+				if err != nil {
+					return nil, err
+				}
+				b.Meta = m
+			}
+			result = append(result, b)
+		}
+	}
+	return result, nil
+}
+
+// Close closes the underlying Kafka connection.
+func (m *Manager) Close() {
+	m.Logf(internal.Verbose, "Closing Kafka manager.")
+	err := m.admin.Close()
+	if err != nil {
+		m.Logf(internal.Forced, "Failed to close the cluster admin: %s", err)
+	}
+
+	err = m.client.Close()
+	if err != nil {
+		m.Logf(internal.Forced, "Failed to close Kafka client: %s", err)
+		return
+	}
+	m.Logf(internal.Verbose, "Kafka manager has been closed successfully.")
+}
+
+func (m *Manager) getMetadata(broker *sarama.Broker) (*BrokerMetadata, error) {
+	meta := &BrokerMetadata{
+		Topics: make([]Topic, 0),
+	}
+	m.Logf(internal.VeryVerbose, "Connecting to broker ID #%d", broker.ID())
+	if err := broker.Open(m.client.Config()); err != nil {
+		return nil, err
+	}
+	defer func() {
+		m.Logf(internal.VeryVerbose, "Closing the connection to broker ID #%d", broker.ID())
+		_ = broker.Close()
+	}()
+	m.Logf(internal.Verbose, "Retrieving metadata for broker ID #%d", broker.ID())
+	mt, err := broker.GetMetadata(&sarama.MetadataRequest{})
+	if err != nil {
+		return nil, err
+	}
+	meta.Version = int(mt.Version)
+	for _, topic := range mt.Topics {
+		if topic != nil {
+			m.Logf(internal.SuperVerbose, "The metadata of topic %s has been retrieved from broker ID #%d", topic.Name, broker.ID())
+			meta.Topics = append(meta.Topics, Topic{
+				Name:               topic.Name,
+				NumberOfPartitions: len(topic.Partitions),
+			})
+		}
+	}
+	sort.Sort(TopicsByName(meta.Topics))
+	return meta, nil
+}
diff --git a/kafka/offset.go b/kafka/offset.go
new file mode 100644
index 0000000..9e2ea3c
--- /dev/null
+++ b/kafka/offset.go
@@ -0,0 +1,48 @@
+package kafka
+
+import "strconv"
+
+const unknownOffset int64 = -3
+const offsetNotFound int64 = -4
+
+// Offset represents an offset pair for a given partition.
+//
+// A pair contains the latest offset of the partition reported by the server and the local or consumer group offset.
+type Offset struct {
+	// Latest the latest available offset of the partition reported by the server.
+	Latest int64
+	// Current the current value of the local or consumer group offset.
+	Current int64
+}
+
+func newOffset() Offset {
+	return Offset{
+		Latest:  unknownOffset,
+		Current: unknownOffset,
+	}
+}
+
+// Lag calculates the lag between the latest and the current offset values.
+func (o Offset) Lag() int64 {
+	if o.Latest > o.Current {
+		return o.Latest - o.Current
+	}
+	return 0
+}
+
+// String returns the string representation of the given offset.
+func (o Offset) String(latest bool) string {
+	if latest {
+		return getOffsetText(o.Latest)
+	}
+	return getOffsetText(o.Current)
+}
+
+func getOffsetText(offset int64) string {
+	switch offset {
+	case unknownOffset, offsetNotFound:
+		return "-"
+	default:
+		return strconv.FormatInt(offset, 10)
+	}
+}
diff --git a/kafka/offset_store.go b/kafka/offset_store.go
index 42b3f09..3ae5ee5 100644
--- a/kafka/offset_store.go
+++ b/kafka/offset_store.go
@@ -1,9 +1,6 @@
 package kafka
 
 import (
-	"bytes"
-	"encoding/gob"
-	"os"
 	"strings"
 	"sync"
 	"time"
@@ -35,7 +32,7 @@
 	if len(environment) == 0 {
 		return nil, errors.New("empty environment value is not acceptable")
 	}
-	root := configdir.LocalConfig("trubka", environment)
+	root := configdir.LocalConfig(localOffsetRoot, environment)
 	err := configdir.MakePath(root)
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to create the application cache folder")
@@ -59,12 +56,12 @@ func newOffsetStore(printer internal.Printer, environment string) (*offsetStore,
 	}, nil
 }
 
-func (s *offsetStore) start(loaded map[string]PartitionOffsets) {
+func (s *offsetStore) start(loaded TopicPartitionOffset) {
 	s.wg.Add(1)
 	ticker := time.NewTicker(3 * time.Second)
-	offsets := make(map[string]PartitionOffsets)
+	offsets := make(TopicPartitionOffset)
 	for t, lpo := range loaded {
-		partOffsets := make(PartitionOffsets)
+		partOffsets := make(PartitionOffset)
 		lpo.copyTo(partOffsets)
 		offsets[t] = partOffsets
 	}
@@ -83,9 +80,9 @@
 				}
 				_, ok := offsets[p.topic]
 				if !ok {
-					offsets[p.topic] = make(PartitionOffsets)
+					offsets[p.topic] = make(PartitionOffset)
 				}
-				offsets[p.topic][p.partition] = p.offset
+				offsets[p.topic][p.partition] = Offset{Current: p.offset}
 			}
 		}
 	}()
@@ -101,26 +98,6 @@ func (s *offsetStore) Store(topic string, partition int32, offset int64) error {
 	return nil
 }
 
-// Query loads the offsets of all the available partitions from the local disk.
-func (s *offsetStore) Query(topic string) (PartitionOffsets, error) {
-	offsets := make(PartitionOffsets)
-	val, err := s.db.Read(topic)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return offsets, nil
-		}
-		return nil, err
-	}
-
-	buff := bytes.NewBuffer(val)
-	dec := gob.NewDecoder(buff)
-	err = dec.Decode(&offsets)
-	if err != nil {
-		return nil, errors.Wrapf(err, "Failed to deserialize the value from local offset store for topic %s", topic)
-	}
-	return offsets, nil
-}
-
 // Returns the channel on which the write errors will be received.
 // You must listen to this channel to avoid deadlock.
 func (s *offsetStore) errors() <-chan error {
@@ -138,9 +115,9 @@ func (s *offsetStore) close() {
 	s.printer.Info(internal.SuperVerbose, "The offset store has been closed successfully.")
 }
 
-func (s *offsetStore) writeOffsetsToDisk(offsets map[string]PartitionOffsets) {
-	for topic, offsets := range offsets {
-		cs, buff, err := offsets.marshal()
+func (s *offsetStore) writeOffsetsToDisk(topicPartitionOffsets TopicPartitionOffset) {
+	for topic, partitionOffsets := range topicPartitionOffsets {
+		cs, buff, err := partitionOffsets.marshal()
 		if err != nil {
 			s.writeErrors <- errors.Wrapf(err, "Failed to serialise the offsets of topic %s", topic)
 			return
@@ -153,12 +130,12 @@
 		}
 		s.checksum[cs] = nil
 		s.printer.Infof(internal.SuperVerbose, "Writing the offset(s) of topic %s to the disk.", topic)
-		for p, o := range offsets {
-			if o >= 0 {
-				s.printer.Logf(internal.Chatty, "	P%02d: %d", p, o)
+		for p, offset := range partitionOffsets {
+			if offset.Current >= 0 {
+				s.printer.Logf(internal.Chatty, "	P%02d: %d", p, offset.Current)
 			}
 		}
-		err = s.db.Write(topic, buff)
+		err = s.db.Write(topic+offsetFileExtension, buff)
 		if err != nil {
 			s.writeErrors <- errors.Wrapf(err, "Failed to write the offsets of topic %s to the disk %s", topic, cs)
 		}
diff --git a/kafka/partition_offset.go b/kafka/partition_offset.go
index 9a7d67a..3442a2c 100644
--- a/kafka/partition_offset.go
+++ b/kafka/partition_offset.go
@@ -4,21 +4,35 @@ import (
 	"bytes"
 	"encoding/gob"
 	"fmt"
+	"sort"
 	"strings"
 )
 
-// PartitionOffsets represents a map of partitions and offsets
-type PartitionOffsets map[int32]int64
+// PartitionOffset represents a map of partition offset pairs.
+type PartitionOffset map[int32]Offset
+
+// SortPartitions returns a list of sorted partitions.
+func (p PartitionOffset) SortPartitions() []int {
+	sorted := make([]int, 0)
+	if len(p) == 0 {
+		return sorted
+	}
+	for partition := range p {
+		sorted = append(sorted, int(partition))
+	}
+	sort.Ints(sorted)
+	return sorted
+}
 
 // serialises the offset map and returns the bytes as well as the checksum string of the current values.
-func (p PartitionOffsets) marshal() (string, []byte, error) {
+func (p PartitionOffset) marshal() (string, []byte, error) {
 	if len(p) == 0 {
 		return "", []byte{}, nil
 	}
-	toWrite := make(PartitionOffsets)
+	toWrite := make(map[int32]int64)
 	for pt, of := range p {
-		if of >= 0 {
-			toWrite[pt] = of
+		if of.Current >= 0 {
+			toWrite[pt] = of.Current
 		}
 	}
 	if len(toWrite) == 0 {
@@ -33,16 +47,33 @@ func (p PartitionOffset) marshal() (string, []byte, error) {
 	return strings.Replace(fmt.Sprintf("%v", toWrite), "map", "", 1), buff.Bytes(), nil
 }
 
-func (p PartitionOffsets) copyTo(dest PartitionOffsets) {
+func (p PartitionOffset) copyTo(dest PartitionOffset) {
 	if len(p) == 0 {
 		return
 	}
 	if dest == nil {
-		dest = make(PartitionOffsets)
+		dest = make(PartitionOffset)
 	}
 	for partition, offset := range p {
-		if offset >= 0 {
+		if offset.Current >= 0 {
 			dest[partition] = offset
 		}
 	}
 }
+
+// ToPartitionOffset creates a new PartitionOffset map from a raw map.
+//
+// Set latest parameter to true, if you would like to set the Latest offset value instead of the Current value.
+func ToPartitionOffset(po map[int32]int64, latest bool) PartitionOffset {
+	result := make(PartitionOffset)
+	for partition, offset := range po {
+		off := Offset{}
+		if latest {
+			off.Latest = offset
+		} else {
+			off.Current = offset
+		}
+		result[partition] = off
+	}
+	return result
+}
diff --git a/kafka/topic.go b/kafka/topic.go
new file mode 100644
index 0000000..5a39a1e
--- /dev/null
+++ b/kafka/topic.go
@@ -0,0 +1,7 @@
+package kafka
+
+// Topic represents a Kafka topic.
+type Topic struct {
+	Name               string
+	NumberOfPartitions int
+}
diff --git a/kafka/topic_partition_offset.go b/kafka/topic_partition_offset.go
new file mode 100644
index 0000000..0216f9a
--- /dev/null
+++ b/kafka/topic_partition_offset.go
@@ -0,0 +1,30 @@
+package kafka
+
+import "sort"
+
+// TopicPartitionOffset represents a map of topic offset pairs for all the partitions.
+type TopicPartitionOffset map[string]PartitionOffset
+
+// SortedTopics returns a list of sorted topics.
+func (t TopicPartitionOffset) SortedTopics() []string {
+	sorted := make([]string, 0)
+	if len(t) == 0 {
+		return sorted
+	}
+	for topic := range t {
+		sorted = append(sorted, topic)
+	}
+	sort.Strings(sorted)
+	return sorted
+}
+
+// ToTopicPartitionOffset creates a new TopicPartitionOffset from a raw map.
+//
+// Set latest parameter to true, if you would like to set the Latest offset value instead of the Current value.
+func ToTopicPartitionOffset(tpo map[string]map[int32]int64, latest bool) TopicPartitionOffset {
+	result := make(TopicPartitionOffset)
+	for topic, po := range tpo {
+		result[topic] = ToPartitionOffset(po, latest)
+	}
+	return result
+}
diff --git a/kafka/topics.go b/kafka/topics.go
new file mode 100644
index 0000000..bec8d11
--- /dev/null
+++ b/kafka/topics.go
@@ -0,0 +1,16 @@
+package kafka
+
+// TopicsByName implements the sort interface to sort topics by name.
+type TopicsByName []Topic
+
+func (t TopicsByName) Len() int {
+	return len(t)
+}
+
+func (t TopicsByName) Less(i, j int) bool {
+	return t[i].Name < t[j].Name
+}
+
+func (t TopicsByName) Swap(i, j int) {
+	t[i], t[j] = t[j], t[i]
+}
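
Lifting a raw offset map into the new types then works along these lines (a sketch with made-up values; a "fmt" import is assumed):

raw := map[string]map[int32]int64{
	"payments": {0: 42, 1: 101},
}
// false stores the raw values as Current offsets; true would store them as Latest.
tpo := kafka.ToTopicPartitionOffset(raw, false)
for _, topic := range tpo.SortedTopics() {
	po := tpo[topic]
	for _, partition := range po.SortPartitions() {
		fmt.Printf("%s/P%02d: %s\n", topic, partition, po[int32(partition)].String(false))
	}
}
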
diff --git a/main.go b/main.go
index 4e9ec10..456fc0e 100644
--- a/main.go
+++ b/main.go
@@ -1,299 +1,21 @@
 package main
 
 import (
-	"bytes"
-	"context"
-	"crypto/tls"
-	"fmt"
-	"io/ioutil"
 	"os"
-	"os/signal"
-	"regexp"
-	"strings"
-	"sync"
-	"syscall"
 
-	"github.com/golang/protobuf/proto"
 	"github.com/gookit/color"
-	"github.com/pkg/errors"
-	"github.com/pkg/profile"
-	"github.com/xitonix/flags/core"
-
-	"github.com/xitonix/trubka/internal"
-	"github.com/xitonix/trubka/kafka"
-	"github.com/xitonix/trubka/protobuf"
 )
 
 var version string
 
 func main() {
-
-	initFlags()
-
-	if versionRequest.Get() {
-		printVersion()
-		return
-	}
-
-	if internal.IsEmpty(environment.Get()) {
-		exit(errors.New("The environment cannot be empty."))
-	}
-
-	var searchExpression *regexp.Regexp
-	if searchQuery.IsSet() {
-		se, err := regexp.Compile(searchQuery.Get())
-		if err != nil {
-			exit(errors.Wrap(err, "Failed to parse the search query"))
-		}
-		searchExpression = se
-	}
-
-	if profilingMode.IsSet() {
-		switch strings.ToLower(profilingMode.Get()) {
-		case "cpu":
-			defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
-		case "mem":
-			defer profile.Start(profile.MemProfile, profile.ProfilePath(".")).Stop()
-		case "mutex":
-			defer profile.Start(profile.MutexProfile, profile.ProfilePath(".")).Stop()
-		case "block":
-			defer profile.Start(profile.BlockProfile, profile.ProfilePath(".")).Stop()
-		case "thread":
-			defer profile.Start(profile.ThreadcreationProfile, profile.ProfilePath(".")).Stop()
-		}
-	}
-
-	colorMode := strings.ToLower(terminalMode.Get())
-	logFile, writeLogToFile, err := getLogWriter(logFilePath)
-	if err != nil {
-		exit(err)
-	}
-
-	theme := getColorTheme(colorMode, writeLogToFile)
-
-	level := internal.ToVerbosityLevel(verbosity.Get())
-	prn := internal.NewPrinter(level, logFile, theme)
-
-	loader, err := protobuf.NewFileLoader(protoDir.Get(), protoFiles.Get()...)
+	err := newApplication()
 	if err != nil {
 		exit(err)
 	}
-
-	var tlsConfig *tls.Config
-	if enableTLS.Get() {
-		tlsConfig, err = configureTLS()
-		if err != nil {
-			exit(err)
-		}
-	}
-
-	saramLogWriter := ioutil.Discard
-	if level >= internal.Chatty {
-		saramLogWriter = logFile
-	}
-
-	consumer, err := kafka.NewConsumer(
-		brokers.Get(), prn,
-		environment.Get(),
-		enableAutoTopicCreation.Get(),
-		kafka.WithClusterVersion(kafkaVersion.Get()),
-		kafka.WithTLS(tlsConfig),
-		kafka.WithLogWriter(saramLogWriter),
-		kafka.WithSASL(saslMechanism.Get(), saslUsername.Get(), saslPassword.Get()))
-
-	if err != nil {
-		exit(err)
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	go func() {
-		signals := make(chan os.Signal, 1)
-		signal.Notify(signals, os.Kill, os.Interrupt, syscall.SIGTERM)
-		<-signals
-		prn.Info(internal.Verbose, "Stopping Trubka.")
-		cancel()
-	}()
-
-	topics := make(map[string]*kafka.Checkpoint)
-	tm := make(map[string]string)
-	cp := getCheckpoint(rewind.Get(), timeCheckpoint, offsetCheckpoint)
-	if interactive.Get() {
-		topics, tm, err = readUserData(consumer, loader, topicFilter.Get(), typeFilter.Get(), cp)
-		if err != nil {
-			exit(err)
-		}
-	} else {
-		tm[topic.Get()] = messageType.Get()
-		topics = getTopics(tm, cp)
-	}
-
-	for _, messageType := range tm {
-		err := loader.Load(messageType)
-		if err != nil {
-			exit(err)
-		}
-	}
-
-	writers, writeEventsToFile, err := getOutputWriters(outputDir, topics)
-	if err != nil {
-		exit(err)
-	}
-
-	prn.Start(writers)
-
-	wg := sync.WaitGroup{}
-
-	if len(tm) > 0 {
-		wg.Add(1)
-		consumerCtx, stopConsumer := context.WithCancel(context.Background())
-		defer stopConsumer()
-		go func() {
-			defer wg.Done()
-			reversed := reverse.Get()
-			marshaller := protobuf.NewMarshaller(format.Get(), includeTimeStamp.Get())
-			var searchColor color.Style
-			if !writeEventsToFile {
-				searchColor = getSearchColor(colorMode)
-			}
-			var cancelled bool
-			for {
-				select {
-				case <-ctx.Done():
-					if !cancelled {
-						stopConsumer()
-						cancelled = true
-					}
-				case event, more := <-consumer.Events():
-					if !more {
-						return
-					}
-					if cancelled {
-						// We keep consuming and let the Events channel to drain
-						// Otherwise the consumer will deadlock
-						continue
-					}
-					output, err := process(tm[event.Topic], loader, event, marshaller, searchExpression, reversed, searchColor)
-					if err == nil {
-						prn.WriteEvent(event.Topic, output)
-						consumer.StoreOffset(event)
-						continue
-					}
-					prn.Errorf(internal.Forced,
-						"Failed to process the message at offset %d of partition %d, topic %s: %s",
-						event.Offset,
-						event.Partition,
-						event.Topic,
-						err)
-				}
-			}
-		}()
-		err = consumer.Start(consumerCtx, topics)
-		if err != nil {
-			prn.Errorf(internal.Forced, "Failed to start the consumer: %s", err)
-		}
-	} else {
-		prn.Warning(internal.Forced, "Nothing to process. Terminating Trubka.")
-	}
-
-	// We still need to explicitly close the underlying Kafka client, in case `consumer.Start` has not been called.
-	// It is safe to close the consumer twice.
-	consumer.Close()
-	wg.Wait()
-
-	if err != nil {
-		exit(err)
-	}
-
-	// Do not write to Printer after this point
-	if writeLogToFile {
-		closeFile(logFile.(*os.File))
-	}
-
-	if writeEventsToFile {
-		for _, w := range writers {
-			closeFile(w.(*os.File))
-		}
-	}
-	prn.Close()
-}
-
-func getCheckpoint(rewind bool, timeCheckpoint *core.TimeFlag, offsetCheckpoint *core.Int64Flag) *kafka.Checkpoint {
-	cp := kafka.NewCheckpoint(rewind)
-	switch {
-	case offsetCheckpoint.IsSet():
-		cp.SetOffset(offsetCheckpoint.Get())
-	case timeCheckpoint.IsSet():
-		cp.SetTimeOffset(timeCheckpoint.Get())
-	}
-	return cp
-}
-
-func printVersion() {
-	if version == "" {
-		version = "[built from source]"
-	}
-	fmt.Printf("Trubka %s\n", version)
-}
-
-func process(messageType string,
-	loader *protobuf.FileLoader,
-	event *kafka.Event,
-	marshaller *protobuf.Marshaller,
-	search *regexp.Regexp,
-	reverse bool,
-	highlightColor color.Style) ([]byte, error) {
-
-	msg, err := loader.Get(messageType)
-	if err != nil {
-		return nil, err
-	}
-
-	err = proto.Unmarshal(event.Value, msg)
-	if err != nil {
-		return nil, err
-	}
-
-	output, err := marshaller.Marshal(msg, event.Timestamp)
-	if err != nil {
-		return nil, err
-	}
-
-	if search != nil {
-		matches := search.FindAll(output, -1)
-		if (matches != nil) == reverse {
-			return nil, nil
-		}
-		for _, match := range matches {
-			if highlightColor != nil {
-				output = bytes.ReplaceAll(output, match, []byte(highlightColor.Sprint(string(match))))
-			}
-		}
-	}
-
-	return output, nil
-}
-
-func getTopics(topicMap map[string]string, cp *kafka.Checkpoint) map[string]*kafka.Checkpoint {
-	topics := make(map[string]*kafka.Checkpoint)
-	for topic := range topicMap {
-		topics[topic] = cp
-	}
-	return topics
-}
 
 func exit(err error) {
 	color.Error.Printf("FATAL: %s\n", err)
 	os.Exit(1)
 }
-
-func closeFile(file *os.File) {
-	err := file.Sync()
-	if err != nil {
-		fmt.Printf("Failed to sync the file: %s\n", err)
-	}
-	if err := file.Close(); err != nil {
-		fmt.Printf("Failed to close the file: %s\n", err)
-	}
-}
diff --git a/protobuf/loader.go b/protobuf/loader.go
index 42a1345..a7cc555 100644
--- a/protobuf/loader.go
+++ b/protobuf/loader.go
@@ -10,15 +10,13 @@ import (
 	"github.com/jhump/protoreflect/desc/protoparse"
 	"github.com/jhump/protoreflect/dynamic"
 	"github.com/pkg/errors"
-
-	"github.com/xitonix/trubka/internal"
 )
 
 // Loader the interface to load and list the protocol buffer message types.
 type Loader interface {
 	Load(messageName string) error
 	Get(messageName string) (*dynamic.Message, error)
-	List(filter string) ([]string, error)
+	List(filter *regexp.Regexp) ([]string, error)
 }
 
 const protoExtension = ".proto"
@@ -130,15 +128,7 @@ func (f *FileLoader) Get(messageName string) (*dynamic.Message, error) {
 }
 
 // List returns a list of all the protocol buffer messages exist in the path.
-func (f *FileLoader) List(filter string) ([]string, error) {
-	var search *regexp.Regexp
-	if !internal.IsEmpty(filter) {
-		s, err := regexp.Compile(filter)
-		if err != nil {
-			return nil, errors.Wrap(err, "invalid type filter regular expression")
-		}
-		search = s
-	}
+func (f *FileLoader) List(search *regexp.Regexp) ([]string, error) {
 	result := make([]string, 0)
 	for _, fd := range f.files {
 		messages := fd.GetMessageTypes()