Skip to content

Commit

Permalink
Merge pull request #174 from shawnfeng/feature-kafka-config
Browse files Browse the repository at this point in the history
Feature kafka config
  • Loading branch information
niubell authored Jun 12, 2020
2 parents 7697a95 + bb05ab5 commit 0e5e02d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
49 changes: 32 additions & 17 deletions mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ func (c ConfigerType) String() string {

const (
defaultTimeout = 3 * time.Second
defaultTTR = 3600 // 1 hour
defaultTTL = 3600 * 24 // 1 day
defaultTries = 1
//默认1000毫秒
defaultBatchTimeoutMs = 1000
defaultTTR = 3600 // 1 hour
defaultTTL = 3600 * 24 // 1 day
defaultTries = 1
)

type Config struct {
Expand All @@ -69,12 +71,14 @@ type Config struct {
Topic string
TimeOut time.Duration
CommitInterval time.Duration
Offset int64
OffsetAt string
TTR uint32 // time to run
TTL uint32 // time to live
Tries uint16 // delay tries
BatchSize int
// time interval to flush msg to broker default is 1 second
BatchTimeout time.Duration
Offset int64
OffsetAt string
TTR uint32 // time to run
TTL uint32 // time to live
Tries uint16 // delay tries
BatchSize int
}

type KeyParts struct {
Expand Down Expand Up @@ -184,14 +188,15 @@ func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent {
}

const (
apolloConfigSep = "."
apolloBrokersSep = ","
apolloBrokersKey = "brokers"
apolloOffsetAtKey = "offsetat"
apolloTTRKey = "ttr"
apolloTTLKey = "ttl"
apolloTriesKey = "tries"
apolloBatchSizeKey = "batchsize"
apolloConfigSep = "."
apolloBrokersSep = ","
apolloBrokersKey = "brokers"
apolloOffsetAtKey = "offsetat"
apolloTTRKey = "ttr"
apolloTTLKey = "ttl"
apolloTriesKey = "tries"
apolloBatchSizeKey = "batchsize"
apolloBatchTimeoutMsKey = "batchtimeoutms"
)

type ApolloConfig struct {
Expand Down Expand Up @@ -314,13 +319,23 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp
}
}
slog.Infof(ctx, "%s got config batchSize: %d", fun, batchSize)
batchTimeoutMs, ok := m.getConfigItemWithFallback(ctx, topic, apolloBatchTimeoutMsKey, mqType)
if !ok {
slog.Infof(ctx, "%s no batchTimeout config founds", fun)
}
batchTimeoutMsVal, err := strconv.ParseUint(batchTimeoutMs, 10, 32)
if err != nil {
batchTimeoutMsVal = defaultBatchTimeoutMs
}
slog.Infof(ctx, "%s got config batchTimeout:%d", fun, ttl)

return &Config{
MQType: mqType,
MQAddr: brokers,
Topic: topic,
TimeOut: defaultTimeout,
CommitInterval: 1 * time.Second,
BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond,
Offset: FirstOffset,
OffsetAt: offsetAtVal,
TTR: uint32(ttr),
Expand Down
9 changes: 7 additions & 2 deletions mq/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,13 @@ func NewKafkaWriter(brokers []string, topic string) *KafkaWriter {
}
// TODO should optimize this, too dumb, double get, reset batchsize
config, _ := DefaultConfiger.GetConfig(context.TODO(), topic, MQTypeKafka)
if config != nil && config.BatchSize > defaultBatchSize {
kafkaConfig.BatchSize = config.BatchSize
if config != nil {
if config.BatchSize > defaultBatchSize {
kafkaConfig.BatchSize = config.BatchSize
}
if config.BatchTimeout > 0 {
kafkaConfig.BatchTimeout = config.BatchTimeout
}
}
writer := kafka.NewWriter(kafkaConfig)

Expand Down
2 changes: 2 additions & 0 deletions mq/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package mq

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewKafkaWriter(t *testing.T) {
w := NewKafkaWriter([]string{"prod.kafka1.ibanyu.com:9092", "prod.kafka2.ibanyu.com:9092", "prod.kafka3.ibanyu.com:9092"}, "palfish.test.test")
assert.Equal(t, 100, w.config.BatchSize)
assert.Equal(t, time.Duration(5000000), w.config.BatchTimeout)
}

0 comments on commit 0e5e02d

Please sign in to comment.