From 3fae6439abacdc71ef906e95fce675ada4b09abd Mon Sep 17 00:00:00 2001 From: Tobias Jungel Date: Mon, 11 Sep 2023 16:18:51 +0200 Subject: [PATCH] fix(inputs.kafka_consumer): Use per-message parser to avoid races (#13886) --- .../inputs/kafka_consumer/kafka_consumer.go | 35 +++++----- .../kafka_consumer/kafka_consumer_test.go | 67 ++++++++++++------- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 2d1d5901396df..695c7a6017c8a 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -63,10 +63,10 @@ type KafkaConsumer struct { ticker *time.Ticker fingerprint string - parser telegraf.Parser - topicLock sync.Mutex - wg sync.WaitGroup - cancel context.CancelFunc + parserFunc telegraf.ParserFunc + topicLock sync.Mutex + wg sync.WaitGroup + cancel context.CancelFunc } type ConsumerGroup interface { @@ -89,8 +89,8 @@ func (*KafkaConsumer) SampleConfig() string { return sampleConfig } -func (k *KafkaConsumer) SetParser(parser telegraf.Parser) { - k.parser = parser +func (k *KafkaConsumer) SetParserFunc(fn telegraf.ParserFunc) { + k.parserFunc = fn } func (k *KafkaConsumer) Init() error { @@ -308,7 +308,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { k.startErrorAdder(acc) for ctx.Err() == nil { - handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log) + handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log) handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag // We need to copy allWantedTopics; the Consume() is @@ -358,12 +358,12 @@ type Message struct { session sarama.ConsumerGroupSession } -func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler { +func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn telegraf.ParserFunc, log telegraf.Logger) *ConsumerGroupHandler { handler := &ConsumerGroupHandler{ acc: acc.WithTracking(maxUndelivered), sem: make(chan empty, maxUndelivered), undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered), - parser: parser, + parserFunc: fn, log: log, } return handler @@ -374,11 +374,11 @@ type ConsumerGroupHandler struct { MaxMessageLen int TopicTag string - acc telegraf.TrackingAccumulator - sem semaphore - parser telegraf.Parser - wg sync.WaitGroup - cancel context.CancelFunc + acc telegraf.TrackingAccumulator + sem semaphore + parserFunc telegraf.ParserFunc + wg sync.WaitGroup + cancel context.CancelFunc mu sync.Mutex undelivered map[telegraf.TrackingID]Message @@ -456,7 +456,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * len(msg.Value), h.MaxMessageLen) } - metrics, err := h.parser.Parse(msg.Value) + parser, err := h.parserFunc() + if err != nil { + return fmt.Errorf("creating parser: %w", err) + } + + metrics, err := parser.Parse(msg.Value) if err != nil { h.release() return err diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index f7b82c76df459..36077a51620ab 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -293,12 +293,16 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { func TestConsumerGroupHandler_Lifecycle(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.Parser{ - MetricName: "cpu", - DataType: "int", + + parserFunc := func() (telegraf.Parser, error) { + parser := &value.Parser{ + MetricName: "cpu", + DataType: "int", + } + err := parser.Init() + return parser, err } - require.NoError(t, parser.Init()) - cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) + cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -326,12 +330,15 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) { func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.Parser{ - MetricName: "cpu", - DataType: "int", + parserFunc := func() (telegraf.Parser, error) { + parser := &value.Parser{ + MetricName: "cpu", + DataType: "int", + } + err := parser.Init() + return parser, err } - require.NoError(t, parser.Init()) - cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) + cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -444,12 +451,15 @@ func TestConsumerGroupHandler_Handle(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.Parser{ - MetricName: "cpu", - DataType: "int", + parserFunc := func() (telegraf.Parser, error) { + parser := &value.Parser{ + MetricName: "cpu", + DataType: "int", + } + err := parser.Init() + return parser, err } - require.NoError(t, parser.Init()) - cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) + cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{}) cg.MaxMessageLen = tt.maxMessageLen cg.TopicTag = tt.topicTag @@ -563,9 +573,12 @@ func TestKafkaRoundTripIntegration(t *testing.T) { MaxUndeliveredMessages: 1, ConnectionStrategy: tt.connectionStrategy, } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - input.SetParser(parser) + parserFunc := func() (telegraf.Parser, error) { + parser := &influx.Parser{} + err := parser.Init() + return parser, err + } + input.SetParserFunc(parserFunc) require.NoError(t, input.Init()) acc := testutil.Accumulator{} @@ -621,9 +634,12 @@ func TestExponentialBackoff(t *testing.T) { }, }, } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - input.SetParser(parser) + parserFunc := func() (telegraf.Parser, error) { + parser := &influx.Parser{} + err := parser.Init() + return parser, err + } + input.SetParserFunc(parserFunc) //time how long initialization (connection) takes start := time.Now() @@ -666,9 +682,12 @@ func TestExponentialBackoffDefault(t *testing.T) { }, }, } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - input.SetParser(parser) + parserFunc := func() (telegraf.Parser, error) { + parser := &influx.Parser{} + err := parser.Init() + return parser, err + } + input.SetParserFunc(parserFunc) require.NoError(t, input.Init())