Skip to content

Easy implementation of kafka consumer with built-in exception manager (kafka-cronsumer)

License

Notifications You must be signed in to change notification settings

ademekici/kafka-konsumer

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kafka Konsumer

🔨Build And Test 🔨IntegrationTest Go Report Card

Description

Kafka Konsumer provides an easy implementation of Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).

Migration Guide

V2 Release Notes

  • Added ability for manipulating kafka message headers.
  • Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages.
  • Enable manuel commit at both single and batch consuming modes.
  • Enabling consumer resume/pause functionality. Please refer to its example and how it works documentation.
  • Bumped kafka-cronsumer to the latest version:
    • Backoff strategy support (linear, exponential options)
    • Added message key for retried messages
    • Added x-error-message to see what was the error of the message during processing
  • Reduce memory allocation.
  • Increase TP on changing internal concurrency structure.

How to migrate from v1 to v2?

You can get latest version via go get github.com/Trendyol/kafka-konsumer/v2@latest

  • You need to change import path from github.com/Trendyol/kafka-konsumer to github.com/Trendyol/kafka-konsumer/v2

  • You need to change your consume function with pointer signature.

  • We moved messageGroupDuration from batchConfiguration.messageGroupDuration to root level. Because this field is used single (non-batch) consumer too.

Installation

go get github.com/Trendyol/kafka-konsumer/v2@latest

Examples

You can find a number of ready-to-run examples at this directory.

After running docker-compose up command, you can run any application you want.

Simple Consumer
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn:    consumeFn,
        RetryEnabled: false,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
Simple Consumer With Retry/Exception Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        ConsumeFn: consumeFn,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        MessageGroupDuration: time.Second,
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            BatchConsumeFn:       batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
    fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
    return nil
}
With Disabling Transactional Retry
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        TransactionalRetry: kafka.NewBoolPtr(false),
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        MessageGroupDuration: time.Second,
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            BatchConsumeFn:       batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
    // you can add custom error handling here & flag messages
    for i := range messages {
        if i%2 == 0 {
            messages[i].IsFailed = true
        }
    }

    // you must return err here to retry failed messages
    return errors.New("err")
}

With Distributed Tracing Support

Please refer to Tracing Example

With Pause & Resume Consumer

Please refer to Pause Resume Example

With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can see the example by going to the with-grafana folder in the examples folder and running the infrastructure with docker compose up and then the application.

grafana

With SASL-PLAINTEXT Authentication

Under the examples - with-sasl-plaintext folder, you can find an example of a consumer integration with SASL/PLAIN mechanism. To try the example, you can run the command docker compose up under the specified folder and then start the application.

Configurations

config description default
reader Describes all segmentio kafka reader configurations
consumeFn Kafka consumer function, if retry enabled it, is also used to consume retriable messages
skipMessageByHeaderFn Function to filter messages based on headers, return true if you want to skip the message nil
logLevel Describes log level; valid options are debug, info, warn, and error info
concurrency Number of goroutines used at listeners 1
retryEnabled Retry/Exception consumer is working or not false
transactionalRetry Set false if you want to use exception/retry strategy to only failed messages true
commitInterval indicates the interval at which offsets are committed to the broker. 1s
rack see doc
clientId see doc
messageGroupDuration Maximum time to wait for a batch 1s
metricPrefix MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_konsumer. Currently, there are two exposed prometheus metrics. processed_messages_total and unprocessed_messages_total So, if default metric prefix used, metrics names are kafka_konsumer_processed_messages_total_current and kafka_konsumer_unprocessed_messages_total_current. kafka_konsumer
dial.Timeout see doc no timeout
dial.KeepAlive see doc not enabled
transport.DialTimeout see doc 5s
transport.IdleTimeout see doc 30s
transport.MetadataTTL see doc 6s
transport.MetadataTopics see doc all topics in cluster
distributedTracingEnabled indicates open telemetry support on/off for consume and produce operations. false
distributedTracingConfiguration.TracerProvider see doc otel.GetTracerProvider()
distributedTracingConfiguration.Propagator see doc otel.GetTextMapPropagator()
retryConfiguration.clientId see doc
retryConfiguration.startTimeCron Cron expression when retry consumer (kafka-cronsumer) starts to work at
retryConfiguration.metricPrefix MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer. Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current. So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current kafka_cronsumer
retryConfiguration.workDuration Work duration exception consumer actively consuming messages
retryConfiguration.topic Retry/Exception topic names
retryConfiguration.brokers Retry topic brokers urls
retryConfiguration.maxRetry Maximum retry value for attempting to retry a message 3
retryConfiguration.tls.rootCAPath see doc ""
retryConfiguration.tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
retryConfiguration.sasl.authType SCRAM or PLAIN
retryConfiguration.sasl.username SCRAM OR PLAIN username
retryConfiguration.sasl.password SCRAM OR PLAIN password
retryConfiguration.skipMessageByHeaderFn Function to filter messages based on headers, return true if you want to skip the message nil
batchConfiguration.messageGroupLimit Maximum number of messages in a batch
batchConfiguration.batchConsumeFn Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages
batchConfiguration.preBatchFn This function enable for transforming messages before batch consuming starts
batchConfiguration.balancer see doc leastBytes
tls.rootCAPath see doc ""
tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
sasl.authType SCRAM or PLAIN
sasl.username SCRAM OR PLAIN username
sasl.password SCRAM OR PLAIN password
logger If you want to custom logger info
apiEnabled Enabled metrics false
apiConfiguration.port Set API port 8090
apiConfiguration.healtCheckPath Set Health check path healthcheck
metricConfiguration.path Set metric endpoint path /metrics

Monitoring

Kafka Konsumer offers an API that handles exposing several metrics.

Exposed Metrics

Metric Name Description Value Type
kafka_konsumer_processed_messages_total_current Total number of processed messages. Counter
kafka_konsumer_unprocessed_messages_total_current Total number of unprocessed messages. Counter

About

Easy implementation of kafka consumer with built-in exception manager (kafka-cronsumer)

Resources

License

Security policy

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 98.7%
  • Makefile 1.3%