diff --git a/pkg/sinks/kafka.go b/pkg/sinks/kafka.go index 96e74d39..cb7907fe 100644 --- a/pkg/sinks/kafka.go +++ b/pkg/sinks/kafka.go @@ -19,6 +19,7 @@ type KafkaConfig struct { Layout map[string]interface{} `yaml:"layout"` ClientId string `yaml:"clientId"` CompressionCodec string `yaml:"compressionCodec" default:"none"` + Version string `yaml:"version"` TLS struct { Enable bool `yaml:"enable"` CaFile string `yaml:"caFile"` @@ -126,7 +127,15 @@ func (k *KafkaSink) Close() { func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) { // Default Sarama config saramaConfig := sarama.NewConfig() - saramaConfig.Version = sarama.MaxVersion + if cfg.Version != "" { + version, err := sarama.ParseKafkaVersion(cfg.Version) + if err != nil { + return nil, err + } + saramaConfig.Version = version + } else { + saramaConfig.Version = sarama.MaxVersion + } saramaConfig.Metadata.Full = true saramaConfig.ClientID = cfg.ClientId