diff --git a/config/config.go b/config/config.go index 02467f8..0f54232 100644 --- a/config/config.go +++ b/config/config.go @@ -4,10 +4,13 @@ import ( "math" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp/config" ) type Kafka struct { + ProducerBatchBytes any `yaml:"producerBatchBytes"` CollectionTopicMapping map[string]string `yaml:"collectionTopicMapping"` InterCAPath string `yaml:"interCAPath"` ScramUsername string `yaml:"scramUsername"` @@ -16,9 +19,8 @@ type Kafka struct { ClientID string `yaml:"clientID"` Brokers []string `yaml:"brokers"` MetadataTopics []string `yaml:"metadataTopics"` - ProducerBatchBytes int64 `yaml:"producerBatchBytes"` - ProducerBatchTimeout time.Duration `yaml:"producerBatchTimeout"` ProducerMaxAttempts int `yaml:"producerMaxAttempts"` + ProducerBatchTimeout time.Duration `yaml:"producerBatchTimeout"` ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` RequiredAcks int `yaml:"requiredAcks"` @@ -60,7 +62,7 @@ func (c *Connector) ApplyDefaults() { } if c.Kafka.ProducerBatchBytes == 0 { - c.Kafka.ProducerBatchBytes = 10485760 + c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("10mb") } if c.Kafka.RequiredAcks == 0 { diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index 0a40dac..71e2f57 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -3,6 +3,8 @@ package producer import ( "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp-kafka/config" gKafka "github.com/Trendyol/go-dcp-kafka/kafka" "github.com/Trendyol/go-dcp/models" @@ -29,7 +31,7 @@ func NewProducer(kafkaClient gKafka.Client, config.Kafka.ProducerBatchTickerDuration, writer, config.Kafka.ProducerBatchSize, - config.Kafka.ProducerBatchBytes, + int64(helpers.ResolveUnionIntOrStringValue(config.Kafka.ProducerBatchBytes)), dcpCheckpointCommit, ), }, nil