Skip to content

Commit

Permalink
Merge pull request #171 from shawnfeng/feature-mq-config
Browse files Browse the repository at this point in the history
add kafka writer batchsize config of topic
  • Loading branch information
niubell authored Jun 9, 2020
2 parents 80f00d9 + 4c84447 commit be5781e
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 11 deletions.
32 changes: 25 additions & 7 deletions mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Config struct {
TTR uint32 // time to run
TTL uint32 // time to live
Tries uint16 // delay tries
BatchSize int
}

type KeyParts struct {
Expand Down Expand Up @@ -183,13 +184,14 @@ func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent {
}

const (
apolloConfigSep = "."
apolloBrokersSep = ","
apolloBrokersKey = "brokers"
apolloOffsetAtKey = "offsetat"
apolloTTRKey = "ttr"
apolloTTLKey = "ttl"
apolloTriesKey = "tries"
apolloConfigSep = "."
apolloBrokersSep = ","
apolloBrokersKey = "brokers"
apolloOffsetAtKey = "offsetat"
apolloTTRKey = "ttr"
apolloTTLKey = "ttl"
apolloTriesKey = "tries"
apolloBatchSizeKey = "batchsize"
)

type ApolloConfig struct {
Expand Down Expand Up @@ -298,6 +300,21 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp
}
slog.Infof(ctx, "%s got config triesVal:%s", fun, triesVal)

batchSize := defaultBatchSize
batchSizeVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloBatchSizeKey, mqType)
if !ok {
// do nothing
slog.Infof(ctx, "%s has no batchsize config", fun)
} else {
t, err := strconv.Atoi(batchSizeVal)
if err != nil {
slog.Errorf(ctx, "%s got invalid batchsize config, batchsize: %s", fun, batchSizeVal)
} else {
batchSize = t
}
}
slog.Infof(ctx, "%s got config batchSize: %d", fun, batchSize)

return &Config{
MQType: mqType,
MQAddr: brokers,
Expand All @@ -309,6 +326,7 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp
TTR: uint32(ttr),
TTL: uint32(ttl),
Tries: uint16(tries),
BatchSize: batchSize,
}, nil
}

Expand Down
File renamed without changes.
17 changes: 13 additions & 4 deletions mq/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"time"
)

const (
defaultBatchSize = 1
)

type KafkaHandler struct {
msg kafka.Message
reader *kafka.Reader
Expand Down Expand Up @@ -130,19 +134,24 @@ type KafkaWriter struct {
}

func NewKafkaWriter(brokers []string, topic string) *KafkaWriter {
config := kafka.WriterConfig{
kafkaConfig := kafka.WriterConfig{
Brokers: brokers,
Topic: topic,
Balancer: &kafka.Hash{},
BatchSize: 1,
BatchSize: defaultBatchSize,
//RequiredAcks: 1,
//Async: true,
}
writer := kafka.NewWriter(config)
// TODO should optimize this, too dumb, double get, reset batchsize
config, _ := DefaultConfiger.GetConfig(context.TODO(), topic, MQTypeKafka)
if config != nil && config.BatchSize > defaultBatchSize {
kafkaConfig.BatchSize = config.BatchSize
}
writer := kafka.NewWriter(kafkaConfig)

return &KafkaWriter{
Writer: writer,
config: config,
config: kafkaConfig,
}
}

Expand Down
12 changes: 12 additions & 0 deletions mq/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package mq

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewKafkaWriter(t *testing.T) {
w := NewKafkaWriter([]string{"prod.kafka1.ibanyu.com:9092", "prod.kafka2.ibanyu.com:9092", "prod.kafka3.ibanyu.com:9092"}, "palfish.test.test")
assert.Equal(t, 100, w.config.BatchSize)
}
16 changes: 16 additions & 0 deletions mq/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mq

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewWriter(t *testing.T) {
wi, err := NewWriter(context.TODO(), "palfish.test.test")
assert.Nil(t, err)
w, ok := wi.(*KafkaWriter)
assert.Equal(t, true, ok)
assert.Equal(t, 100, w.config.BatchSize)
}

0 comments on commit be5781e

Please sign in to comment.