From c680606a2ecb82e8ff00e7323803aec7b1ab8561 Mon Sep 17 00:00:00 2001 From: Mehmet Sezer Date: Thu, 4 May 2023 12:26:39 +0300 Subject: [PATCH] Get Kafka message from stack to prevent allocations (#47) * Get Kafka message from stack to prevent allocations * Fix unnecessary topic name calculation --- README.md | 20 ++++++++++++++------ connector.go | 20 ++++++++------------ example/main.go | 10 +++++++--- kafka/message/message.go | 18 ------------------ mapper.go | 2 +- 5 files changed, 30 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 7425fec..0e6d406 100644 --- a/README.md +++ b/README.md @@ -47,10 +47,14 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" ) -func mapper(event couchbase.Event) []*message.KafkaMessage { +func mapper(event couchbase.Event) []message.KafkaMessage { // return empty if you wish filter the event - return []*message.KafkaMessage{ - message.GetKafkaMessage(event.Key, event.Value, nil), + return []message.KafkaMessage{ + { + Headers: nil, + Key: event.Key, + Value: event.Value, + }, } } @@ -78,10 +82,14 @@ import ( "os" ) -func mapper(event couchbase.Event) []*message.KafkaMessage { +func mapper(event couchbase.Event) []message.KafkaMessage { // return empty if you wish filter the event - return []*message.KafkaMessage{ - message.GetKafkaMessage(event.Key, event.Value, nil), + return []message.KafkaMessage{ + { + Headers: nil, + Key: event.Key, + Value: event.Value, + }, } } diff --git a/connector.go b/connector.go index 8bd0e32..74827c7 100644 --- a/connector.go +++ b/connector.go @@ -7,7 +7,6 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/config" "github.com/Trendyol/go-kafka-connect-couchbase/couchbase" "github.com/Trendyol/go-kafka-connect-couchbase/kafka" - "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" "github.com/Trendyol/go-kafka-connect-couchbase/kafka/metadata" "github.com/Trendyol/go-kafka-connect-couchbase/kafka/producer" "github.com/Trendyol/go-kafka-connect-couchbase/metric" @@ -53,17 +52,14 @@ func (c *connector) produce(ctx *models.ListenerContext) { default: return } - - for _, kafkaMessage := range c.mapper(e) { - if kafkaMessage != nil { - topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName] - if topic == "" { - c.errorLogger.Printf("unexpected collection | %s", e.CollectionName) - return - } - c.producer.Produce(ctx, e.EventTime, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic) - message.MessagePool.Put(kafkaMessage) - } + topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName] + if topic == "" { + c.errorLogger.Printf("unexpected collection | %s", e.CollectionName) + return + } + kafkaMessages := c.mapper(e) + for i := range kafkaMessages { + c.producer.Produce(ctx, e.EventTime, kafkaMessages[i].Value, kafkaMessages[i].Key, kafkaMessages[i].Headers, topic) } } diff --git a/example/main.go b/example/main.go index 67907cf..d20f347 100644 --- a/example/main.go +++ b/example/main.go @@ -6,10 +6,14 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" ) -func mapper(event couchbase.Event) []*message.KafkaMessage { +func mapper(event couchbase.Event) []message.KafkaMessage { // return empty if you wish filter the event - return []*message.KafkaMessage{ - message.GetKafkaMessage(event.Key, event.Value, nil), + return []message.KafkaMessage{ + { + Headers: nil, + Key: event.Key, + Value: event.Value, + }, } } diff --git a/kafka/message/message.go b/kafka/message/message.go index 228dba7..391261c 100644 --- a/kafka/message/message.go +++ b/kafka/message/message.go @@ -1,25 +1,7 @@ package message -import ( - "sync" -) - type KafkaMessage struct { Headers map[string]string Key []byte Value []byte } - -func GetKafkaMessage(key []byte, value []byte, headers map[string]string) *KafkaMessage { - message := MessagePool.Get().(*KafkaMessage) - message.Key = key - message.Value = value - message.Headers = headers - return message -} - -var MessagePool = sync.Pool{ - New: func() any { - return &KafkaMessage{} - }, -} diff --git a/mapper.go b/mapper.go index db076a0..449dd86 100644 --- a/mapper.go +++ b/mapper.go @@ -5,4 +5,4 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" ) -type Mapper func(event couchbase.Event) []*message.KafkaMessage +type Mapper func(event couchbase.Event) []message.KafkaMessage