From f2b36e38ef73b362137e07d6a8ef4949f867ea5b Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 20 Jul 2023 16:46:55 +0300 Subject: [PATCH] Add dcp event handler implementation --- connector.go | 4 ++++ dcp_event_handler.go | 33 +++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 ++-- kafka/producer/producer.go | 33 ++++++++++++----------------- kafka/producer/producer_batch.go | 36 +++++++++++++++++++++++++------- 6 files changed, 81 insertions(+), 31 deletions(-) create mode 100644 dcp_event_handler.go diff --git a/connector.go b/connector.go index 9e8f088..265dcda 100644 --- a/connector.go +++ b/connector.go @@ -136,6 +136,10 @@ func NewConnector(cfg any, mapper Mapper) (Connector, error) { return nil, err } + connector.dcp.SetEventHandler(&DcpEventHandler{ + producerBatch: connector.producer.ProducerBatch, + }) + initializeMetricCollector(connector, dcpClient) return connector, nil diff --git a/dcp_event_handler.go b/dcp_event_handler.go new file mode 100644 index 0000000..c1626b7 --- /dev/null +++ b/dcp_event_handler.go @@ -0,0 +1,33 @@ +package dcpkafka + +import "github.com/Trendyol/go-dcp-kafka/kafka/producer" + +type DcpEventHandler struct { + producerBatch *producer.Batch +} + +func (h *DcpEventHandler) BeforeRebalanceStart() { +} + +func (h *DcpEventHandler) AfterRebalanceStart() { +} + +func (h *DcpEventHandler) BeforeRebalanceEnd() { +} + +func (h *DcpEventHandler) AfterRebalanceEnd() { +} + +func (h *DcpEventHandler) BeforeStreamStart() { + h.producerBatch.PrepareEndRebalancing() +} + +func (h *DcpEventHandler) AfterStreamStart() { +} + +func (h *DcpEventHandler) BeforeStreamStop() { + h.producerBatch.PrepareStartRebalancing() +} + +func (h *DcpEventHandler) AfterStreamStop() { +} diff --git a/go.mod b/go.mod index 8071b23..caea39b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/go-dcp-kafka go 1.19 require ( - github.com/Trendyol/go-dcp v0.0.66 + github.com/Trendyol/go-dcp v0.0.67 github.com/json-iterator/go v1.1.12 github.com/prometheus/client_golang v1.15.1 github.com/segmentio/kafka-go v0.4.42 diff --git a/go.sum b/go.sum index 0cef4fe..a3a2448 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= -github.com/Trendyol/go-dcp v0.0.66 h1:+BGUxnXI2pA1BFhF0IsmnG+Y9gYidDvKXgNwcieS7kU= -github.com/Trendyol/go-dcp v0.0.66/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.67 h1:JW4H/BXJmx67IPSqcOqqD7xBQbJhol+W7zhzWfp0f6U= +github.com/Trendyol/go-dcp v0.0.67/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index a0d3f62..6ca7925 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -10,20 +10,13 @@ import ( "github.com/segmentio/kafka-go" ) -type Producer interface { - Produce(ctx *models.ListenerContext, eventTime time.Time, messages []kafka.Message) - Close() error - GetMetric() *Metric - StartBatch() -} - type Metric struct { KafkaConnectorLatency int64 BatchProduceLatency int64 } -type producer struct { - producerBatch *producerBatch +type Producer struct { + ProducerBatch *Batch } func NewProducer(kafkaClient gKafka.Client, @@ -34,8 +27,8 @@ func NewProducer(kafkaClient gKafka.Client, ) (Producer, error) { writer := kafkaClient.Producer() - return &producer{ - producerBatch: newProducerBatch( + return Producer{ + ProducerBatch: newBatch( config.Kafka.ProducerBatchTickerDuration, writer, config.Kafka.ProducerBatchSize, @@ -47,23 +40,23 @@ func NewProducer(kafkaClient gKafka.Client, }, nil } -func (p *producer) StartBatch() { - p.producerBatch.StartBatchTicker() +func (p *Producer) StartBatch() { + p.ProducerBatch.StartBatchTicker() } -func (p *producer) Produce( +func (p *Producer) Produce( ctx *models.ListenerContext, eventTime time.Time, messages []kafka.Message, ) { - p.producerBatch.AddMessages(ctx, messages, eventTime) + p.ProducerBatch.AddMessages(ctx, messages, eventTime) } -func (p *producer) Close() error { - p.producerBatch.Close() - return p.producerBatch.Writer.Close() +func (p *Producer) Close() error { + p.ProducerBatch.Close() + return p.ProducerBatch.Writer.Close() } -func (p *producer) GetMetric() *Metric { - return p.producerBatch.metric +func (p *Producer) GetMetric() *Metric { + return p.ProducerBatch.metric } diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index bececd3..3d35b1b 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -15,7 +15,7 @@ import ( "github.com/segmentio/kafka-go" ) -type producerBatch struct { +type Batch struct { logger logger.Logger errorLogger logger.Logger batchTicker *time.Ticker @@ -28,9 +28,10 @@ type producerBatch struct { batchLimit int batchBytes int flushLock sync.Mutex + isDcpRebalancing bool } -func newProducerBatch( +func newBatch( batchTime time.Duration, writer *kafka.Writer, batchLimit int, @@ -38,8 +39,8 @@ func newProducerBatch( logger logger.Logger, errorLogger logger.Logger, dcpCheckpointCommit func(), -) *producerBatch { - batch := &producerBatch{ +) *Batch { + batch := &Batch{ batchTickerDuration: batchTime, batchTicker: time.NewTicker(batchTime), metric: &Metric{}, @@ -54,7 +55,7 @@ func newProducerBatch( return batch } -func (b *producerBatch) StartBatchTicker() { +func (b *Batch) StartBatchTicker() { go func() { for { <-b.batchTicker.C @@ -63,13 +64,32 @@ func (b *producerBatch) StartBatchTicker() { }() } -func (b *producerBatch) Close() { +func (b *Batch) Close() { b.batchTicker.Stop() b.FlushMessages() } -func (b *producerBatch) AddMessages(ctx *models.ListenerContext, messages []kafka.Message, eventTime time.Time) { +func (b *Batch) PrepareStartRebalancing() { b.flushLock.Lock() + defer b.flushLock.Unlock() + + b.isDcpRebalancing = true + b.messages = b.messages[:0] + b.currentMessageBytes = 0 +} + +func (b *Batch) PrepareEndRebalancing() { + b.flushLock.Lock() + defer b.flushLock.Unlock() + + b.isDcpRebalancing = false +} + +func (b *Batch) AddMessages(ctx *models.ListenerContext, messages []kafka.Message, eventTime time.Time) { + b.flushLock.Lock() + if b.isDcpRebalancing { + return + } b.messages = append(b.messages, messages...) b.currentMessageBytes += binary.Size(messages) ctx.Ack() @@ -82,7 +102,7 @@ func (b *producerBatch) AddMessages(ctx *models.ListenerContext, messages []kafk } } -func (b *producerBatch) FlushMessages() { +func (b *Batch) FlushMessages() { b.flushLock.Lock() defer b.flushLock.Unlock() if len(b.messages) > 0 {