diff --git a/mq/config.go b/mq/config.go index 3b6a9d3..e1c9246 100644 --- a/mq/config.go +++ b/mq/config.go @@ -58,9 +58,11 @@ func (c ConfigerType) String() string { const ( defaultTimeout = 3 * time.Second - defaultTTR = 3600 // 1 hour - defaultTTL = 3600 * 24 // 1 day - defaultTries = 1 + //默认1000毫秒 + defaultBatchTimeoutMs = 1000 + defaultTTR = 3600 // 1 hour + defaultTTL = 3600 * 24 // 1 day + defaultTries = 1 ) type Config struct { @@ -69,12 +71,14 @@ type Config struct { Topic string TimeOut time.Duration CommitInterval time.Duration - Offset int64 - OffsetAt string - TTR uint32 // time to run - TTL uint32 // time to live - Tries uint16 // delay tries - BatchSize int + // time interval to flush msg to broker default is 1 second + BatchTimeout time.Duration + Offset int64 + OffsetAt string + TTR uint32 // time to run + TTL uint32 // time to live + Tries uint16 // delay tries + BatchSize int } type KeyParts struct { @@ -184,14 +188,15 @@ func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent { } const ( - apolloConfigSep = "." - apolloBrokersSep = "," - apolloBrokersKey = "brokers" - apolloOffsetAtKey = "offsetat" - apolloTTRKey = "ttr" - apolloTTLKey = "ttl" - apolloTriesKey = "tries" - apolloBatchSizeKey = "batchsize" + apolloConfigSep = "." + apolloBrokersSep = "," + apolloBrokersKey = "brokers" + apolloOffsetAtKey = "offsetat" + apolloTTRKey = "ttr" + apolloTTLKey = "ttl" + apolloTriesKey = "tries" + apolloBatchSizeKey = "batchsize" + apolloBatchTimeoutMsKey = "batchtimeoutms" ) type ApolloConfig struct { @@ -314,6 +319,15 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp } } slog.Infof(ctx, "%s got config batchSize: %d", fun, batchSize) + batchTimeoutMs, ok := m.getConfigItemWithFallback(ctx, topic, apolloBatchTimeoutMsKey, mqType) + if !ok { + slog.Infof(ctx, "%s no batchTimeout config founds", fun) + } + batchTimeoutMsVal, err := strconv.ParseUint(batchTimeoutMs, 10, 32) + if err != nil { + batchTimeoutMsVal = defaultBatchTimeoutMs + } + slog.Infof(ctx, "%s got config batchTimeout:%d", fun, ttl) return &Config{ MQType: mqType, @@ -321,6 +335,7 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp Topic: topic, TimeOut: defaultTimeout, CommitInterval: 1 * time.Second, + BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond, Offset: FirstOffset, OffsetAt: offsetAtVal, TTR: uint32(ttr), diff --git a/mq/kafka.go b/mq/kafka.go index 707ef57..2e917dd 100644 --- a/mq/kafka.go +++ b/mq/kafka.go @@ -144,8 +144,13 @@ func NewKafkaWriter(brokers []string, topic string) *KafkaWriter { } // TODO should optimize this, too dumb, double get, reset batchsize config, _ := DefaultConfiger.GetConfig(context.TODO(), topic, MQTypeKafka) - if config != nil && config.BatchSize > defaultBatchSize { - kafkaConfig.BatchSize = config.BatchSize + if config != nil { + if config.BatchSize > defaultBatchSize { + kafkaConfig.BatchSize = config.BatchSize + } + if config.BatchTimeout > 0 { + kafkaConfig.BatchTimeout = config.BatchTimeout + } } writer := kafka.NewWriter(kafkaConfig) diff --git a/mq/kafka_test.go b/mq/kafka_test.go index 3d10d65..b41e222 100644 --- a/mq/kafka_test.go +++ b/mq/kafka_test.go @@ -2,6 +2,7 @@ package mq import ( "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -9,4 +10,5 @@ import ( func TestNewKafkaWriter(t *testing.T) { w := NewKafkaWriter([]string{"prod.kafka1.ibanyu.com:9092", "prod.kafka2.ibanyu.com:9092", "prod.kafka3.ibanyu.com:9092"}, "palfish.test.test") assert.Equal(t, 100, w.config.BatchSize) + assert.Equal(t, time.Duration(5000000), w.config.BatchTimeout) }