Skip to content

Commit

Permalink
Add dcp event handler implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Jul 20, 2023
1 parent 98505e1 commit f2b36e3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 31 deletions.
4 changes: 4 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func NewConnector(cfg any, mapper Mapper) (Connector, error) {
return nil, err
}

connector.dcp.SetEventHandler(&DcpEventHandler{
producerBatch: connector.producer.ProducerBatch,
})

initializeMetricCollector(connector, dcpClient)

return connector, nil
Expand Down
33 changes: 33 additions & 0 deletions dcp_event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dcpkafka

import "github.com/Trendyol/go-dcp-kafka/kafka/producer"

type DcpEventHandler struct {
producerBatch *producer.Batch
}

func (h *DcpEventHandler) BeforeRebalanceStart() {
}

func (h *DcpEventHandler) AfterRebalanceStart() {
}

func (h *DcpEventHandler) BeforeRebalanceEnd() {
}

func (h *DcpEventHandler) AfterRebalanceEnd() {
}

func (h *DcpEventHandler) BeforeStreamStart() {
h.producerBatch.PrepareEndRebalancing()
}

func (h *DcpEventHandler) AfterStreamStart() {
}

func (h *DcpEventHandler) BeforeStreamStop() {
h.producerBatch.PrepareStartRebalancing()
}

func (h *DcpEventHandler) AfterStreamStop() {
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/go-dcp-kafka
go 1.19

require (
github.com/Trendyol/go-dcp v0.0.66
github.com/Trendyol/go-dcp v0.0.67
github.com/json-iterator/go v1.1.12
github.com/prometheus/client_golang v1.15.1
github.com/segmentio/kafka-go v0.4.42
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Trendyol/go-dcp v0.0.66 h1:+BGUxnXI2pA1BFhF0IsmnG+Y9gYidDvKXgNwcieS7kU=
github.com/Trendyol/go-dcp v0.0.66/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/Trendyol/go-dcp v0.0.67 h1:JW4H/BXJmx67IPSqcOqqD7xBQbJhol+W7zhzWfp0f6U=
github.com/Trendyol/go-dcp v0.0.67/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
33 changes: 13 additions & 20 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@ import (
"github.com/segmentio/kafka-go"
)

type Producer interface {
Produce(ctx *models.ListenerContext, eventTime time.Time, messages []kafka.Message)
Close() error
GetMetric() *Metric
StartBatch()
}

type Metric struct {
KafkaConnectorLatency int64
BatchProduceLatency int64
}

type producer struct {
producerBatch *producerBatch
type Producer struct {
ProducerBatch *Batch
}

func NewProducer(kafkaClient gKafka.Client,
Expand All @@ -34,8 +27,8 @@ func NewProducer(kafkaClient gKafka.Client,
) (Producer, error) {
writer := kafkaClient.Producer()

return &producer{
producerBatch: newProducerBatch(
return Producer{
ProducerBatch: newBatch(
config.Kafka.ProducerBatchTickerDuration,
writer,
config.Kafka.ProducerBatchSize,
Expand All @@ -47,23 +40,23 @@ func NewProducer(kafkaClient gKafka.Client,
}, nil
}

func (p *producer) StartBatch() {
p.producerBatch.StartBatchTicker()
func (p *Producer) StartBatch() {
p.ProducerBatch.StartBatchTicker()
}

func (p *producer) Produce(
func (p *Producer) Produce(
ctx *models.ListenerContext,
eventTime time.Time,
messages []kafka.Message,
) {
p.producerBatch.AddMessages(ctx, messages, eventTime)
p.ProducerBatch.AddMessages(ctx, messages, eventTime)
}

func (p *producer) Close() error {
p.producerBatch.Close()
return p.producerBatch.Writer.Close()
func (p *Producer) Close() error {
p.ProducerBatch.Close()
return p.ProducerBatch.Writer.Close()
}

func (p *producer) GetMetric() *Metric {
return p.producerBatch.metric
func (p *Producer) GetMetric() *Metric {
return p.ProducerBatch.metric
}
36 changes: 28 additions & 8 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/segmentio/kafka-go"
)

type producerBatch struct {
type Batch struct {
logger logger.Logger
errorLogger logger.Logger
batchTicker *time.Ticker
Expand All @@ -28,18 +28,19 @@ type producerBatch struct {
batchLimit int
batchBytes int
flushLock sync.Mutex
isDcpRebalancing bool
}

func newProducerBatch(
func newBatch(
batchTime time.Duration,
writer *kafka.Writer,
batchLimit int,
batchBytes int,
logger logger.Logger,
errorLogger logger.Logger,
dcpCheckpointCommit func(),
) *producerBatch {
batch := &producerBatch{
) *Batch {
batch := &Batch{
batchTickerDuration: batchTime,
batchTicker: time.NewTicker(batchTime),
metric: &Metric{},
Expand All @@ -54,7 +55,7 @@ func newProducerBatch(
return batch
}

func (b *producerBatch) StartBatchTicker() {
func (b *Batch) StartBatchTicker() {
go func() {
for {
<-b.batchTicker.C
Expand All @@ -63,13 +64,32 @@ func (b *producerBatch) StartBatchTicker() {
}()
}

func (b *producerBatch) Close() {
func (b *Batch) Close() {
b.batchTicker.Stop()
b.FlushMessages()
}

func (b *producerBatch) AddMessages(ctx *models.ListenerContext, messages []kafka.Message, eventTime time.Time) {
func (b *Batch) PrepareStartRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()

b.isDcpRebalancing = true
b.messages = b.messages[:0]
b.currentMessageBytes = 0
}

func (b *Batch) PrepareEndRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()

b.isDcpRebalancing = false
}

func (b *Batch) AddMessages(ctx *models.ListenerContext, messages []kafka.Message, eventTime time.Time) {
b.flushLock.Lock()
if b.isDcpRebalancing {
return
}
b.messages = append(b.messages, messages...)
b.currentMessageBytes += binary.Size(messages)
ctx.Ack()
Expand All @@ -82,7 +102,7 @@ func (b *producerBatch) AddMessages(ctx *models.ListenerContext, messages []kafk
}
}

func (b *producerBatch) FlushMessages() {
func (b *Batch) FlushMessages() {
b.flushLock.Lock()
defer b.flushLock.Unlock()
if len(b.messages) > 0 {
Expand Down

0 comments on commit f2b36e3

Please sign in to comment.