diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 27442e1f82124..7b878c78cec4f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -29,8 +29,6 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder; import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder; import org.apache.paimon.flink.source.CompactorSourceBuilder; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; -import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractBucketScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractBucketScanLogic.java index 00e25eb3c2ac9..ff1290f9c8126 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractBucketScanLogic.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractBucketScanLogic.java @@ -85,7 +85,7 @@ protected void updateTableMap() for (String tableName : tables) { Identifier identifier = Identifier.create(databaseName, tableName); if (shouldCompactTable(identifier, includingPattern, excludingPattern) - && (!tableScanned(identifier))) { + && (!checkTableScanned(identifier))) { Table table = catalog.getTable(identifier); if (!(table instanceof FileStoreTable)) { LOG.error( @@ -107,7 +107,7 @@ abstract Boolean collectFiles(SourceFunction.SourceContext ctx) throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException; /** Check if table has been scanned. */ - abstract boolean tableScanned(Identifier identifier); + abstract boolean checkTableScanned(Identifier identifier); /** Add the scan table to the table map. */ abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java index 0974747f4ad65..640806730b312 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java @@ -44,8 +44,7 @@ public void scan(SourceFunction.SourceContext ctx) throws Exception { } if (isEmpty) { // Currently, in the combined mode, there are two scan tasks for the table of two - // different bucket type (multi bucket & unaware bucket) are - // running concurrently. + // different bucket type (multi bucket & unaware bucket) running concurrently. // There will be a situation that there is only one task compaction , therefore this // should not be thrown exception here. LOGGER.info( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketScanLogic.java index 152b9f909e2a8..7cdc36a98d425 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketScanLogic.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketScanLogic.java @@ -82,7 +82,6 @@ public Boolean collectFiles(SourceFunction.SourceContext> updateTableMap(); try { - // batch mode do not need check for new tables List> splits = new ArrayList<>(); for (Map.Entry entry : scansMap.entrySet()) { Identifier identifier = entry.getKey(); @@ -103,24 +102,19 @@ public Boolean collectFiles(SourceFunction.SourceContext> } @Override - public boolean tableScanned(Identifier identifier) { + public boolean checkTableScanned(Identifier identifier) { return tablesMap.containsKey(identifier); } @Override public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) { - if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { - LOG.info( - String.format("the bucket mode of %s is unware. ", identifier.getFullName()) - + "currently, the table with unware bucket mode is not support in combined mode."); - return; + if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) { + BucketsTable bucketsTable = + new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName()) + .copy(compactOptions(isStreaming)); + tablesMap.put(identifier, bucketsTable); + scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); } - - BucketsTable bucketsTable = - new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName()) - .copy(compactOptions(isStreaming)); - tablesMap.put(identifier, bucketsTable); - scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketScanLogic.java index 4087b374cfeec..63bb3f91ca1c5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketScanLogic.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketScanLogic.java @@ -98,7 +98,7 @@ public Boolean collectFiles(SourceFunction.SourceContext doWrite( .transform( String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), - createWriteOperator( + CombinedMultiComacptionWriteOperator( env.getCheckpointConfig(), isStreaming, commitUser)) .setParallelism(multiBucketTableSource.getParallelism()); @@ -111,7 +111,7 @@ public DataStream doWrite( .transform( String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), - new UnawareCombinedCompactionWorkerOperator( + new CombinedUmawareCompactionWorkerOperator( catalogLoader, commitUser, options)) .setParallelism(unawareBucketTableSource.getParallelism()); @@ -181,8 +181,9 @@ private void assertBatchConfiguration(StreamExecutionEnvironment env, int sinkPa } // TODO:refactor FlinkSink to adopt this sink - protected OneInputStreamOperator createWriteOperator( - CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) { + protected OneInputStreamOperator + CombinedMultiComacptionWriteOperator( + CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) { return new MultiTablesStoreCompactOperator( catalogLoader, commitUser, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombinedCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedUmawareCompactionWorkerOperator.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombinedCompactionWorkerOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedUmawareCompactionWorkerOperator.java index 1d3cad50a1fc6..00f5a55ab03ac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombinedCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedUmawareCompactionWorkerOperator.java @@ -23,7 +23,8 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.compact.UnwareBucketCompactionHelper; -import org.apache.paimon.flink.source.BucketUnawareCompactSource; +import org.apache.paimon.flink.source.operator.CombinedBatchUnawareSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedStreamingUnawareSourceFunction; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; @@ -48,13 +49,14 @@ /** * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link - * BucketUnawareCompactSource} for support compacting multi unaware bucket tables in combined mode. + * CombinedStreamingUnawareSourceFunction} or{@link CombinedBatchUnawareSourceFunction} for support + * compacting multi unaware bucket tables in combined mode. */ -public class UnawareCombinedCompactionWorkerOperator +public class CombinedUmawareCompactionWorkerOperator extends PrepareCommitOperator { private static final Logger LOG = - LoggerFactory.getLogger(UnawareCombinedCompactionWorkerOperator.class); + LoggerFactory.getLogger(CombinedUmawareCompactionWorkerOperator.class); private final String commitUser; private final Catalog.Loader catalogLoader; @@ -66,7 +68,7 @@ public class UnawareCombinedCompactionWorkerOperator private transient Catalog catalog; - public UnawareCombinedCompactionWorkerOperator( + public CombinedUmawareCompactionWorkerOperator( Catalog.Loader catalogLoader, String commitUser, Options options) { super(options); this.commitUser = commitUser; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DividedUnawareCompactionWorkerOperator.java similarity index 95% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DividedUnawareCompactionWorkerOperator.java index 9a1d64af23f34..e07bdc708b29a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DividedUnawareCompactionWorkerOperator.java @@ -42,11 +42,11 @@ * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link * BucketUnawareCompactSource} for compacting multi unaware bucket tables in divided mode. */ -public class UnawareDividedCompactionWorkerOperator +public class DividedUnawareCompactionWorkerOperator extends PrepareCommitOperator { private static final Logger LOG = - LoggerFactory.getLogger(UnawareDividedCompactionWorkerOperator.class); + LoggerFactory.getLogger(DividedUnawareCompactionWorkerOperator.class); private final FileStoreTable table; private final String commitUser; @@ -55,7 +55,7 @@ public class UnawareDividedCompactionWorkerOperator private transient ExecutorService lazyCompactExecutor; - public UnawareDividedCompactionWorkerOperator(FileStoreTable table, String commitUser) { + public DividedUnawareCompactionWorkerOperator(FileStoreTable table, String commitUser) { super(Options.fromMap(table.options())); this.table = table; this.commitUser = commitUser; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index 66cf996de8de7..6b4cd434d61c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -44,7 +44,7 @@ public static DataStreamSink sink( @Override protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new UnawareDividedCompactionWorkerOperator(table, commitUser); + return new DividedUnawareCompactionWorkerOperator(table, commitUser); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java index c90673b83fd22..c0d02e2fca8ff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java @@ -21,10 +21,10 @@ import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.LogicalTypeConversion; -import org.apache.paimon.flink.source.operator.BatchMultiSourceFunction; -import org.apache.paimon.flink.source.operator.BatchUnawareSourceFunction; -import org.apache.paimon.flink.source.operator.StreamingMultiSourceFunction; -import org.apache.paimon.flink.source.operator.StreamingUnawareSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedBatchMultiSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedBatchUnawareSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedStreamingMultiSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedStreamingUnawareSourceFunction; import org.apache.paimon.table.system.BucketsTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; @@ -77,7 +77,7 @@ public DataStream buildForMultiBucketTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); RowType produceType = BucketsTable.getRowType(); if (isContinuous) { - return StreamingMultiSourceFunction.buildSource( + return CombinedStreamingMultiSourceFunction.buildSource( env, "Combine-MultiBucketTables--StreamingCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), @@ -87,7 +87,7 @@ public DataStream buildForMultiBucketTableSource() { databasePattern, monitorInterval); } else { - return BatchMultiSourceFunction.buildSource( + return CombinedBatchMultiSourceFunction.buildSource( env, "Combine-MultiBucketTables-BatchCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), @@ -102,7 +102,7 @@ public DataStream buildForMultiBucketTableSource() { public DataStream buildForUnawareBucketsTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); if (isContinuous) { - return StreamingUnawareSourceFunction.buildSource( + return CombinedStreamingUnawareSourceFunction.buildSource( env, "Combined-UnawareBucketTables-StreamingCompactorSource", catalogLoader, @@ -111,7 +111,7 @@ public DataStream buildForUnawareBucketsTableSource() databasePattern, monitorInterval); } else { - return BatchUnawareSourceFunction.buildSource( + return CombinedBatchUnawareSourceFunction.buildSource( env, "Combined-UnawareBucketTables-BatchCompactorSource", catalogLoader, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java similarity index 94% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java index 8274e59d28fda..7794c2a864289 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java @@ -41,10 +41,10 @@ import java.util.regex.Pattern; /** It is responsible for monitoring compactor source of multi bucket table in batch mode. */ -public class BatchMultiSourceFunction - extends CombineModeCompactorSourceFunction> { +public class CombinedBatchMultiSourceFunction + extends CombinedCompactorSourceFunction> { - public BatchMultiSourceFunction( + public CombinedBatchMultiSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -83,8 +83,8 @@ public static DataStream buildSource( Pattern excludingPattern, Pattern databasePattern, long monitorInterval) { - BatchMultiSourceFunction function = - new BatchMultiSourceFunction( + CombinedBatchMultiSourceFunction function = + new CombinedBatchMultiSourceFunction( catalogLoader, includingPattern, excludingPattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java similarity index 91% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java index 4c62894b2ca76..c85092f0fd772 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java @@ -41,9 +41,9 @@ * It is responsible for the batch compactor source of the table with unaware bucket in combined * mode. */ -public class BatchUnawareSourceFunction - extends CombineModeCompactorSourceFunction { - public BatchUnawareSourceFunction( +public class CombinedBatchUnawareSourceFunction + extends CombinedCompactorSourceFunction { + public CombinedBatchUnawareSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -80,14 +80,14 @@ public static DataStream buildSource( Pattern excludingPattern, Pattern databasePattern, long monitorInterval) { - BatchUnawareSourceFunction function = - new BatchUnawareSourceFunction( + CombinedBatchUnawareSourceFunction function = + new CombinedBatchUnawareSourceFunction( catalogLoader, includingPattern, excludingPattern, databasePattern, monitorInterval); - StreamSource sourceOperator = + StreamSource sourceOperator = new StreamSource<>(function); CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); SingleOutputStreamOperator source = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java index 819ef3cf9c435..c1026a7198daf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java @@ -45,7 +45,7 @@ *

Currently, only dedicated compaction job for multi-tables rely on this monitor. This is the * single (non-parallel) monitoring task, it is responsible for the new Paimon table. */ -public abstract class CombineModeCompactorSourceFunction extends RichSourceFunction { +public abstract class CombinedCompactorSourceFunction extends RichSourceFunction { private static final long serialVersionUID = 2L; @@ -62,7 +62,7 @@ public abstract class CombineModeCompactorSourceFunction extends RichSourceFu protected transient CompactionFileScanner compactionFileScanner; - public CombineModeCompactorSourceFunction( + public CombinedCompactorSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java similarity index 94% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java index 2efd122f1dd5e..49cc776b1bcfa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java @@ -41,10 +41,10 @@ import java.util.regex.Pattern; /** It is responsible for monitoring compactor source of multi bucket table in stream mode. */ -public class StreamingMultiSourceFunction - extends CombineModeCompactorSourceFunction> { +public class CombinedStreamingMultiSourceFunction + extends CombinedCompactorSourceFunction> { - public StreamingMultiSourceFunction( + public CombinedStreamingMultiSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -85,8 +85,8 @@ public static DataStream buildSource( Pattern databasePattern, long monitorInterval) { - StreamingMultiSourceFunction function = - new StreamingMultiSourceFunction( + CombinedStreamingMultiSourceFunction function = + new CombinedStreamingMultiSourceFunction( catalogLoader, includingPattern, excludingPattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java similarity index 89% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java index d1969baf0218e..dbd0c70a30237 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java @@ -37,9 +37,9 @@ /** * It is responsible for monitoring compactor source in stream mode for the table of unaware bucket. */ -public class StreamingUnawareSourceFunction - extends CombineModeCompactorSourceFunction { - public StreamingUnawareSourceFunction( +public class CombinedStreamingUnawareSourceFunction + extends CombinedCompactorSourceFunction { + public CombinedStreamingUnawareSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -79,15 +79,15 @@ public static DataStream buildSource( Pattern databasePattern, long monitorInterval) { - StreamingUnawareSourceFunction function = - new StreamingUnawareSourceFunction( + CombinedStreamingUnawareSourceFunction function = + new CombinedStreamingUnawareSourceFunction( catalogLoader, includingPattern, excludingPattern, databasePattern, monitorInterval); - StreamSource sourceOperator = - new StreamSource<>(function); + StreamSource + sourceOperator = new StreamSource<>(function); boolean isParallel = false; CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); return new DataStreamSource<>( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java index 93de286ef85b1..5cb6b17fe2668 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java @@ -44,8 +44,9 @@ /** * The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link - * BatchMultiSourceFunction} or {@link StreamingMultiSourceFunction}. Contrary to the {@link - * CombineModeCompactorSourceFunction} which has a parallelism of 1, this operator can have DOP > 1. + * CombinedBatchMultiSourceFunction} or {@link CombinedStreamingMultiSourceFunction}. Contrary to + * the {@link CombinedCompactorSourceFunction} which has a parallelism of 1, this operator can have + * DOP > 1. */ public class MultiTablesReadOperator extends AbstractStreamOperator implements OneInputStreamOperator, RowData> { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index c207dbff4bd75..50ea4add1c952 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -90,6 +90,101 @@ private FileStoreTable createTable( return (FileStoreTable) catalog.getTable(identifier); } + @ParameterizedTest(name = "mode = {0}") + @ValueSource(strings = {"combined", "divided"}) + @Timeout(6000) + public void testStreamCompactForUnawareTable(String mode) throws Exception { + + // step0. create tables + Map tableToCompaction = new HashMap<>(); + for (String dbName : DATABASE_NAMES) { + for (String tableName : TABLE_NAMES) { + Map option = new HashMap<>(); + option.put(CoreOptions.WRITE_ONLY.key(), "true"); + List keys; + if (tableName.endsWith("unaware_bucket")) { + option.put("bucket", "-1"); + option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + option.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + keys = Lists.newArrayList(); + FileStoreTable table = + createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option); + tableToCompaction.put(Identifier.create(dbName, tableName), table); + } + } + } + + // step1. run streaming compaction task for tables + if (ThreadLocalRandom.current().nextBoolean()) { + StreamExecutionEnvironment env = buildDefaultEnv(true); + createAction( + CompactDatabaseAction.class, + "compact_database", + "--warehouse", + warehouse, + "--mode", + mode) + .withStreamExecutionEnvironment(env) + .build(); + env.executeAsync(); + } else { + callProcedure(String.format("CALL sys.compact_database('', '%s')", mode), true, false); + } + + // step3. write datas to table wait for compaction + for (Map.Entry identifierFileStoreTableEntry : + tableToCompaction.entrySet()) { + FileStoreTable table = identifierFileStoreTableEntry.getValue(); + SnapshotManager snapshotManager = table.snapshotManager(); + StreamWriteBuilder streamWriteBuilder = + table.newStreamWriteBuilder().withCommitUser(commitUser); + write = streamWriteBuilder.newWrite(); + commit = streamWriteBuilder.newCommit(); + + writeData( + rowData(1, 100, 15, BinaryString.fromString("20221208")), + rowData(1, 100, 16, BinaryString.fromString("20221208")), + rowData(1, 100, 15, BinaryString.fromString("20221209"))); + + writeData( + rowData(2, 100, 15, BinaryString.fromString("20221208")), + rowData(2, 100, 16, BinaryString.fromString("20221208")), + rowData(2, 100, 15, BinaryString.fromString("20221209"))); + + Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); + assertThat(snapshot.id()).isEqualTo(2); + assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + write.close(); + commit.close(); + } + + for (Map.Entry identifierFileStoreTableEntry : + tableToCompaction.entrySet()) { + FileStoreTable table = identifierFileStoreTableEntry.getValue(); + SnapshotManager snapshotManager = table.snapshotManager(); + while (true) { + if (snapshotManager.latestSnapshotId() == 2) { + Thread.sleep(1000); + } else { + validateResult( + table, + ROW_TYPE, + table.newReadBuilder().newStreamScan(), + Arrays.asList( + "+I[1, 100, 15, 20221208]", + "+I[1, 100, 15, 20221209]", + "+I[1, 100, 16, 20221208]", + "+I[2, 100, 15, 20221208]", + "+I[2, 100, 15, 20221209]", + "+I[2, 100, 16, 20221208]"), + 60_000); + break; + } + TableScan.Plan plan = table.newScan().plan(); + } + } + } + @ParameterizedTest(name = "mode = {0}") @ValueSource(strings = {"divided", "combined"}) @Timeout(60) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java index b222332af4261..baf69808be3f5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java @@ -42,14 +42,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -/** Tests for {@link UnawareDividedCompactionWorkerOperator}. */ +/** Tests for {@link DividedUnawareCompactionWorkerOperator}. */ public class UnwaredSingleCompactionWorkerOperatorTest extends TableTestBase { @Test public void testAsyncCompactionWorks() throws Exception { createTableDefault(); - UnawareDividedCompactionWorkerOperator workerOperator = - new UnawareDividedCompactionWorkerOperator(getTableDefault(), "user"); + DividedUnawareCompactionWorkerOperator workerOperator = + new DividedUnawareCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 20); @@ -79,7 +79,7 @@ public void testAsyncCompactionWorks() throws Exception { if (now - timeStart > timeout && committables.size() != 4) { throw new RuntimeException( "Timeout waiting for compaction, maybe some error happens in " - + UnawareDividedCompactionWorkerOperator.class + + DividedUnawareCompactionWorkerOperator.class .getName()); } Thread.sleep(1_000L); @@ -100,8 +100,8 @@ public void testAsyncCompactionWorks() throws Exception { @Test public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { createTableDefault(); - UnawareDividedCompactionWorkerOperator workerOperator = - new UnawareDividedCompactionWorkerOperator(getTableDefault(), "user"); + DividedUnawareCompactionWorkerOperator workerOperator = + new DividedUnawareCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 40);