From c1455c01fb61c230b2f7d59369ea323e458b7693 Mon Sep 17 00:00:00 2001 From: bysph Date: Tue, 21 Nov 2023 21:30:45 +0800 Subject: [PATCH] fix: support to set kafka version (#146) Co-authored-by: sph --- pkg/sinks/kafka.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/sinks/kafka.go b/pkg/sinks/kafka.go index 96e74d39..cb7907fe 100644 --- a/pkg/sinks/kafka.go +++ b/pkg/sinks/kafka.go @@ -19,6 +19,7 @@ type KafkaConfig struct { Layout map[string]interface{} `yaml:"layout"` ClientId string `yaml:"clientId"` CompressionCodec string `yaml:"compressionCodec" default:"none"` + Version string `yaml:"version"` TLS struct { Enable bool `yaml:"enable"` CaFile string `yaml:"caFile"` @@ -126,7 +127,15 @@ func (k *KafkaSink) Close() { func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) { // Default Sarama config saramaConfig := sarama.NewConfig() - saramaConfig.Version = sarama.MaxVersion + if cfg.Version != "" { + version, err := sarama.ParseKafkaVersion(cfg.Version) + if err != nil { + return nil, err + } + saramaConfig.Version = version + } else { + saramaConfig.Version = sarama.MaxVersion + } saramaConfig.Metadata.Full = true saramaConfig.ClientID = cfg.ClientId