Skip to content

Commit

Permalink
[flink][cdc] Add support for retry count instead of busy wait and addit…
Browse files Browse the repository at this point in the history
…ional support to skip corrupt records in cdc writer
  • Loading branch information
AshishKhatkar committed Oct 9, 2024
1 parent ba98f31 commit 5c70e64
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;

/**
Expand All @@ -43,13 +45,19 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

// Sleep interval (millis) between schema-refresh retries; derived from RETRY_SLEEP_TIME.
// NOTE(review): the sleep call itself is outside this hunk — confirm it is used in the retry loop.
private final long retrySleepMillis;

// Upper bound on schema-refresh retries before a record is treated as corrupt.
private final int maxRetryNumTimes;

// When true, a record that still cannot be parsed after all retries is logged
// and skipped instead of failing the job with a RuntimeException.
private final boolean skipCorruptRecord;

/**
 * Creates the dynamic-bucket CDC write operator and caches the retry/skip
 * settings from the table's configuration.
 *
 * @param table the Paimon table this operator writes to
 * @param storeSinkWriteProvider factory for the underlying {@code StoreSinkWrite}
 * @param initialCommitUser commit user recorded for the initial commit
 */
public CdcDynamicBucketWriteOperator(
        FileStoreTable table,
        StoreSinkWrite.Provider storeSinkWriteProvider,
        String initialCommitUser) {
    super(table, storeSinkWriteProvider, initialCommitUser);
    // Resolve operator tuning options once at construction time rather than per record.
    this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
    this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
    this.retrySleepMillis =
            table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}

@Override
Expand All @@ -68,7 +76,7 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
Tuple2<CdcRecord, Integer> record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (optionalConverted.isPresent()) {
Expand All @@ -79,10 +87,18 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
write.replace(table);
}

try {
write.write(optionalConverted.get(), record.f1);
} catch (Exception e) {
throw new IOException(e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get(), record.f1);
} catch (Exception e) {
throw new IOException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;

/**
Expand Down Expand Up @@ -118,6 +120,9 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce

FileStoreTable table = getTable(tableId);

int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);

// all table write should share one write buffer so that writers can preempt memory
// from those of other tables
if (memoryPoolFactory == null) {
Expand Down Expand Up @@ -149,7 +154,7 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
toGenericRow(record.record(), table.schema().fields());
if (!optionalConverted.isPresent()) {
FileStoreTable latestTable = table;
while (true) {
for (int retry = 0; retry < retryCnt; ++retry) {
latestTable = latestTable.copyWithLatestSchema();
tables.put(tableId, latestTable);
optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
Expand All @@ -166,10 +171,18 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
write.replace(latestTable);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,33 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
.durationType()
.defaultValue(Duration.ofMillis(500));

// Maximum number of schema-refresh attempts for a record that fails to parse
// against the current table schema (default 100).
public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
ConfigOptions.key("cdc.max-retry-num-times")
.intType()
.defaultValue(100)
.withDescription("Max retry count for updating table before failing loudly");

// Whether an unparsable record should be logged and skipped (true) instead of
// failing the job (false, the default).
public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
ConfigOptions.key("cdc.skip-corrupt-record")
.booleanType()
.defaultValue(false)
.withDescription("Skip corrupt record if we fail to parse it");

// Sleep interval (millis) between retries; derived from RETRY_SLEEP_TIME.
// NOTE(review): the sleep call itself is outside this hunk — confirm it is used in the retry loop.
private final long retrySleepMillis;

// Cached value of MAX_RETRY_NUM_TIMES for this table.
private final int maxRetryNumTimes;

// Cached value of SKIP_CORRUPT_RECORD for this table.
private final boolean skipCorruptRecord;

/**
 * Creates the CDC record write operator and caches the retry/skip settings
 * from the table's configuration.
 *
 * @param table the Paimon table this operator writes to
 * @param storeSinkWriteProvider factory for the underlying {@code StoreSinkWrite}
 * @param initialCommitUser commit user recorded for the initial commit
 */
public CdcRecordStoreWriteOperator(
        FileStoreTable table,
        StoreSinkWrite.Provider storeSinkWriteProvider,
        String initialCommitUser) {
    super(table, storeSinkWriteProvider, initialCommitUser);
    // Resolve operator tuning options once at construction time rather than per record.
    this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
    this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
    this.retrySleepMillis =
            table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}

@Override
Expand All @@ -75,7 +93,7 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
CdcRecord record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record, table.schema().fields());
if (optionalConverted.isPresent()) {
Expand All @@ -86,10 +104,18 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
write.replace(table);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
if (!optionalConverted.isPresent()) {
if (skipCorruptRecord) {
LOG.warn("Skipping corrupt or unparsable record {}", record);
} else {
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
}
} else {
try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
}
}
}

0 comments on commit 5c70e64

Please sign in to comment.