Skip to content

Commit

Permalink
[flink][cdc] Add support for retry cnt instead of busy wait and addit…
Browse files Browse the repository at this point in the history
…ional support to skip corrupt records in cdc writer (apache#4295)
  • Loading branch information
AshishKhatkar authored Dec 23, 2024
1 parent 8ab24a1 commit 5828a7d
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;

/**
Expand All @@ -47,6 +49,10 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

private final long retrySleepMillis;

private final int maxRetryNumTimes;

private final boolean skipCorruptRecord;

private CdcDynamicBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
Expand All @@ -55,6 +61,8 @@ private CdcDynamicBucketWriteOperator(
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
}

@Override
Expand All @@ -73,7 +81,7 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
Tuple2<CdcRecord, Integer> record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (optionalConverted.isPresent()) {
Expand All @@ -84,10 +92,18 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
write.replace(table);
}

try {
write.write(optionalConverted.get(), record.f1);
} catch (Exception e) {
throw new IOException(e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get(), record.f1);
} catch (Exception e) {
throw new IOException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;

/**
Expand Down Expand Up @@ -123,6 +125,9 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce

FileStoreTable table = getTable(tableId);

int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);

// all table write should share one write buffer so that writers can preempt memory
// from those of other tables
if (memoryPoolFactory == null) {
Expand Down Expand Up @@ -154,7 +159,7 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
toGenericRow(record.record(), table.schema().fields());
if (!optionalConverted.isPresent()) {
FileStoreTable latestTable = table;
while (true) {
for (int retry = 0; retry < retryCnt; ++retry) {
latestTable = latestTable.copyWithLatestSchema();
tables.put(tableId, latestTable);
optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
Expand All @@ -171,10 +176,18 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
write.replace(latestTable);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException("Exception occurs for writing record to table: " + tableId, e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,24 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
.durationType()
.defaultValue(Duration.ofMillis(500));

public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
ConfigOptions.key("cdc.max-retry-num-times")
.intType()
.defaultValue(100)
.withDescription("Max retry count for updating table before failing loudly");

public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
ConfigOptions.key("cdc.skip-corrupt-record")
.booleanType()
.defaultValue(false)
.withDescription("Skip corrupt record if we fail to parse it");

private final long retrySleepMillis;

private final int maxRetryNumTimes;

private final boolean skipCorruptRecord;

protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
Expand All @@ -62,6 +78,8 @@ protected CdcRecordStoreWriteOperator(
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
}

@Override
Expand All @@ -80,7 +98,7 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
CdcRecord record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record, table.schema().fields());
if (optionalConverted.isPresent()) {
Expand All @@ -91,10 +109,18 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
write.replace(table);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
}
}

Expand Down

0 comments on commit 5828a7d

Please sign in to comment.