
fix(inputs.kafka_consumer): Use per-message parser to avoid races #13886

Merged
merged 1 commit into influxdata:master on Sep 11, 2023

Conversation

@toanju (Contributor) commented Sep 7, 2023

Using parsers like json_v2 will result in undesired parser results, because a single parser instance is shared across consumer goroutines and races on its internal state. This switches to the ParserFunc pattern to create a dedicated parser in each thread.

Required for all PRs

fixes #13888
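The ParserFunc pattern described above can be sketched roughly as follows. This is a simplified illustration, not Telegraf's actual interfaces: the `Parser` type, its `Parse` method, and `ParserFunc` here are stand-ins. The point is that each consumer goroutine asks a factory for its own parser, so a stateful parser is never shared:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Parser carries internal working state (as json_v2 does), so sharing
// one instance across goroutines can race.
type Parser struct {
	scratch []string // per-parse buffer; not safe for concurrent use
}

func (p *Parser) Parse(msg string) string {
	p.scratch = p.scratch[:0]
	p.scratch = append(p.scratch, strings.Fields(msg)...)
	return strings.Join(p.scratch, ",")
}

// ParserFunc is a factory: each goroutine calls it to get a dedicated
// parser (names are illustrative, not Telegraf's real API).
type ParserFunc func() *Parser

func main() {
	newParser := ParserFunc(func() *Parser { return &Parser{} })

	msgs := []string{"a b", "c d", "e f"}
	out := make([]string, len(msgs))
	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		go func(i int, m string) {
			defer wg.Done()
			p := newParser() // fresh parser per goroutine: no shared state
			out[i] = p.Parse(m)
		}(i, m)
	}
	wg.Wait()
	fmt.Println(out) // prints [a,b c,d e,f]
}
```

The trade-off srebhan raises below is that creating a parser per message costs allocation; the pattern still avoids the alternative of serializing all parsing behind one mutex.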

@telegraf-tiger bot added the fix and plugin/input labels Sep 7, 2023
@powersj (Contributor) commented Sep 7, 2023

Hi,

Using parsers like json_v2 will result in undesired parser results.

Can you please file an issue and document what you were seeing?

@powersj added the waiting for response (waiting for response from contributor) label Sep 7, 2023
@toanju (Contributor, Author) commented Sep 8, 2023

related to #13888

let me know if you need more info

@telegraf-tiger bot removed the waiting for response label Sep 8, 2023
@srebhan (Member) left a comment

While that might work, it will strictly serialize the parsing... Should we instead change the plugin to the parser-func interface and create a new parser each time (or use a sync.Pool of parsers)?

My concern is that this will hamper performance on larger messages or slow parsers...
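The sync.Pool alternative mentioned here could look roughly like the sketch below. The `Parser` type and `parseMessage` helper are stand-ins, not Telegraf's real parser interface: parsers are checked out for exclusive use per message and returned for reuse, avoiding both a global mutex and a fresh allocation per message.

```go
package main

import (
	"fmt"
	"sync"
)

// Parser is a stand-in for a stateful parser whose internal buffer
// must not be shared between goroutines.
type Parser struct{ buf []byte }

func (p *Parser) Parse(msg []byte) string {
	p.buf = append(p.buf[:0], msg...) // reuse the internal buffer
	return string(p.buf)
}

// parserPool hands out parsers for exclusive use; idle parsers are
// reused instead of reallocated on every message.
var parserPool = sync.Pool{
	New: func() any { return &Parser{} },
}

func parseMessage(msg []byte) string {
	p := parserPool.Get().(*Parser) // exclusive while checked out
	defer parserPool.Put(p)         // return for reuse by any goroutine
	return p.Parse(msg)
}

func main() {
	var wg sync.WaitGroup
	for _, m := range []string{"one", "two", "three"} {
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			fmt.Println(parseMessage([]byte(m)))
		}(m)
	}
	wg.Wait()
}
```

Note that sync.Pool may drop idle objects at any GC, so this only helps with allocation pressure; correctness still comes from never sharing a checked-out parser.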

@srebhan srebhan self-assigned this Sep 8, 2023
@toanju (Contributor, Author) commented Sep 9, 2023

Performance is not important if you get wrong results. Though, I agree that the mutex in this place may have an impact on other parsers, and the JSON parser might be one that folks do not use a lot.

I am open to implementing this; however, I'd first need to look into some of the suggested options to get a better understanding. So if someone can point me in the right direction, that would be awesome.

@toanju force-pushed the kafka-call-parsers-synchronous branch from f88cffa to 58eb3cf on September 9, 2023 19:51
@toanju (Contributor, Author) commented Sep 9, 2023

@srebhan, implemented the ParserFunc for now. Looking forward to additional feedback.

Using parsers like json_v2 will result in undesired parser results. This
switches to the ParserFunc pattern to create a dedicated parser in each
thread.
@toanju force-pushed the kafka-call-parsers-synchronous branch from 58eb3cf to cf7f511 on September 10, 2023 08:09
@srebhan (Member) left a comment

Thank you for the quick update and your contribution in general @toanju!

@srebhan srebhan changed the title fix(inputs.kafka_consumer): use mutex for parser fix(inputs.kafka_consumer): Use per-message parser to avoid races Sep 11, 2023
@srebhan added the ready for final review label Sep 11, 2023
@srebhan srebhan assigned powersj and unassigned srebhan Sep 11, 2023
@powersj powersj merged commit 3fae643 into influxdata:master Sep 11, 2023
5 checks passed
@github-actions github-actions bot added this to the v1.28.0 milestone Sep 11, 2023
@toanju toanju deleted the kafka-call-parsers-synchronous branch September 11, 2023 15:26
@toanju (Contributor, Author) commented Sep 11, 2023

Many thanks @srebhan and @powersj 🎉

@athornton (Contributor) commented

This makes using a schema registry impossible: the schema-registry cache lives down in the Avro parser, so a new parser means an always-empty cache, which means we query the schema registry once per measurement. Since in normal operation there are many, many more measurements than schemas, this puts enormous load on the schema registry, and it also makes parsing each measurement far too slow, since each parse has to wait on a network call.

#14025 is what I'm working on to address this. @srebhan suggested an approach based on adding Clone() to the parser layer, and I'm trying to work through that; it looks like I have to understand each parser, determine whether or not it is thread-safe, and implement the correct Clone() method for each one. That's a daunting task. Another possibility might be to associate the schema registry with the Kafka input layer rather than the Avro parser layer, since you need to be using both Kafka and Avro (I think) to use a schema registry.
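One way to reconcile per-message parsers with a schema-registry cache is sketched below, under the assumption of a hypothetical Clone() method; none of these names are Telegraf's actual Avro parser API. Clone() copies only the non-thread-safe scratch state, while all clones share a single mutex-guarded schema cache, so the registry is queried once per schema rather than once per measurement:

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is shared by all parser clones; access is mutex-guarded
// so clones may use it concurrently.
type schemaCache struct {
	mu      sync.Mutex
	schemas map[int]string
	fetches int // counts expensive registry round-trips
}

func (c *schemaCache) get(id int) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.schemas[id]; ok {
		return s
	}
	c.fetches++ // would be a network call to the schema registry
	s := fmt.Sprintf("schema-%d", id)
	c.schemas[id] = s
	return s
}

// AvroParser is a stand-in: scratch is per-clone, cache is shared.
type AvroParser struct {
	cache   *schemaCache // shared across clones
	scratch []byte       // not safe to share; each clone gets its own
}

// Clone gives a parser safe for one goroutine without losing the cache.
func (p *AvroParser) Clone() *AvroParser {
	return &AvroParser{cache: p.cache} // fresh scratch, same cache
}

func main() {
	root := &AvroParser{cache: &schemaCache{schemas: map[int]string{}}}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p := root.Clone()
			_ = p.cache.get(1) // same schema id: cached after first fetch
		}()
	}
	wg.Wait()
	fmt.Println("fetches:", root.cache.fetches) // prints fetches: 1
}
```

This avoids the per-measurement registry hit described above while keeping the per-message isolation the PR introduced; the hard part remains deciding, parser by parser, which state is shareable and which must be cloned.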

Labels
area/kafka; fix (pr to fix corresponding bug); plugin/input (1. Request for new input plugins 2. Issues/PRs that are related to input plugins); ready for final review (This pull request has been reviewed and/or tested by multiple users and is ready for a final review.)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

inputs.kafka_consumer: parser fails to process data
4 participants