buildSource(boolean emitMaxWatermark) {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
- table, isContinuous, scanInterval, partitionPredicate);
+ table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);
return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index f253a3bf8e79b..f12f7cbd88f8c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -51,7 +51,7 @@
* A dedicated operator for manual triggered compaction.
*
* In-coming records are generated by sources built from {@link
- * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
+ * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
@@ -173,7 +173,6 @@ public void processElement(StreamRecord element) throws Exception {
@Override
protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
-
List<MultiTableCommittable> committables = new LinkedList<>();
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 7926fa60a566c..936b2cd2c70cf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,6 +32,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction
private transient SourceContext ctx;
private volatile boolean isRunning = true;
+ private final boolean emitMaxWatermark;
public BucketUnawareCompactSource(
FileStoreTable table,
boolean isStreaming,
long scanInterval,
- @Nullable Predicate filter) {
+ @Nullable Predicate filter,
+ boolean emitMaxWatermark) {
this.table = table;
this.streaming = isStreaming;
this.scanInterval = scanInterval;
this.filter = filter;
+ this.emitMaxWatermark = emitMaxWatermark;
}
@Override
@@ -94,6 +98,10 @@ public void run(SourceContext sourceContext) throws Exception {
List tasks = compactionCoordinator.run();
isEmpty = tasks.isEmpty();
tasks.forEach(ctx::collect);
+
+ if (emitMaxWatermark) {
+ ctx.emitWatermark(Watermark.MAX_WATERMARK);
+ }
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is finished.");
return;
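
A minimal, self-contained sketch of the pattern introduced above, for readers who want to see it outside the diff: a Flink SourceFunction that collects its batch of work and then, when the new flag is set, emits Watermark.MAX_WATERMARK. The class name DemoCompactSource and its String payload are illustrative assumptions, not the Paimon class being modified.

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;

    import java.util.Arrays;
    import java.util.List;

    public class DemoCompactSource extends RichSourceFunction<String> {

        private final boolean emitMaxWatermark;
        private volatile boolean isRunning = true;

        public DemoCompactSource(boolean emitMaxWatermark) {
            this.emitMaxWatermark = emitMaxWatermark;
        }

        @Override
        public void run(SourceContext<String> ctx) {
            if (!isRunning) {
                return;
            }
            // One scan pass: collect the discovered work, then optionally close event time.
            List<String> tasks = Arrays.asList("compact-task-1", "compact-task-2");
            synchronized (ctx.getCheckpointLock()) {
                tasks.forEach(ctx::collect);
                if (emitMaxWatermark) {
                    // MAX_WATERMARK carries Long.MAX_VALUE, letting downstream event-time
                    // logic (and the committed snapshot watermark) advance immediately.
                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
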
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index 6ee9d849af897..e5cb7d971f769 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -37,10 +37,11 @@
import java.util.regex.Pattern;
/**
- * source builder to build a Flink compactor source for multi-tables. This is for dedicated
- * compactor jobs in combined mode.
+ * Source builder to build a Flink compactor source for multi-tables. This is for dedicated
+ * compactor jobs in combined mode.
*/
public class CombinedTableCompactorSourceBuilder {
+
private final Catalog.Loader catalogLoader;
private final Pattern includingPattern;
private final Pattern excludingPattern;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index 22ef330291ff7..e8a91b5415caf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -84,9 +84,9 @@ void scanTable() throws Exception {
if (scanResult == IS_EMPTY) {
// Currently, in the combined mode, there are two scan tasks for the table of two
// different bucket type (multi bucket & unaware bucket) running concurrently.
- // There will be a situation that there is only one task compaction , therefore this
+ // There will be a situation that there is only one task compaction, therefore this
// should not be thrown exception here.
- LOGGER.info("No file were collected for the table of aware-bucket");
+ LOGGER.info("No file were collected for the table of aware-bucket.");
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index bff690ea30c23..8d4107a0da8e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -95,7 +95,6 @@ public static DataStream buildSource(
Pattern excludingPattern,
Pattern databasePattern,
long monitorInterval) {
-
CombinedAwareStreamingSourceFunction function =
new CombinedAwareStreamingSourceFunction(
catalogLoader,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 85bee4bb55a48..55bd89b0189cc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -25,6 +25,7 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -208,6 +209,35 @@ public void testCompactionInStreamingMode() throws Exception {
assertThat(rows.size()).isEqualTo(10);
}
+ @Test
+ public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
+ batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
+ batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
+
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE Orders_in (\n"
+ + " f0 INT,\n"
+ + " f1 STRING,\n"
+ + " ts TIMESTAMP(3),\n"
+ + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'number-of-rows' = '10'\n"
+ + ")");
+
+ assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000);
+ // ensure data gen finished
+ Thread.sleep(5000);
+
+ Snapshot snapshot = findLatestSnapshot("append_table");
+ Assertions.assertNotNull(snapshot);
+ Long watermark = snapshot.watermark();
+ Assertions.assertNotNull(watermark);
+ Assertions.assertTrue(watermark > Long.MIN_VALUE);
+ }
+
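
A short note on the final assertion in the new test: Flink initializes watermarks at Long.MIN_VALUE, while MAX_WATERMARK carries Long.MAX_VALUE, so a snapshot watermark strictly greater than Long.MIN_VALUE indicates that a real watermark actually propagated into the committed snapshot. A tiny sketch confirming the constant; the class name is made up for illustration.

    import org.apache.flink.streaming.api.watermark.Watermark;

    public class MaxWatermarkValueCheck {
        public static void main(String[] args) {
            long ts = Watermark.MAX_WATERMARK.getTimestamp();
            // MAX_WATERMARK's timestamp is Long.MAX_VALUE, well above Long.MIN_VALUE.
            System.out.println(ts == Long.MAX_VALUE); // true
            System.out.println(ts > Long.MIN_VALUE);  // true
        }
    }
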
@Test
public void testRejectDelete() {
testRejectChanges(RowKind.DELETE);