Skip to content

Commit

Permalink
Update producer batch to prevent missing events
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Jan 6, 2023
1 parent 682a5f2 commit dec1325
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 54 deletions.
11 changes: 2 additions & 9 deletions connector.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package gokafkaconnectcouchbase

import (
"context"

"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"

godcpclient "github.com/Trendyol/go-dcp-client"
Expand Down Expand Up @@ -57,13 +55,8 @@ func (c *connector) listener(event interface{}, err error) {
}

if kafkaMessage := c.mapper(e); kafkaMessage != nil {
defer message.KafkaMessagePool.Put(kafkaMessage)
// TODO: use contexts
ctx := context.TODO()
err = c.producer.Produce(&ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers)
if err != nil {
c.errorLogger.Printf("error | %v", err)
}
defer message.MessagePool.Put(kafkaMessage)
c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers)
}
}

Expand Down
4 changes: 2 additions & 2 deletions kafka/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ type KafkaMessage struct {
}

// GetKafkaMessage returns a *KafkaMessage taken from MessagePool with
// its Key, Value and Headers set to the given arguments. Because the
// message is pooled, the caller is responsible for returning it with
// MessagePool.Put when done; any field not assigned here may still
// hold data from a previous use.
func GetKafkaMessage(key []byte, value []byte, headers map[string]string) *KafkaMessage {
	message := MessagePool.Get().(*KafkaMessage)
	message.Key = key
	message.Value = value
	message.Headers = headers
	return message
}

var KafkaMessagePool = sync.Pool{
var MessagePool = sync.Pool{
New: func() any {
return &KafkaMessage{}
},
Expand Down
20 changes: 16 additions & 4 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package kafka

import (
"context"
"math"
"sync"
"time"

"github.com/Trendyol/go-kafka-connect-couchbase/config"
Expand All @@ -12,7 +12,7 @@ import (
)

// Producer publishes messages to Kafka through an internal batch.
type Producer interface {
	// Produce enqueues a single message (value, key and headers) for
	// asynchronous, batched delivery. Delivery errors are reported by
	// the batch flusher, not returned here.
	Produce(message []byte, key []byte, headers map[string]string)
	// Close shuts the producer down, returning any shutdown error.
	Close() error
}

Expand All @@ -25,6 +25,8 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.
Topic: config.Topic,
Addr: kafka.TCP(config.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: config.ProducerBatchSize,
BatchBytes: math.MaxInt,
MaxAttempts: math.MaxInt,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
Expand All @@ -37,8 +39,18 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.
}
}

func (a *producer) Produce(ctx *context.Context, message []byte, key []byte, headers map[string]string) error {
return a.producerBatch.AddMessage(kafka.Message{Key: key, Value: message, Headers: newHeaders(headers)})
// KafkaMessagePool recycles kafka.Message values so Produce does not
// have to allocate a fresh message for every event.
var KafkaMessagePool = sync.Pool{New: func() any { return &kafka.Message{} }}

// Produce takes a kafka.Message from the pool, fills in its headers,
// key and value, and hands it to the batch goroutine over the message
// channel. The send is asynchronous; no error is returned to the caller.
func (a *producer) Produce(message []byte, key []byte, headers map[string]string) {
	m := KafkaMessagePool.Get().(*kafka.Message)
	m.Headers = newHeaders(headers)
	m.Key = key
	m.Value = message
	a.producerBatch.messageChn <- m
}

func newHeaders(headersMap map[string]string) []kafka.Header {
Expand Down
68 changes: 29 additions & 39 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
"context"
"sync"
"time"

"github.com/Trendyol/go-kafka-connect-couchbase/logger"
Expand All @@ -16,10 +15,10 @@ type producerBatch struct {
batchTicker *time.Ticker
Writer *kafka.Writer
isClosed chan bool
messageChn chan *kafka.Message
messages []kafka.Message
batchTickerDuration time.Duration
batchLimit int
flushMutex sync.Mutex
}

func newProducerBatch(
Expand All @@ -32,58 +31,49 @@ func newProducerBatch(
batch := &producerBatch{
batchTickerDuration: batchTime,
batchTicker: time.NewTicker(batchTime),
messageChn: make(chan *kafka.Message, batchLimit),
messages: make([]kafka.Message, 0, batchLimit),
Writer: writer,
batchLimit: batchLimit,
isClosed: make(chan bool, 1),
logger: logger,
errorLogger: errorLogger,
}
go func() {
errChan := make(chan error, 1)
batch.CheckBatchTicker(errChan)

for err := range errChan {
errorLogger.Printf("Batch producer flush error %v", err)
}
}()
batch.StartBatch()
return batch
}

func (b *producerBatch) CheckBatchTicker(errChan chan error) {
for {
select {
case <-b.isClosed:
b.batchTicker.Stop()
err := b.FlushMessages()
if err != nil {
errChan <- err
}
case <-b.batchTicker.C:
err := b.FlushMessages()
if err != nil {
errChan <- err
}
}
}
}
func (b *producerBatch) StartBatch() {
go func() {
for {
select {
case <-b.isClosed:
b.batchTicker.Stop()
err := b.FlushMessages()
if err != nil {
b.errorLogger.Printf("Batch producer flush error %v", err)
}
case <-b.batchTicker.C:
err := b.FlushMessages()
if err != nil {
b.errorLogger.Printf("Batch producer flush error %v", err)
}

func (b *producerBatch) AddMessage(message kafka.Message) error {
b.messages = append(b.messages, message)
if len(b.messages) >= b.batchLimit {
err := b.FlushMessages()
if err != nil {
return err
case message := <-b.messageChn:
b.messages = append(b.messages, *message)
if len(b.messages) == b.batchLimit {
err := b.FlushMessages()
if err != nil {
b.errorLogger.Printf("Batch producer flush error %v", err)
}
}
}
}
}
return nil
}()
}

func (b *producerBatch) FlushMessages() error {
b.flushMutex.Lock()
defer b.flushMutex.Unlock()

messageCount := len(b.messages)
messageCount := len(b.messageChn)
if messageCount == 0 {
return nil
}
Expand Down

0 comments on commit dec1325

Please sign in to comment.