[flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer #4295
@@ -48,15 +48,33 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                    .durationType()
                    .defaultValue(Duration.ofMillis(500));

    public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
            ConfigOptions.key("cdc.max-retry-num-times")
Should we prefix the new options with retry-, similar to the already existing cdc.retry-sleep-time? For example, cdc.retry-count and cdc.retry-skip-corrupt-record.
    public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
            ConfigOptions.key("cdc.skip-corrupt-record")
                    .booleanType()
                    .defaultValue(false)
Whether this is true or false, the change alters the current default behaviour of waiting indefinitely for a schema change. We could either add an option that preserves the current behaviour as the default and makes retries opt-in, or make sure the release notes mention the breaking change: Paimon will no longer wait indefinitely for schema updates but will retry and, optionally, skip unreadable rows.
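One way to preserve the existing default, as suggested in the comment above, would be to treat a negative retry limit as "wait indefinitely". The following is only a sketch under that assumption and is not taken from the PR: the class name RetryBudget, the UNLIMITED constant, and the negative-means-unlimited convention are all hypothetical.

```java
/** Hypothetical helper (not part of the PR): decides whether another retry is allowed. */
final class RetryBudget {
    /** Assumed convention: a negative limit means "retry forever", i.e. the old behaviour. */
    static final int UNLIMITED = -1;

    private final int maxRetries;

    RetryBudget(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * @param retriesSoFar number of retries already performed
     * @return true if one more retry may be attempted
     */
    boolean canRetry(int retriesSoFar) {
        return maxRetries < 0 || retriesSoFar < maxRetries;
    }
}
```

With a default of UNLIMITED, existing jobs would keep waiting for the schema change, and only users who set a non-negative limit would opt in to the new fail-or-skip behaviour.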
@@ -75,7 +93,7 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
        CdcRecord record = element.getValue();
        Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
        if (!optionalConverted.isPresent()) {
            while (true) {
            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
Nit: to me the pre-increment (++retry) in the loop header looks slightly unusual, but in a for-loop update clause it does the same thing as int retry = 0; retry < maxRetryNumTimes; retry++, so it is just a matter of preference.
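For reference, the update clause of a Java for loop runs after each iteration of the body, so ++retry and retry++ give the same number of iterations there; changing the bound from < to <= is what would add one extra pass. The small sketch below counts iterations for each form. The class and method names are made up for illustration.

```java
/** Illustrative only: counts how many times each loop form executes its body. */
final class RetryLoopCount {
    /** Loop as written in the PR: pre-increment with an exclusive bound. */
    static int preIncrement(int maxRetryNumTimes) {
        int iterations = 0;
        for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
            iterations++;
        }
        return iterations;
    }

    /** Same loop with post-increment; the update clause still runs after the body. */
    static int postIncrement(int maxRetryNumTimes) {
        int iterations = 0;
        for (int retry = 0; retry < maxRetryNumTimes; retry++) {
            iterations++;
        }
        return iterations;
    }

    /** Inclusive bound: runs the body one more time than the two forms above. */
    static int inclusiveBound(int maxRetryNumTimes) {
        int iterations = 0;
        for (int retry = 0; retry <= maxRetryNumTimes; retry++) {
            iterations++;
        }
        return iterations;
    }
}
```

For a limit of 3, the first two methods return 3 and the inclusive variant returns 4.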
                throw new IOException(e);
        if (!optionalConverted.isPresent()) {
            if (skipCorruptRecord) {
                LOG.warn("Skipping corrupt or unparsable record {}", record);
Just a note that this might leak sensitive data to the log system. Maybe we can log some metadata about the record instead of the full record?
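A minimal sketch of what "metadata instead of the full record" could look like: log the record kind, the number of fields, and the field names, but never the field values. This is an assumption for illustration, not the PR's code. RecordSummary and describe are hypothetical names, and the record is modelled as a kind string plus a String-to-String field map rather than Paimon's actual CdcRecord type.

```java
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper: builds a log-safe summary that omits all field values. */
final class RecordSummary {
    /**
     * @param kind   row kind of the record, e.g. "INSERT" (assumed representation)
     * @param fields field name to raw value; only the names are used
     * @return a summary string with the kind, field count and sorted field names
     */
    static String describe(String kind, Map<String, String> fields) {
        // TreeSet sorts the names so the output is stable across map implementations.
        return "kind=" + kind
                + ", fieldCount=" + fields.size()
                + ", fieldNames=" + new TreeSet<>(fields.keySet());
    }
}
```

The same summary could also be appended to the exception message in the non-skipping branch. Field names can be sensitive too in some deployments, so even this level of detail is a judgment call.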
            if (skipCorruptRecord) {
                LOG.warn("Skipping corrupt or unparsable record {}", record);
            } else {
                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
Can we include some info about the record in the exception too?
Maybe we can add a few unit tests and some docs for the new options?
@JingsongLi could you please help review this? We are currently running this patch internally at Yelp, as we cannot afford to block all ingestion because of one "corrupt" message. Corrupt messages entering our stream are rare but inevitable (e.g. older schemas that are not backwards compatible), and it is better for us to log and skip them (or even fail the app) rather than block.
@JingsongLi can you assign someone to look at the PR? I need your thoughts and opinions on this PR before I resolve the other comments.
+1
Thanks for the contribution! I have renamed some options, please check: #4756
…ional support to skip corrupt records in cdc writer (apache#4295) (cherry picked from commit 5828a7d)
Purpose
This change adds a retry count and an option to skip corrupt records, instead of assuming that a schema change has happened and waiting for it indefinitely. With this change, a corrupt message causes the Flink CDC job to fail loudly (or, if the user enables the corresponding config option, to be logged and skipped) rather than wait indefinitely for a schema change that might never have happened. For more details, see the linked issue.
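The control flow described above can be sketched independently of Flink and Paimon as follows. This is a simplified illustration under stated assumptions, not the operator's actual code: BoundedRetry and convert are hypothetical names, the conversion is modelled as a Supplier of Optional, and beforeRetry stands in for whatever the operator does between attempts (for example sleeping and refreshing the schema).

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch of "retry a bounded number of times, then skip or fail". */
final class BoundedRetry {
    /**
     * @param attempt     tries to convert the record; an empty result means "not convertible yet"
     * @param maxRetries  maximum number of retries after the first attempt
     * @param skipCorrupt if true, give up quietly and return empty; if false, throw
     * @param beforeRetry action run before each retry (assumed: sleep and refresh schema)
     * @return the converted value, or empty if the record was skipped
     */
    static <T> Optional<T> convert(
            Supplier<Optional<T>> attempt,
            int maxRetries,
            boolean skipCorrupt,
            Runnable beforeRetry) {
        Optional<T> result = attempt.get();
        for (int retry = 0; retry < maxRetries && !result.isPresent(); ++retry) {
            beforeRetry.run();
            result = attempt.get();
        }
        if (!result.isPresent() && !skipCorrupt) {
            // Fail loudly instead of waiting forever for a schema change.
            throw new IllegalStateException(
                    "Unable to process element after " + maxRetries + " retries");
        }
        return result;
    }
}
```

A caller that receives an empty Optional would log the skipped record and move on to the next element; a caller that sees the exception lets the job fail.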
Linked issue: close #4239
Tests
API and Format
Documentation