diff --git a/pumps/kafka.go b/pumps/kafka.go index a133068a2..ad2756a99 100644 --- a/pumps/kafka.go +++ b/pumps/kafka.go @@ -4,6 +4,8 @@ import ( "context" "crypto/tls" "encoding/json" + "os" + "strconv" "time" "github.com/TykTechnologies/tyk-pump/analytics" @@ -39,8 +41,8 @@ type KafkaConf struct { ClientId string `json:"client_id" mapstructure:"client_id"` // The topic that the writer will produce messages to. Topic string `json:"topic" mapstructure:"topic"` - // Timeout is the maximum amount of time will wait for a connect or write to complete. - Timeout time.Duration `json:"timeout" mapstructure:"timeout"` + // Timeout is the maximum amount of seconds to wait for a connect or write to complete. + Timeout interface{} `json:"timeout" mapstructure:"timeout"` // Enable "github.com/golang/snappy" codec to be used to compress Kafka messages. By default // is `false`. Compressed bool `json:"compressed" mapstructure:"compressed"` @@ -90,6 +92,10 @@ func (k *KafkaPump) Init(config interface{}) error { } processPumpEnvVars(k, k.log, k.kafkaConf, kafkaDefaultENV) + // This interface field is not reached by envconfig library, that's why we manually check it + if os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT") != "" { + k.kafkaConf.Timeout = os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT") + } var tlsConfig *tls.Config if k.kafkaConf.UseSSL { @@ -137,9 +143,26 @@ func (k *KafkaPump) Init(config interface{}) error { k.log.WithField("SASL-Mechanism", k.kafkaConf.SASLMechanism).Warn("Tyk pump doesn't support this SASL mechanism.") } + // Timeout is an interface type to allow both time.Duration and float values + var timeout time.Duration + switch v := k.kafkaConf.Timeout.(type) { + case string: + timeout, err = time.ParseDuration(v) // i.e: when timeout is '1s' + if err != nil { + floatValue, floatErr := strconv.ParseFloat(v, 64) // i.e: when timeout is '1' + if floatErr != nil { + k.log.Fatal("Failed to parse timeout: ", floatErr) + } else { + timeout = time.Duration(floatValue * float64(time.Second)) + } + } + case float64: + timeout = time.Duration(v) * time.Second // i.e: when timeout is 1 + } + //Kafka writer connection config dialer := &kafka.Dialer{ - Timeout: k.kafkaConf.Timeout * time.Second, + Timeout: timeout, ClientID: k.kafkaConf.ClientId, TLS: tlsConfig, SASLMechanism: mechanism, @@ -149,8 +172,8 @@ func (k *KafkaPump) Init(config interface{}) error { k.writerConfig.Topic = k.kafkaConf.Topic k.writerConfig.Balancer = &kafka.LeastBytes{} k.writerConfig.Dialer = dialer - k.writerConfig.WriteTimeout = k.kafkaConf.Timeout * time.Second - k.writerConfig.ReadTimeout = k.kafkaConf.Timeout * time.Second + k.writerConfig.WriteTimeout = timeout + k.writerConfig.ReadTimeout = timeout if k.kafkaConf.Compressed { k.writerConfig.CompressionCodec = snappy.NewCompressionCodec() }