Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add sinkResponseHandler #81

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *connector) getTopicName(collectionName string, messageTopic string) str
return topic
}

func newConnector(cfg any, mapper Mapper) (Connector, error) {
func newConnector(cfg any, mapper Mapper, sinkResponseHandler kafka.SinkResponseHandler) (Connector, error) {
c, err := newConfig(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -128,7 +128,8 @@ func newConnector(cfg any, mapper Mapper) (Connector, error) {

connector.dcp = dcpClient

connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit)
connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit, sinkResponseHandler)

if err != nil {
logger.Log.Error("kafka error: %v", err)
return nil, err
Expand Down Expand Up @@ -200,14 +201,16 @@ func newConnectorConfigFromPath(path string) (*config.Connector, error) {
}

type ConnectorBuilder struct {
mapper Mapper
config any
mapper Mapper
config any
sinkResponseHandler kafka.SinkResponseHandler
}

func NewConnectorBuilder(config any) *ConnectorBuilder {
return &ConnectorBuilder{
config: config,
mapper: DefaultMapper,
config: config,
mapper: DefaultMapper,
sinkResponseHandler: nil,
}
}

Expand All @@ -216,8 +219,13 @@ func (c *ConnectorBuilder) SetMapper(mapper Mapper) *ConnectorBuilder {
return c
}

func (c *ConnectorBuilder) SetSinkResponseHandler(sinkResponseHandler kafka.SinkResponseHandler) *ConnectorBuilder {
c.sinkResponseHandler = sinkResponseHandler
return c
}

func (c *ConnectorBuilder) Build() (Connector, error) {
return newConnector(c.config, c.mapper)
return newConnector(c.config, c.mapper, c.sinkResponseHandler)
}

func (c *ConnectorBuilder) SetLogger(l *logrus.Logger) *ConnectorBuilder {
Expand Down
30 changes: 26 additions & 4 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,43 @@ func mapper(event couchbase.Event) []message.KafkaMessage {
}
```

## Step 3: Configuring the Connector
## Step 3: Implementing the SinkResponseHandler

This function is called after the event is published and takes `message.KafkaMessage` as a parameter.
Here's an example SinkResponseHandler implementation:

```go
type sinkResponseHandler struct {
}

func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value))
}

func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnError %v\n", string(ctx.Message.Value))
}
```

## Step 4: Configuring the Connector

The configuration for the connector is provided via a YAML file. Here's an example [configuration](https://github.com/Trendyol/go-dcp-kafka/blob/master/example/config.yml):

You can find explanation of [configurations](https://github.com/Trendyol/go-dcp#configuration)

You can pass this configuration file to the connector by providing the path to the file when creating the connector:
```go
connector, err := dcpkafka.NewConnector("path-to-config.yml", mapper)
connector, err := dcpkafka.NewConnectorBuilder("config.yml").
SetMapper(mapper).
SetSinkResponseHandler(&sinkResponseHandler{}). // if you want to add callback func
Build()

if err != nil {
panic(err)
panic(err)
}
```

## Step 4: Starting and Closing the Connector
## Step 5: Starting and Closing the Connector

Once you have implemented the mapper and configured the connector, you can start/stop the connector:

Expand Down
20 changes: 18 additions & 2 deletions example/simple/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"github.com/Trendyol/go-dcp-kafka"
"fmt"
dcpkafka "github.com/Trendyol/go-dcp-kafka"
"github.com/Trendyol/go-dcp-kafka/couchbase"
"github.com/Trendyol/go-dcp-kafka/kafka"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

Expand All @@ -17,8 +19,22 @@ func mapper(event couchbase.Event) []message.KafkaMessage {
}
}

type sinkResponseHandler struct {
}

func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value))
}

func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnError %v\n", string(ctx.Message.Value))
}

func main() {
c, err := dcpkafka.NewConnectorBuilder("config.yml").SetMapper(mapper).Build()
c, err := dcpkafka.NewConnectorBuilder("config.yml").
SetMapper(mapper).
SetSinkResponseHandler(&sinkResponseHandler{}).
Build()
if err != nil {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Producer struct {
func NewProducer(kafkaClient gKafka.Client,
config *config.Connector,
dcpCheckpointCommit func(),
sinkResponseHandler gKafka.SinkResponseHandler,
) (Producer, error) {
writer := kafkaClient.Producer()

Expand All @@ -33,6 +34,7 @@ func NewProducer(kafkaClient gKafka.Client,
config.Kafka.ProducerBatchSize,
int64(helpers.ResolveUnionIntOrStringValue(config.Kafka.ProducerBatchBytes)),
dcpCheckpointCommit,
sinkResponseHandler,
),
}, nil
}
Expand Down
64 changes: 63 additions & 1 deletion kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"syscall"
"time"

gKafka "github.com/Trendyol/go-dcp-kafka/kafka"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
"github.com/Trendyol/go-dcp/logger"

"github.com/Trendyol/go-dcp/models"
"github.com/segmentio/kafka-go"
)

type Batch struct {
sinkResponseHandler gKafka.SinkResponseHandler
batchTicker *time.Ticker
Writer *kafka.Writer
dcpCheckpointCommit func()
Expand All @@ -36,6 +39,7 @@ func newBatch(
batchLimit int,
batchBytes int64,
dcpCheckpointCommit func(),
sinkResponseHandler gKafka.SinkResponseHandler,
) *Batch {
batch := &Batch{
batchTickerDuration: batchTime,
Expand All @@ -46,6 +50,7 @@ func newBatch(
batchLimit: batchLimit,
dcpCheckpointCommit: dcpCheckpointCommit,
batchBytes: batchBytes,
sinkResponseHandler: sinkResponseHandler,
}
return batch
}
Expand Down Expand Up @@ -108,15 +113,31 @@ func (b *Batch) FlushMessages() {
if len(b.messages) > 0 {
startedTime := time.Now()
err := b.Writer.WriteMessages(context.Background(), b.messages...)
if err != nil {

if err != nil && b.sinkResponseHandler == nil {
if isFatalError(err) {
erayarslan marked this conversation as resolved.
Show resolved Hide resolved
panic(fmt.Errorf("permanent error on Kafka side %v", err))
}
logger.Log.Error("batch producer flush error %v", err)
return
}

b.metric.BatchProduceLatency = time.Since(startedTime).Milliseconds()

if b.sinkResponseHandler != nil {
switch e := err.(type) {
case nil:
b.handleResponseSuccess()
case kafka.WriteErrors:
b.handleWriteError(e)
case kafka.MessageTooLargeError:
b.handleMessageTooLargeError(e)
return
default:
logger.Log.Error("batch producer flush error %v", err)
return
}
}
b.messages = b.messages[:0]
b.currentMessageBytes = 0
b.batchTicker.Reset(b.batchTickerDuration)
Expand All @@ -136,3 +157,44 @@ func isFatalError(err error) bool {
}
return true
}

func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) {
for i := range writeErrors {
if writeErrors[i] != nil {
b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{
Message: convertKafkaMessage(b.messages[i]),
Err: writeErrors[i],
})
} else {
b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{
Message: convertKafkaMessage(b.messages[i]),
Err: nil,
})
}
}
}

func (b *Batch) handleResponseSuccess() {
for _, msg := range b.messages {
b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{
Message: convertKafkaMessage(msg),
Err: nil,
})
}
}

func (b *Batch) handleMessageTooLargeError(mTooLargeError kafka.MessageTooLargeError) {
b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{
Message: convertKafkaMessage(mTooLargeError.Message),
Err: mTooLargeError,
})
}

func convertKafkaMessage(src kafka.Message) *message.KafkaMessage {
return &message.KafkaMessage{
Topic: src.Topic,
Headers: src.Headers,
Key: src.Key,
Value: src.Value,
}
}
15 changes: 15 additions & 0 deletions kafka/sink_response_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kafka

import (
"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

type SinkResponseHandlerContext struct {
Message *message.KafkaMessage
Err error
}

type SinkResponseHandler interface {
OnSuccess(ctx *SinkResponseHandlerContext)
OnError(ctx *SinkResponseHandlerContext)
}