fix(inputs.kafka_consumer): Use per-message parser to avoid races #13886

Merged: 1 commit, Sep 11, 2023
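The plugin previously held a single telegraf.Parser that every consumer-group handler shared. Sarama runs ConsumeClaim in its own goroutine for each claimed partition, so Handle can execute concurrently, and concurrent Parse calls on one parser instance can race when the parser keeps internal state. This change stores a telegraf.ParserFunc instead and has Handle build a fresh parser for each message, wrapping any constructor error as "creating parser". A minimal, self-contained sketch of the new pattern (the value parser and import paths follow the tests in this PR; the snippet itself is an illustration, not code from the diff):

package main

import (
    "fmt"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/parsers/value"
)

func main() {
    // A parser factory in the shape the plugin now accepts via SetParserFunc.
    parserFunc := func() (telegraf.Parser, error) {
        p := &value.Parser{MetricName: "cpu", DataType: "int"}
        return p, p.Init()
    }

    // Per-message flow, as in the updated Handle: build a fresh parser, then parse.
    parser, err := parserFunc()
    if err != nil {
        fmt.Println(fmt.Errorf("creating parser: %w", err))
        return
    }
    metrics, err := parser.Parse([]byte("42"))
    fmt.Println(len(metrics), err)
}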
35 changes: 20 additions & 15 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -63,10 +63,10 @@ type KafkaConsumer struct {
ticker *time.Ticker
fingerprint string

parser telegraf.Parser
topicLock sync.Mutex
wg sync.WaitGroup
cancel context.CancelFunc
parserFunc telegraf.ParserFunc
topicLock sync.Mutex
wg sync.WaitGroup
cancel context.CancelFunc
}

type ConsumerGroup interface {
@@ -89,8 +89,8 @@ func (*KafkaConsumer) SampleConfig() string {
return sampleConfig
}

func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
func (k *KafkaConsumer) SetParserFunc(fn telegraf.ParserFunc) {
k.parserFunc = fn
}

func (k *KafkaConsumer) Init() error {
@@ -308,7 +308,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
k.startErrorAdder(acc)

for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
// We need to copy allWantedTopics; the Consume() is
@@ -358,12 +358,12 @@ type Message struct {
session sarama.ConsumerGroupSession
}

func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler {
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn telegraf.ParserFunc, log telegraf.Logger) *ConsumerGroupHandler {
handler := &ConsumerGroupHandler{
acc: acc.WithTracking(maxUndelivered),
sem: make(chan empty, maxUndelivered),
undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered),
parser: parser,
parserFunc: fn,
log: log,
}
return handler
@@ -374,11 +374,11 @@ type ConsumerGroupHandler struct {
MaxMessageLen int
TopicTag string

acc telegraf.TrackingAccumulator
sem semaphore
parser telegraf.Parser
wg sync.WaitGroup
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem semaphore
parserFunc telegraf.ParserFunc
wg sync.WaitGroup
cancel context.CancelFunc

mu sync.Mutex
undelivered map[telegraf.TrackingID]Message
@@ -456,7 +456,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
len(msg.Value), h.MaxMessageLen)
}

metrics, err := h.parser.Parse(msg.Value)
parser, err := h.parserFunc()
if err != nil {
return fmt.Errorf("creating parser: %w", err)
}

metrics, err := parser.Parse(msg.Value)
if err != nil {
h.release()
return err
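Building the parser per message removes the race because each concurrent Handle call works on its own parser instance; there is no shared parser state left to protect. A hypothetical, self-contained illustration of that property (the factory mirrors the test code below; the name newParser and the goroutine loop are invented for the sketch):

package main

import (
    "fmt"
    "sync"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/parsers/value"
)

func main() {
    // Same factory shape as the tests use.
    newParser := func() (telegraf.Parser, error) {
        p := &value.Parser{MetricName: "cpu", DataType: "int"}
        return p, p.Init()
    }

    // Four workers run concurrently, as Sarama may do with one ConsumeClaim
    // goroutine per claimed partition. Each gets its own parser from the
    // factory, so no parser state is shared between them.
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            parser, err := newParser()
            if err != nil {
                fmt.Println("creating parser:", err)
                return
            }
            metrics, err := parser.Parse([]byte(fmt.Sprintf("%d", n)))
            fmt.Println(len(metrics), err)
        }(i)
    }
    wg.Wait()
}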
67 changes: 43 additions & 24 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -293,12 +293,16 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {

func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",

parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -326,12 +330,15 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {

func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",
parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -444,12 +451,15 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu",
DataType: "int",
parserFunc := func() (telegraf.Parser, error) {
parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
cg.MaxMessageLen = tt.maxMessageLen
cg.TopicTag = tt.topicTag

@@ -563,9 +573,12 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
MaxUndeliveredMessages: 1,
ConnectionStrategy: tt.connectionStrategy,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)
require.NoError(t, input.Init())

acc := testutil.Accumulator{}
@@ -621,9 +634,12 @@ func TestExponentialBackoff(t *testing.T) {
},
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)

//time how long initialization (connection) takes
start := time.Now()
@@ -666,9 +682,12 @@ func TestExponentialBackoffDefault(t *testing.T) {
},
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
parserFunc := func() (telegraf.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)

require.NoError(t, input.Init())
