From 5828a7def5a8270b04a9efa971814c6b0457d76e Mon Sep 17 00:00:00 2001
From: Ashish Khatkar
Date: Mon, 23 Dec 2024 08:44:47 +0000
Subject: [PATCH] [flink][cdc] Add support for retry count instead of busy wait and additional support to skip corrupt records in cdc writer (#4295)

---
 .../cdc/CdcDynamicBucketWriteOperator.java    | 26 +++++++++++---
 .../cdc/CdcRecordStoreMultiWriteOperator.java | 23 +++++++++---
 .../sink/cdc/CdcRecordStoreWriteOperator.java | 36 ++++++++++++++++---
 3 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index b0b135b3610b..5637e8b12794 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -35,7 +35,9 @@
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -47,6 +49,10 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {
 
     private final long retrySleepMillis;
 
+    private final int maxRetryNumTimes;
+
+    private final boolean skipCorruptRecord;
+
     private CdcDynamicBucketWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -55,6 +61,8 @@ private CdcDynamicBucketWriteOperator(
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -73,7 +81,7 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
         Tuple2<CdcRecord, Integer> record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record.f0, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -84,10 +92,18 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get(), record.f1);
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get(), record.f1);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a4b4e8284043..a6c55c55f18c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -54,7 +54,9 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -123,6 +125,9 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
 
         FileStoreTable table = getTable(tableId);
 
+        int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+
         // all table write should share one write buffer so that writers can preempt memory
         // from those of other tables
         if (memoryPoolFactory == null) {
@@ -154,7 +159,7 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
                 toGenericRow(record.record(), table.schema().fields());
         if (!optionalConverted.isPresent()) {
             FileStoreTable latestTable = table;
-            while (true) {
+            for (int retry = 0; retry < retryCnt; ++retry) {
                 latestTable = latestTable.copyWithLatestSchema();
                 tables.put(tableId, latestTable);
                 optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
@@ -171,10 +176,18 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
             write.replace(latestTable);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException("Exception occurs for writing record to table: " + tableId, e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException("Exception occurs for writing record to table: " + tableId, e);
+            }
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index 195e683daaf6..8a8233842df7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -52,8 +52,24 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                     .durationType()
                     .defaultValue(Duration.ofMillis(500));
 
+    public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
+            ConfigOptions.key("cdc.max-retry-num-times")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Max retry count for updating table before failing loudly");
+
+    public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
+            ConfigOptions.key("cdc.skip-corrupt-record")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Skip corrupt record if we fail to parse it");
+
     private final long retrySleepMillis;
 
+    private final int maxRetryNumTimes;
+
+    private final boolean skipCorruptRecord;
+
     protected CdcRecordStoreWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -62,6 +78,8 @@ protected CdcRecordStoreWriteOperator(
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -80,7 +98,7 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
         CdcRecord record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -91,10 +109,18 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
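
Usage note (commentary, not part of the patch): both new knobs are read from the table's core options, so they can be supplied like any other Paimon table option. Below is a minimal Java sketch of enabling them through the public FileStoreTable#copy(Map<String, String>) API; the wrapper class and method names are illustrative only, and the table instance is assumed to be loaded elsewhere.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.table.FileStoreTable;

    /** Illustrative helper; class and method names are hypothetical. */
    public final class CdcWriterOptionsExample {

        /** Returns a copy of the table with the new CDC writer options applied. */
        public static FileStoreTable withBoundedRetryAndSkip(FileStoreTable table) {
            Map<String, String> dynamicOptions = new HashMap<>();
            // Bound the schema-refresh retry loop (the default is 100 attempts).
            dynamicOptions.put("cdc.max-retry-num-times", "20");
            // After retries are exhausted, log and drop the record instead of
            // throwing "Unable to process element. Possibly a corrupt record".
            dynamicOptions.put("cdc.skip-corrupt-record", "true");
            return table.copy(dynamicOptions);
        }
    }

With both options left at their defaults the operators still fail on an unparsable record; the only behavioral change out of the box is that the previous unbounded while (true) busy wait is now capped at cdc.max-retry-num-times attempts.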