[cdc] Minor improve CdcUnawareBucketSink and fix unstable test (apach…
yuzelin authored Jan 10, 2024
1 parent 7c20685 commit f3b3735
Showing 7 changed files with 74 additions and 166 deletions.
Changed file 1

@@ -135,7 +135,7 @@ private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> parsed) {
 
     private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> parsed) {
         FileStoreTable dataTable = (FileStoreTable) table;
-        return new CdcUnawareBucketWriteSink((AppendOnlyFileStoreTable) dataTable, parallelism)
+        return new CdcUnawareBucketSink((AppendOnlyFileStoreTable) dataTable, parallelism)
                 .sinkFrom(parsed);
     }
 }
Changed file 2: CdcUnawareBucketSink (class renamed from CdcUnawareBucketWriteSink)

@@ -18,33 +18,22 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.UnawareBucketSink;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
-import java.util.Map;
-
 /** CDC Sink for unaware bucket table. */
-public class CdcUnawareBucketWriteSink extends UnawareBucketSink {
+public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
 
-    public CdcUnawareBucketWriteSink(AppendOnlyFileStoreTable table, Integer parallelism) {
+    public CdcUnawareBucketSink(AppendOnlyFileStoreTable table, Integer parallelism) {
         super(table, null, null, parallelism, false);
     }
 
-    public CdcUnawareBucketWriteSink(
-            AppendOnlyFileStoreTable table,
-            Integer parallelism,
-            Map<String, String> overwritePartitions,
-            LogSinkFunction logSinkFunction,
-            boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
-    }
-
     @Override
-    protected OneInputStreamOperator createWriteOperator(
+    protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
     }
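For orientation, here is a minimal sketch of how the renamed sink is wired up, mirroring the two call sites changed in this commit (buildForUnawareBucket). The helper class, method, and variable names are illustrative and not part of the change:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.paimon.flink.sink.cdc.CdcRecord;
    import org.apache.paimon.flink.sink.cdc.CdcUnawareBucketSink;
    import org.apache.paimon.table.AppendOnlyFileStoreTable;

    public final class CdcUnawareBucketSinkExample {

        // Route an already-parsed CDC stream into an unaware-bucket (append-only) table,
        // exactly as the changed call sites do. The Integer `parallelism` suggests it may
        // be null to fall back to a default, but that is an assumption, not shown here.
        static DataStreamSink<?> sinkToUnawareBucketTable(
                DataStream<CdcRecord> parsed, AppendOnlyFileStoreTable table, Integer parallelism) {
            return new CdcUnawareBucketSink(table, parallelism).sinkFrom(parsed);
        }
    }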
Changed file 3: CdcUnawareBucketWriteOperator

@@ -18,80 +18,28 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.flink.sink.TableWriteOperator;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowKind;
 
-import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
-
-import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
-
-/** A {@link PrepareCommitOperator} to write {@link CdcRecord} with bucket. */
-public class CdcUnawareBucketWriteOperator extends TableWriteOperator<CdcRecord> {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final ConfigOption<Duration> RETRY_SLEEP_TIME =
-            ConfigOptions.key("cdc.retry-sleep-time")
-                    .durationType()
-                    .defaultValue(Duration.ofMillis(500));
-
-    protected final long retrySleepMillis;
+/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
+public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {
 
     public CdcUnawareBucketWriteOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
         super(table, storeSinkWriteProvider, initialCommitUser);
-        this.retrySleepMillis =
-                table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
     }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        table = table.copyWithLatestSchema();
-        super.initializeState(context);
-    }
-
-    @Override
-    protected boolean containLogSystem() {
-        return false;
-    }
 
     @Override
     public void processElement(StreamRecord<CdcRecord> element) throws Exception {
-        CdcRecord record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
-        if (!optionalConverted.isPresent()) {
-            while (true) {
-                table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record, table.schema().fields());
-                if (optionalConverted.isPresent()) {
-                    break;
-                }
-                Thread.sleep(retrySleepMillis);
-            }
-            write.replace(table);
-        }
-
-        // Only insert type messages are processed
-        try {
-            GenericRow cdcDataRow = optionalConverted.get();
-            if (cdcDataRow.getRowKind() == RowKind.INSERT) {
-                write.write(cdcDataRow);
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
+        // only accepts INSERT record
+        if (element.getValue().kind() == RowKind.INSERT) {
+            super.processElement(element);
         }
     }
 }
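The rewrite above keeps only an INSERT filter in the operator and defers everything else to CdcRecordStoreWriteOperator, which presumably contains the retry-and-write logic deleted here. Dropping non-INSERT records fits unaware-bucket (append-only) tables: without primary keys, UPDATE_BEFORE, UPDATE_AFTER, and DELETE changes cannot be applied. Functionally, the check is equivalent to pre-filtering the stream; a hedged sketch, assuming `parsed` is the DataStream<CdcRecord> feeding the sink and using the same imports as the files above:

    // Keep only INSERT records; update/delete records are dropped, just as the
    // operator's processElement does internally. Illustrative only: the operator
    // already applies this check, so no explicit pre-filter is required.
    DataStream<CdcRecord> insertsOnly =
            parsed.filter(record -> record.kind() == RowKind.INSERT);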
Changed file 4

@@ -171,8 +171,7 @@ private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
     }
 
     private void buildForUnawareBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
-        new CdcUnawareBucketWriteSink((AppendOnlyFileStoreTable) table, parallelism)
-                .sinkFrom(parsed);
+        new CdcUnawareBucketSink((AppendOnlyFileStoreTable) table, parallelism).sinkFrom(parsed);
     }
 
     private void buildDividedCdcSink() {
Changed file 5 (test)

@@ -21,23 +21,19 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogUtils;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
@@ -55,7 +51,6 @@
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
@@ -86,7 +81,7 @@ public void testRandomCdcEventsUnawareBucket() throws Exception {
         innerTestRandomCdcEvents(() -> -1, true);
     }
 
-    private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean isAppendTable)
+    private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareBucketMode)
             throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -111,7 +106,7 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean isAppendTable)
                         random.nextInt(maxSchemaChanges) + 1,
                         random.nextInt(maxPartitions) + 1,
                         random.nextInt(maxKeys) + 1,
-                        isAppendTable);
+                        unawareBucketMode);
         testTables.add(testTable);
 
         Path tablePath;
Expand Down Expand Up @@ -143,9 +138,8 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean isAppend
fileIO,
testTable.initialRowType(),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
bucket.get(),
isAppendTable);
unawareBucketMode ? Collections.emptyList() : Arrays.asList("pt", "k"),
bucket.get());
fileStoreTables.add(fileStoreTable);
}

@@ -179,32 +173,17 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean isAppendTable)
 
         // enable failure when running jobs if needed
         FailingFileIO.reset(failingName, 10, 10000);
-
-        if (!isAppendTable) {
-            env.execute();
-        } else {
-            // When the unaware bucket table is synchronized, it creates a topology and compacts, so
-            // the task does not terminate.In the test task, we are given enough time to execute the
-            // job.
+        if (unawareBucketMode) {
+            // there's a compact operator which won't terminate
             env.executeAsync();
-            long waitResultTime = TimeUnit.SECONDS.toMillis(60);
-            Thread.sleep(waitResultTime);
+        } else {
+            env.execute();
         }
 
         // no failure when checking results
         FailingFileIO.reset(failingName, 0, 1);
 
-        for (int i = 0; i < numTables; i++) {
-            FileStoreTable table = fileStoreTables.get(i).copyWithLatestSchema();
-            SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
-            TableSchema schema = schemaManager.latest().get();
-
-            ReadBuilder readBuilder = table.newReadBuilder();
-            TableScan.Plan plan = readBuilder.newScan().plan();
-            try (RecordReaderIterator<InternalRow> it =
-                    new RecordReaderIterator<>(readBuilder.newRead().createReader(plan))) {
-                testTables.get(i).assertResult(schema, it);
-            }
+        for (int i = 0; i < fileStoreTables.size(); i++) {
+            testTables.get(i).assertResult(fileStoreTables.get(i));
         }
     }

@@ -214,8 +193,7 @@ private FileStoreTable createFileStoreTable(
             RowType rowType,
             List<String> partitions,
             List<String> primaryKeys,
-            int numBucket,
-            boolean isAppendTable)
+            int numBucket)
             throws Exception {
         Options conf = new Options();
         conf.set(CoreOptions.BUCKET, numBucket);
@@ -226,12 +204,7 @@
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
                         new SchemaManager(fileIO, tablePath),
-                        new Schema(
-                                rowType.getFields(),
-                                partitions,
-                                isAppendTable ? Collections.emptyList() : primaryKeys,
-                                conf.toMap(),
-                                ""));
+                        new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), ""));
         return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
     }
 