diff --git a/mq/config.go b/mq/config.go index 0b89808..3b6a9d3 100644 --- a/mq/config.go +++ b/mq/config.go @@ -74,6 +74,7 @@ type Config struct { TTR uint32 // time to run TTL uint32 // time to live Tries uint16 // delay tries + BatchSize int } type KeyParts struct { @@ -183,13 +184,14 @@ func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent { } const ( - apolloConfigSep = "." - apolloBrokersSep = "," - apolloBrokersKey = "brokers" - apolloOffsetAtKey = "offsetat" - apolloTTRKey = "ttr" - apolloTTLKey = "ttl" - apolloTriesKey = "tries" + apolloConfigSep = "." + apolloBrokersSep = "," + apolloBrokersKey = "brokers" + apolloOffsetAtKey = "offsetat" + apolloTTRKey = "ttr" + apolloTTLKey = "ttl" + apolloTriesKey = "tries" + apolloBatchSizeKey = "batchsize" ) type ApolloConfig struct { @@ -298,6 +300,21 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp } slog.Infof(ctx, "%s got config triesVal:%s", fun, triesVal) + batchSize := defaultBatchSize + batchSizeVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloBatchSizeKey, mqType) + if !ok { + // do nothing + slog.Infof(ctx, "%s has no batchsize config", fun) + } else { + t, err := strconv.Atoi(batchSizeVal) + if err != nil { + slog.Errorf(ctx, "%s got invalid batchsize config, batchsize: %s", fun, batchSizeVal) + } else { + batchSize = t + } + } + slog.Infof(ctx, "%s got config batchSize: %d", fun, batchSize) + return &Config{ MQType: mqType, MQAddr: brokers, @@ -309,6 +326,7 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp TTR: uint32(ttr), TTL: uint32(ttl), Tries: uint16(tries), + BatchSize: batchSize, }, nil } diff --git a/mq/interface.go b/mq/global.go similarity index 100% rename from mq/interface.go rename to mq/global.go diff --git a/mq/kafka.go b/mq/kafka.go index 3c9d0b7..707ef57 100644 --- a/mq/kafka.go +++ b/mq/kafka.go @@ -15,6 +15,10 @@ import ( "time" ) +const ( + defaultBatchSize = 1 +) + type KafkaHandler struct { msg kafka.Message reader *kafka.Reader @@ -130,19 +134,24 @@ type KafkaWriter struct { } func NewKafkaWriter(brokers []string, topic string) *KafkaWriter { - config := kafka.WriterConfig{ + kafkaConfig := kafka.WriterConfig{ Brokers: brokers, Topic: topic, Balancer: &kafka.Hash{}, - BatchSize: 1, + BatchSize: defaultBatchSize, //RequiredAcks: 1, //Async: true, } - writer := kafka.NewWriter(config) + // TODO should optimize this, too dumb, double get, reset batchsize + config, _ := DefaultConfiger.GetConfig(context.TODO(), topic, MQTypeKafka) + if config != nil && config.BatchSize > defaultBatchSize { + kafkaConfig.BatchSize = config.BatchSize + } + writer := kafka.NewWriter(kafkaConfig) return &KafkaWriter{ Writer: writer, - config: config, + config: kafkaConfig, } } diff --git a/mq/kafka_test.go b/mq/kafka_test.go new file mode 100644 index 0000000..3d10d65 --- /dev/null +++ b/mq/kafka_test.go @@ -0,0 +1,12 @@ +package mq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewKafkaWriter(t *testing.T) { + w := NewKafkaWriter([]string{"prod.kafka1.ibanyu.com:9092", "prod.kafka2.ibanyu.com:9092", "prod.kafka3.ibanyu.com:9092"}, "palfish.test.test") + assert.Equal(t, 100, w.config.BatchSize) +} diff --git a/mq/writer_test.go b/mq/writer_test.go new file mode 100644 index 0000000..91de32e --- /dev/null +++ b/mq/writer_test.go @@ -0,0 +1,16 @@ +package mq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewWriter(t *testing.T) { + wi, err := NewWriter(context.TODO(), "palfish.test.test") + assert.Nil(t, err) + w, ok := wi.(*KafkaWriter) + assert.Equal(t, true, ok) + assert.Equal(t, 100, w.config.BatchSize) +}