Commit c4ef352

refactor

wgcn committed Mar 31, 2024
1 parent 90a200d commit c4ef352
Showing 18 changed files with 164 additions and 74 deletions.
File 1:
@@ -29,8 +29,6 @@
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
-import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
File 2:
@@ -85,7 +85,7 @@ protected void updateTableMap()
         for (String tableName : tables) {
             Identifier identifier = Identifier.create(databaseName, tableName);
             if (shouldCompactTable(identifier, includingPattern, excludingPattern)
-                    && (!tableScanned(identifier))) {
+                    && (!checkTableScanned(identifier))) {
                 Table table = catalog.getTable(identifier);
                 if (!(table instanceof FileStoreTable)) {
                     LOG.error(
@@ -107,7 +107,7 @@ abstract Boolean collectFiles(SourceFunction.SourceContext<T> ctx)
             throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException;
 
     /** Check if table has been scanned. */
-    abstract boolean tableScanned(Identifier identifier);
+    abstract boolean checkTableScanned(Identifier identifier);
 
     /** Add the scan table to the table map. */
     abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);
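Taken together, these hooks form a small template-method pattern: updateTableMap() discovers candidate tables, skips ones already registered via checkTableScanned, and registers new ones via addScanTable so that later collectFiles() calls pick them up. A minimal, dependency-free sketch of that pattern (String keys stand in for Identifier, and the real class also filters through shouldCompactTable and a catalog lookup):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

abstract class TableMonitorSketch {
    // Stand-in for tablesMap in the concrete source functions shown later in this diff.
    protected final Map<String, Object> tablesMap = new HashMap<>();

    void updateTableMap(List<String> tableNames) {
        for (String name : tableNames) {
            if (!checkTableScanned(name)) {
                addScanTable(name); // register so later scans include this table
            }
        }
    }

    /** Check if table has been scanned (mirrors the renamed hook). */
    boolean checkTableScanned(String name) {
        return tablesMap.containsKey(name);
    }

    /** Add the scan table to the table map. */
    abstract void addScanTable(String name);
}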
File 3:
@@ -44,8 +44,7 @@ public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
         }
         if (isEmpty) {
             // Currently, in the combined mode, there are two scan tasks for the tables of two
-            // different bucket type (multi bucket & unaware bucket) are
-            // running concurrently.
+            // different bucket types (multi bucket & unaware bucket) running concurrently.
             // There may be a situation where only one task has compaction work, so an
             // exception should not be thrown here.
             LOGGER.info(
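The comment above encodes a small but important rule: in combined mode an empty scan is a normal outcome, not a failure. A dependency-free illustration of that rule (hypothetical names; only the log-instead-of-throw behavior is taken from the hunk above):

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class EmptyScanHandlingSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyScanHandlingSketch.class);

    /** Tolerates an empty scan result instead of failing the job. */
    static <T> boolean hasWork(List<T> scanResult) {
        if (scanResult.isEmpty()) {
            // The peer scan task (for the other bucket type) may own all the
            // current compaction work, so an empty result is expected here.
            LOGGER.info("No compaction work for this bucket type in this round.");
            return false;
        }
        return true;
    }
}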
File 4:
@@ -82,7 +82,6 @@ public Boolean collectFiles(SourceFunction.SourceContext<Tuple2<Split, String>>
         updateTableMap();
 
         try {
-            // batch mode do not need check for new tables
             List<Tuple2<Split, String>> splits = new ArrayList<>();
             for (Map.Entry<Identifier, StreamTableScan> entry : scansMap.entrySet()) {
                 Identifier identifier = entry.getKey();
@@ -103,24 +102,19 @@ public Boolean collectFiles(SourceFunction.SourceContext<Tuple2<Split, String>>
     }
 
     @Override
-    public boolean tableScanned(Identifier identifier) {
+    public boolean checkTableScanned(Identifier identifier) {
         return tablesMap.containsKey(identifier);
     }
 
     @Override
     public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
-        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
-            LOG.info(
-                    String.format("the bucket mode of %s is unware. ", identifier.getFullName())
-                            + "currently, the table with unware bucket mode is not support in combined mode.");
-            return;
+        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
+            BucketsTable bucketsTable =
+                    new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
+                            .copy(compactOptions(isStreaming));
+            tablesMap.put(identifier, bucketsTable);
+            scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
         }
-
-        BucketsTable bucketsTable =
-                new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
-                        .copy(compactOptions(isStreaming));
-        tablesMap.put(identifier, bucketsTable);
-        scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
     }
 
     @Override
File 5:
@@ -98,7 +98,7 @@ public Boolean collectFiles(SourceFunction.SourceContext<AppendOnlyCompactionTas
     }
 
     @Override
-    public boolean tableScanned(Identifier identifier) {
+    public boolean checkTableScanned(Identifier identifier) {
         return tablesMap.containsKey(identifier);
     }
 
File 6:
@@ -102,7 +102,7 @@ public DataStream<MultiTableCommittable> doWrite(
                 .transform(
                         String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
                         new MultiTableCommittableTypeInfo(),
-                        createWriteOperator(
+                        combinedMultiCompactionWriteOperator(
                                 env.getCheckpointConfig(), isStreaming, commitUser))
                 .setParallelism(multiBucketTableSource.getParallelism());
 
@@ -111,7 +111,7 @@ public DataStream<MultiTableCommittable> doWrite(
                 .transform(
                         String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
                         new MultiTableCommittableTypeInfo(),
-                        new UnawareCombinedCompactionWorkerOperator(
+                        new CombinedUnawareCompactionWorkerOperator(
                                 catalogLoader, commitUser, options))
                 .setParallelism(unawareBucketTableSource.getParallelism());
 
@@ -181,8 +181,9 @@ private void assertBatchConfiguration(StreamExecutionEnvironment env, int sinkPa
     }
 
     // TODO: refactor FlinkSink to adopt this sink
-    protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOperator(
-            CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
+    protected OneInputStreamOperator<RowData, MultiTableCommittable>
+            combinedMultiCompactionWriteOperator(
+                    CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
         return new MultiTablesStoreCompactOperator(
                 catalogLoader,
                 commitUser,
File 7:
@@ -23,7 +23,8 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.compact.UnwareBucketCompactionHelper;
-import org.apache.paimon.flink.source.BucketUnawareCompactSource;
+import org.apache.paimon.flink.source.operator.CombinedBatchUnawareSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedStreamingUnawareSourceFunction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -48,13 +49,14 @@
 
 /**
  * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
- * BucketUnawareCompactSource} for support compacting multi unaware bucket tables in combined mode.
+ * CombinedStreamingUnawareSourceFunction} or {@link CombinedBatchUnawareSourceFunction} to support
+ * compacting multi unaware bucket tables in combined mode.
  */
-public class UnawareCombinedCompactionWorkerOperator
+public class CombinedUnawareCompactionWorkerOperator
         extends PrepareCommitOperator<AppendOnlyCompactionTask, MultiTableCommittable> {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(UnawareCombinedCompactionWorkerOperator.class);
+            LoggerFactory.getLogger(CombinedUnawareCompactionWorkerOperator.class);
 
     private final String commitUser;
     private final Catalog.Loader catalogLoader;
@@ -66,7 +68,7 @@ public class UnawareCombinedCompactionWorkerOperator
 
     private transient Catalog catalog;
 
-    public UnawareCombinedCompactionWorkerOperator(
+    public CombinedUnawareCompactionWorkerOperator(
             Catalog.Loader catalogLoader, String commitUser, Options options) {
         super(options);
         this.commitUser = commitUser;
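For orientation, a construction sketch for the renamed operator. Only the constructor signature (Catalog.Loader, String, Options) is visible in the hunks above; the class's package declaration is not shown in this diff, so this sketch assumes it sits in the same package, and the empty Options() is a placeholder:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.Options;

final class WorkerOperatorConstructionSketch {
    static CombinedUnawareCompactionWorkerOperator create(
            Catalog.Loader catalogLoader, String commitUser) {
        // new Options() is a stand-in; real callers pass table/compaction options.
        return new CombinedUnawareCompactionWorkerOperator(
                catalogLoader, commitUser, new Options());
    }
}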
File 8:
@@ -42,11 +42,11 @@
  * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
  * BucketUnawareCompactSource} for compacting multi unaware bucket tables in divided mode.
  */
-public class UnawareDividedCompactionWorkerOperator
+public class DividedUnawareCompactionWorkerOperator
         extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(UnawareDividedCompactionWorkerOperator.class);
+            LoggerFactory.getLogger(DividedUnawareCompactionWorkerOperator.class);
 
     private final FileStoreTable table;
     private final String commitUser;
@@ -55,7 +55,7 @@ public class UnawareDividedCompactionWorkerOperator
 
     private transient ExecutorService lazyCompactExecutor;
 
-    public UnawareDividedCompactionWorkerOperator(FileStoreTable table, String commitUser) {
+    public DividedUnawareCompactionWorkerOperator(FileStoreTable table, String commitUser) {
         super(Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
File 9:
@@ -44,7 +44,7 @@ public static DataStreamSink<?> sink(
     @Override
     protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new UnawareDividedCompactionWorkerOperator(table, commitUser);
+        return new DividedUnawareCompactionWorkerOperator(table, commitUser);
     }
 
     @Override
File 10:
@@ -21,10 +21,10 @@
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.source.operator.BatchMultiSourceFunction;
-import org.apache.paimon.flink.source.operator.BatchUnawareSourceFunction;
-import org.apache.paimon.flink.source.operator.StreamingMultiSourceFunction;
-import org.apache.paimon.flink.source.operator.StreamingUnawareSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedBatchMultiSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedBatchUnawareSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedStreamingMultiSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedStreamingUnawareSourceFunction;
 import org.apache.paimon.table.system.BucketsTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
@@ -77,7 +77,7 @@ public DataStream<RowData> buildForMultiBucketTableSource() {
         Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
         RowType produceType = BucketsTable.getRowType();
         if (isContinuous) {
-            return StreamingMultiSourceFunction.buildSource(
+            return CombinedStreamingMultiSourceFunction.buildSource(
                     env,
                     "Combine-MultiBucketTables--StreamingCompactorSource",
                     InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
@@ -87,7 +87,7 @@ public DataStream<RowData> buildForMultiBucketTableSource() {
                     databasePattern,
                     monitorInterval);
         } else {
-            return BatchMultiSourceFunction.buildSource(
+            return CombinedBatchMultiSourceFunction.buildSource(
                     env,
                     "Combine-MultiBucketTables-BatchCompactorSource",
                     InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
@@ -102,7 +102,7 @@
     public DataStream<AppendOnlyCompactionTask> buildForUnawareBucketsTableSource() {
         Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
         if (isContinuous) {
-            return StreamingUnawareSourceFunction.buildSource(
+            return CombinedStreamingUnawareSourceFunction.buildSource(
                     env,
                     "Combined-UnawareBucketTables-StreamingCompactorSource",
                     catalogLoader,
@@ -111,7 +111,7 @@ public DataStream<AppendOnlyCompactionTask> buildForUnawareBucketsTableSource()
                     databasePattern,
                     monitorInterval);
         } else {
-            return BatchUnawareSourceFunction.buildSource(
+            return CombinedBatchUnawareSourceFunction.buildSource(
                     env,
                     "Combined-UnawareBucketTables-BatchCompactorSource",
                     catalogLoader,
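A usage sketch for the builder's two entry points. The method names and return types come from the hunks above; how the builder itself is constructed is outside this diff:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;

final class CombinedSourcesSketch {
    static void buildBoth(CombinedTableCompactorSourceBuilder builder) {
        // Multi-bucket tables arrive as RowData splits for the multi-bucket writer,
        DataStream<RowData> multiBucketSource = builder.buildForMultiBucketTableSource();
        // while unaware-bucket tables arrive as ready-made compaction tasks.
        DataStream<AppendOnlyCompactionTask> unawareBucketSource =
                builder.buildForUnawareBucketsTableSource();
    }
}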
File 11:
@@ -41,10 +41,10 @@
 import java.util.regex.Pattern;
 
 /** It is responsible for monitoring the compactor source of multi bucket tables in batch mode. */
-public class BatchMultiSourceFunction
-        extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {
+public class CombinedBatchMultiSourceFunction
+        extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
 
-    public BatchMultiSourceFunction(
+    public CombinedBatchMultiSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
@@ -83,8 +83,8 @@ public static DataStream<RowData> buildSource(
             Pattern excludingPattern,
             Pattern databasePattern,
             long monitorInterval) {
-        BatchMultiSourceFunction function =
-                new BatchMultiSourceFunction(
+        CombinedBatchMultiSourceFunction function =
+                new CombinedBatchMultiSourceFunction(
                         catalogLoader,
                         includingPattern,
                         excludingPattern,
File 12:
@@ -41,9 +41,9 @@
  * It is responsible for the batch compactor source of the table with unaware bucket in combined
  * mode.
  */
-public class BatchUnawareSourceFunction
-        extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {
-    public BatchUnawareSourceFunction(
+public class CombinedBatchUnawareSourceFunction
+        extends CombinedCompactorSourceFunction<AppendOnlyCompactionTask> {
+    public CombinedBatchUnawareSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
@@ -80,14 +80,14 @@ public static DataStream<AppendOnlyCompactionTask> buildSource(
             Pattern excludingPattern,
             Pattern databasePattern,
             long monitorInterval) {
-        BatchUnawareSourceFunction function =
-                new BatchUnawareSourceFunction(
+        CombinedBatchUnawareSourceFunction function =
+                new CombinedBatchUnawareSourceFunction(
                         catalogLoader,
                         includingPattern,
                         excludingPattern,
                         databasePattern,
                         monitorInterval);
-        StreamSource<AppendOnlyCompactionTask, BatchUnawareSourceFunction> sourceOperator =
+        StreamSource<AppendOnlyCompactionTask, CombinedBatchUnawareSourceFunction> sourceOperator =
                 new StreamSource<>(function);
         CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo();
         SingleOutputStreamOperator<AppendOnlyCompactionTask> source =
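A call-site sketch for the batch unaware-bucket source. The parameter list is reconstructed from the visible fragments of the definition and call site above; the concrete pattern values, the nullable excluding pattern, and the interval unit are assumptions:

import java.util.regex.Pattern;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.source.operator.CombinedBatchUnawareSourceFunction;

final class BatchUnawareSourceSketch {
    static DataStream<AppendOnlyCompactionTask> build(
            StreamExecutionEnvironment env, Catalog.Loader catalogLoader) {
        return CombinedBatchUnawareSourceFunction.buildSource(
                env,
                "Combined-UnawareBucketTables-BatchCompactorSource",
                catalogLoader,
                Pattern.compile(".*"), // includingPattern: include every table (illustrative)
                null,                  // excludingPattern: none (assumed nullable)
                Pattern.compile(".*"), // databasePattern: every database (illustrative)
                10_000L);              // monitorInterval, assumed milliseconds
    }
}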
File 13:
@@ -45,7 +45,7 @@
  * <p>Currently, only the dedicated compaction job for multi-tables relies on this monitor. It is
  * a single (non-parallel) monitoring task, responsible for discovering new Paimon tables.
  */
-public abstract class CombineModeCompactorSourceFunction<T> extends RichSourceFunction<T> {
+public abstract class CombinedCompactorSourceFunction<T> extends RichSourceFunction<T> {
 
     private static final long serialVersionUID = 2L;
 
@@ -62,7 +62,7 @@ public abstract class CombineModeCompactorSourceFunction<T> extends RichSourceFu
 
     protected transient CompactionFileScanner<T> compactionFileScanner;
 
-    public CombineModeCompactorSourceFunction(
+    public CombinedCompactorSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
File 14:
@@ -41,10 +41,10 @@
 import java.util.regex.Pattern;
 
 /** It is responsible for monitoring the compactor source of multi bucket tables in stream mode. */
-public class StreamingMultiSourceFunction
-        extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {
+public class CombinedStreamingMultiSourceFunction
+        extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
 
-    public StreamingMultiSourceFunction(
+    public CombinedStreamingMultiSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
@@ -85,8 +85,8 @@ public static DataStream<RowData> buildSource(
             Pattern databasePattern,
             long monitorInterval) {
 
-        StreamingMultiSourceFunction function =
-                new StreamingMultiSourceFunction(
+        CombinedStreamingMultiSourceFunction function =
+                new CombinedStreamingMultiSourceFunction(
                         catalogLoader,
                         includingPattern,
                         excludingPattern,
File 15:
@@ -37,9 +37,9 @@
 /**
  * It is responsible for monitoring the compactor source in stream mode for unaware-bucket tables.
  */
-public class StreamingUnawareSourceFunction
-        extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {
-    public StreamingUnawareSourceFunction(
+public class CombinedStreamingUnawareSourceFunction
+        extends CombinedCompactorSourceFunction<AppendOnlyCompactionTask> {
+    public CombinedStreamingUnawareSourceFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
@@ -79,15 +79,15 @@ public static DataStream<AppendOnlyCompactionTask> buildSource(
             Pattern databasePattern,
             long monitorInterval) {
 
-        StreamingUnawareSourceFunction function =
-                new StreamingUnawareSourceFunction(
+        CombinedStreamingUnawareSourceFunction function =
+                new CombinedStreamingUnawareSourceFunction(
                         catalogLoader,
                         includingPattern,
                         excludingPattern,
                         databasePattern,
                         monitorInterval);
-        StreamSource<AppendOnlyCompactionTask, StreamingUnawareSourceFunction> sourceOperator =
-                new StreamSource<>(function);
+        StreamSource<AppendOnlyCompactionTask, CombinedStreamingUnawareSourceFunction>
+                sourceOperator = new StreamSource<>(function);
         boolean isParallel = false;
         CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo();
         return new DataStreamSource<>(
File 16:
@@ -44,8 +44,9 @@
 
 /**
  * The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link
- * BatchMultiSourceFunction} or {@link StreamingMultiSourceFunction}. Contrary to the {@link
- * CombineModeCompactorSourceFunction} which has a parallelism of 1, this operator can have DOP > 1.
+ * CombinedBatchMultiSourceFunction} or {@link CombinedStreamingMultiSourceFunction}. Contrary to
+ * the {@link CombinedCompactorSourceFunction}, which has a parallelism of 1, this operator can have
+ * DOP > 1.
  */
 public class MultiTablesReadOperator extends AbstractStreamOperator<RowData>
         implements OneInputStreamOperator<Tuple2<Split, String>, RowData> {
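The parallelism contract described in that javadoc (a single-parallelism monitor feeding a scalable reader) can be seen in miniature with plain Flink operators; everything below is illustrative and uses no Paimon classes:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

final class ParallelismContractSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(
                        new SourceFunction<String>() {
                            @Override
                            public void run(SourceContext<String> ctx) {
                                ctx.collect("split-1"); // single monitor emits discovered work
                            }

                            @Override
                            public void cancel() {}
                        })
                .setParallelism(1) // the combined source function stays non-parallel
                .map(
                        new MapFunction<String, String>() {
                            @Override
                            public String map(String split) {
                                return "read " + split; // stand-in for the read operator
                            }
                        })
                .setParallelism(4) // the downstream read operator may use DOP > 1
                .print();
        env.execute("parallelism-contract-sketch");
    }
}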
(The remaining 2 of the 18 changed files are not shown.)