Skip to content

Commit

Permalink
fix(inputs.kafka_consumer): Use per-message parser to avoid races (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
toanju authored Sep 11, 2023
1 parent 855b25d commit 3fae643
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 39 deletions.
35 changes: 20 additions & 15 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ type KafkaConsumer struct {
ticker *time.Ticker
fingerprint string

parser telegraf.Parser
topicLock sync.Mutex
wg sync.WaitGroup
cancel context.CancelFunc
parserFunc telegraf.ParserFunc
topicLock sync.Mutex
wg sync.WaitGroup
cancel context.CancelFunc
}

type ConsumerGroup interface {
Expand All @@ -89,8 +89,8 @@ func (*KafkaConsumer) SampleConfig() string {
return sampleConfig
}

func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
func (k *KafkaConsumer) SetParserFunc(fn telegraf.ParserFunc) {
k.parserFunc = fn
}

func (k *KafkaConsumer) Init() error {
Expand Down Expand Up @@ -308,7 +308,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
k.startErrorAdder(acc)

for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
// We need to copy allWantedTopics; the Consume() is
Expand Down Expand Up @@ -358,12 +358,12 @@ type Message struct {
session sarama.ConsumerGroupSession
}

func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler {
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn telegraf.ParserFunc, log telegraf.Logger) *ConsumerGroupHandler {
handler := &ConsumerGroupHandler{
acc: acc.WithTracking(maxUndelivered),
sem: make(chan empty, maxUndelivered),
undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered),
parser: parser,
parserFunc: fn,
log: log,
}
return handler
Expand All @@ -374,11 +374,11 @@ type ConsumerGroupHandler struct {
MaxMessageLen int
TopicTag string

acc telegraf.TrackingAccumulator
sem semaphore
parser telegraf.Parser
wg sync.WaitGroup
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem semaphore
parserFunc telegraf.ParserFunc
wg sync.WaitGroup
cancel context.CancelFunc

mu sync.Mutex
undelivered map[telegraf.TrackingID]Message
Expand Down Expand Up @@ -456,7 +456,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
len(msg.Value), h.MaxMessageLen)
}

metrics, err := h.parser.Parse(msg.Value)
parser, err := h.parserFunc()
if err != nil {
return fmt.Errorf("creating parser: %w", err)
}

metrics, err := parser.Parse(msg.Value)
if err != nil {
h.release()
return err
Expand Down
67 changes: 43 additions & 24 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,16 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {

func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",

parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -326,12 +330,15 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {

func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",
parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -444,12 +451,15 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",
parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
cg.MaxMessageLen = tt.maxMessageLen
cg.TopicTag = tt.topicTag

Expand Down Expand Up @@ -563,9 +573,12 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
MaxUndeliveredMessages: 1,
ConnectionStrategy: tt.connectionStrategy,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)
require.NoError(t, input.Init())

acc := testutil.Accumulator{}
Expand Down Expand Up @@ -621,9 +634,12 @@ func TestExponentialBackoff(t *testing.T) {
},
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)

//time how long initialization (connection) takes
start := time.Now()
Expand Down Expand Up @@ -666,9 +682,12 @@ func TestExponentialBackoffDefault(t *testing.T) {
},
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)

require.NoError(t, input.Init())

Expand Down

0 comments on commit 3fae643

Please sign in to comment.