Skip to content

Commit

Permalink
Get Kafka message from stack to prevent allocations (#47)
Browse files Browse the repository at this point in the history
* Get Kafka message from stack to prevent allocations

* Fix unnecessary topic name calculation
  • Loading branch information
mhmtszr authored May 4, 2023
1 parent 2a1ec1c commit c680606
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 40 deletions.
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

func mapper(event couchbase.Event) []*message.KafkaMessage {
func mapper(event couchbase.Event) []message.KafkaMessage {
// return empty if you wish filter the event
return []*message.KafkaMessage{
message.GetKafkaMessage(event.Key, event.Value, nil),
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
}

Expand Down Expand Up @@ -78,10 +82,14 @@ import (
"os"
)

func mapper(event couchbase.Event) []*message.KafkaMessage {
func mapper(event couchbase.Event) []message.KafkaMessage {
// return empty if you wish filter the event
return []*message.KafkaMessage{
message.GetKafkaMessage(event.Key, event.Value, nil),
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
}

Expand Down
20 changes: 8 additions & 12 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/metadata"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/producer"
"github.com/Trendyol/go-kafka-connect-couchbase/metric"
Expand Down Expand Up @@ -53,17 +52,14 @@ func (c *connector) produce(ctx *models.ListenerContext) {
default:
return
}

for _, kafkaMessage := range c.mapper(e) {
if kafkaMessage != nil {
topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", e.CollectionName)
return
}
c.producer.Produce(ctx, e.EventTime, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic)
message.MessagePool.Put(kafkaMessage)
}
topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", e.CollectionName)
return
}
kafkaMessages := c.mapper(e)
for i := range kafkaMessages {
c.producer.Produce(ctx, e.EventTime, kafkaMessages[i].Value, kafkaMessages[i].Key, kafkaMessages[i].Headers, topic)
}
}

Expand Down
10 changes: 7 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

func mapper(event couchbase.Event) []*message.KafkaMessage {
func mapper(event couchbase.Event) []message.KafkaMessage {
// return empty if you wish filter the event
return []*message.KafkaMessage{
message.GetKafkaMessage(event.Key, event.Value, nil),
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
}

Expand Down
18 changes: 0 additions & 18 deletions kafka/message/message.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,7 @@
package message

import (
"sync"
)

type KafkaMessage struct {
Headers map[string]string
Key []byte
Value []byte
}

func GetKafkaMessage(key []byte, value []byte, headers map[string]string) *KafkaMessage {
message := MessagePool.Get().(*KafkaMessage)
message.Key = key
message.Value = value
message.Headers = headers
return message
}

var MessagePool = sync.Pool{
New: func() any {
return &KafkaMessage{}
},
}
2 changes: 1 addition & 1 deletion mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

type Mapper func(event couchbase.Event) []*message.KafkaMessage
type Mapper func(event couchbase.Event) []message.KafkaMessage

0 comments on commit c680606

Please sign in to comment.