From dec132530742189f3bd27c1eb5514ad7271d574d Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Fri, 6 Jan 2023 17:44:59 +0300 Subject: [PATCH] Update product batch to prevent event missing --- connector.go | 11 +----- kafka/message/message.go | 4 +- kafka/producer/producer.go | 20 ++++++++-- kafka/producer/producer_batch.go | 68 ++++++++++++++------------------ 4 files changed, 49 insertions(+), 54 deletions(-) diff --git a/connector.go b/connector.go index 193a5cc..f35385e 100644 --- a/connector.go +++ b/connector.go @@ -1,8 +1,6 @@ package gokafkaconnectcouchbase import ( - "context" - "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" godcpclient "github.com/Trendyol/go-dcp-client" @@ -57,13 +55,8 @@ func (c *connector) listener(event interface{}, err error) { } if kafkaMessage := c.mapper(e); kafkaMessage != nil { - defer message.KafkaMessagePool.Put(kafkaMessage) - // TODO: use contexts - ctx := context.TODO() - err = c.producer.Produce(&ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers) - if err != nil { - c.errorLogger.Printf("error | %v", err) - } + defer message.MessagePool.Put(kafkaMessage) + c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers) } } diff --git a/kafka/message/message.go b/kafka/message/message.go index 4b4efeb..228dba7 100644 --- a/kafka/message/message.go +++ b/kafka/message/message.go @@ -11,14 +11,14 @@ type KafkaMessage struct { } func GetKafkaMessage(key []byte, value []byte, headers map[string]string) *KafkaMessage { - message := KafkaMessagePool.Get().(*KafkaMessage) + message := MessagePool.Get().(*KafkaMessage) message.Key = key message.Value = value message.Headers = headers return message } -var KafkaMessagePool = sync.Pool{ +var MessagePool = sync.Pool{ New: func() any { return &KafkaMessage{} }, diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index 7dfb5c4..1acfdce 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -1,8 +1,8 @@ package kafka import ( - "context" "math" + "sync" "time" "github.com/Trendyol/go-kafka-connect-couchbase/config" @@ -12,7 +12,7 @@ import ( ) type Producer interface { - Produce(ctx *context.Context, message []byte, key []byte, headers map[string]string) error + Produce(message []byte, key []byte, headers map[string]string) Close() error } @@ -25,6 +25,8 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger. Topic: config.Topic, Addr: kafka.TCP(config.Brokers...), Balancer: &kafka.Hash{}, + BatchSize: config.ProducerBatchSize, + BatchBytes: math.MaxInt, MaxAttempts: math.MaxInt, ReadTimeout: config.ReadTimeout, WriteTimeout: config.WriteTimeout, @@ -37,8 +39,18 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger. } } -func (a *producer) Produce(ctx *context.Context, message []byte, key []byte, headers map[string]string) error { - return a.producerBatch.AddMessage(kafka.Message{Key: key, Value: message, Headers: newHeaders(headers)}) +var KafkaMessagePool = sync.Pool{ + New: func() any { + return &kafka.Message{} + }, +} + +func (a *producer) Produce(message []byte, key []byte, headers map[string]string) { + msg := KafkaMessagePool.Get().(*kafka.Message) + msg.Key = key + msg.Value = message + msg.Headers = newHeaders(headers) + a.producerBatch.messageChn <- msg } func newHeaders(headersMap map[string]string) []kafka.Header { diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index 937088b..a886870 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -2,7 +2,6 @@ package kafka import ( "context" - "sync" "time" "github.com/Trendyol/go-kafka-connect-couchbase/logger" @@ -16,10 +15,10 @@ type producerBatch struct { batchTicker *time.Ticker Writer *kafka.Writer isClosed chan bool + messageChn chan *kafka.Message messages []kafka.Message batchTickerDuration time.Duration batchLimit int - flushMutex sync.Mutex } func newProducerBatch( @@ -32,6 +31,7 @@ func newProducerBatch( batch := &producerBatch{ batchTickerDuration: batchTime, batchTicker: time.NewTicker(batchTime), + messageChn: make(chan *kafka.Message, batchLimit), messages: make([]kafka.Message, 0, batchLimit), Writer: writer, batchLimit: batchLimit, @@ -39,51 +39,41 @@ func newProducerBatch( logger: logger, errorLogger: errorLogger, } - go func() { - errChan := make(chan error, 1) - batch.CheckBatchTicker(errChan) - - for err := range errChan { - errorLogger.Printf("Batch producer flush error %v", err) - } - }() + batch.StartBatch() return batch } -func (b *producerBatch) CheckBatchTicker(errChan chan error) { - for { - select { - case <-b.isClosed: - b.batchTicker.Stop() - err := b.FlushMessages() - if err != nil { - errChan <- err - } - case <-b.batchTicker.C: - err := b.FlushMessages() - if err != nil { - errChan <- err - } - } - } -} +func (b *producerBatch) StartBatch() { + go func() { + for { + select { + case <-b.isClosed: + b.batchTicker.Stop() + err := b.FlushMessages() + if err != nil { + b.errorLogger.Printf("Batch producer flush error %v", err) + } + case <-b.batchTicker.C: + err := b.FlushMessages() + if err != nil { + b.errorLogger.Printf("Batch producer flush error %v", err) + } -func (b *producerBatch) AddMessage(message kafka.Message) error { - b.messages = append(b.messages, message) - if len(b.messages) >= b.batchLimit { - err := b.FlushMessages() - if err != nil { - return err + case message := <-b.messageChn: + b.messages = append(b.messages, *message) + if len(b.messages) == b.batchLimit { + err := b.FlushMessages() + if err != nil { + b.errorLogger.Printf("Batch producer flush error %v", err) + } + } + } } - } - return nil + }() } func (b *producerBatch) FlushMessages() error { - b.flushMutex.Lock() - defer b.flushMutex.Unlock() - - messageCount := len(b.messages) + messageCount := len(b.messageChn) if messageCount == 0 { return nil }