diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index d10c5c11cb6a..ee44a72d4114 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -71,14 +71,14 @@ public void withPartitionPredicate(Predicate predicate) {
 
     public void build() {
         // build source from UnawareSourceFunction
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
 
         // from source, construct the full flink job
         sinkFromSource(source);
     }
 
     public DataStream<Committable> fetchUncommitted(String commitUser) {
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
 
         // rebalance input to default or assigned parallelism
         DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
@@ -87,11 +87,11 @@ public DataStream<Committable> fetchUncommitted(String commitUser) {
                 .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
     }
 
-    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, partitionPredicate);
+                        table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);
 
         return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 7926fa60a566..936b2cd2c70c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,6 +32,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCompactionTask> {
     private transient SourceContext<AppendOnlyCompactionTask> ctx;
     private volatile boolean isRunning = true;
+    private final boolean emitMaxWatermark;
 
     public BucketUnawareCompactSource(
             FileStoreTable table,
             boolean isStreaming,
             long scanInterval,
-            @Nullable Predicate filter) {
+            @Nullable Predicate filter,
+            boolean emitMaxWatermark) {
         this.table = table;
         this.streaming = isStreaming;
         this.scanInterval = scanInterval;
         this.filter = filter;
+        this.emitMaxWatermark = emitMaxWatermark;
     }
 
     @Override
@@ -94,6 +98,10 @@ public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) throws Ex
                 List<AppendOnlyCompactionTask> tasks = compactionCoordinator.run();
                 isEmpty = tasks.isEmpty();
                 tasks.forEach(ctx::collect);
+
+                if (emitMaxWatermark) {
+                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
+                }
             } catch (EndOfScanException esf) {
                 LOG.info("Catching EndOfStreamException, the stream is finished.");
                 return;
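The core of the change: when fetchUncommitted builds the source for a dedicated compaction job, the source now emits Watermark.MAX_WATERMARK once the compaction tasks have been handed out, so downstream operators see event time advance to "complete" rather than never seeing a watermark at all (the new test below checks that the committed snapshot's watermark ends up above Long.MIN_VALUE). For readers unfamiliar with the legacy SourceFunction watermark API, here is a minimal self-contained sketch of the same pattern; ToyBoundedSource is a hypothetical stand-in, not Paimon code, but RichSourceFunction, SourceContext, and Watermark.MAX_WATERMARK are Flink's real APIs:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/** Hypothetical bounded legacy source illustrating the emit-max-watermark pattern. */
public class ToyBoundedSource extends RichSourceFunction<Long> {

    private final boolean emitMaxWatermark;
    private volatile boolean isRunning = true;

    public ToyBoundedSource(boolean emitMaxWatermark) {
        this.emitMaxWatermark = emitMaxWatermark;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emit a fixed set of records, holding the checkpoint lock per element.
        for (long i = 0; i < 10 && isRunning; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
        if (emitMaxWatermark) {
            // Watermark.MAX_WATERMARK (Long.MAX_VALUE) signals that event time
            // is complete; operators that fold watermarks into committed
            // metadata then record a real value instead of Long.MIN_VALUE.
            ctx.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}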
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 85bee4bb55a4..55bd89b0189c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -208,6 +209,35 @@ public void testCompactionInStreamingMode() throws Exception {
         assertThat(rows.size()).isEqualTo(10);
     }
 
+    @Test
+    public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
+        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + " f0 INT,\n"
+                        + " f1 STRING,\n"
+                        + " ts TIMESTAMP(3),\n"
+                        + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND"
+                        + ") WITH (\n"
+                        + " 'connector' = 'datagen',\n"
+                        + " 'rows-per-second' = '1',\n"
+                        + " 'number-of-rows' = '10'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000);
+        // ensure data gen finished
+        Thread.sleep(5000);
+
+        Snapshot snapshot = findLatestSnapshot("append_table");
+        Assertions.assertNotNull(snapshot);
+        Long watermark = snapshot.watermark();
+        Assertions.assertNotNull(watermark);
+        Assertions.assertTrue(watermark > Long.MIN_VALUE);
+    }
+
     @Test
     public void testRejectDelete() {
         testRejectChanges(RowKind.DELETE);
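The same check the test performs can be expressed outside an ITCase. Below is a hedged sketch, assuming you already hold a FileStoreTable handle (obtaining one is outside the scope of this diff); hasRealWatermark is a hypothetical helper, while FileStoreTable.snapshotManager(), SnapshotManager.latestSnapshot(), and Snapshot.watermark() are Paimon's own APIs:

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

/** Hypothetical helper mirroring the test's snapshot-watermark assertion. */
public final class SnapshotWatermarkCheck {

    private SnapshotWatermarkCheck() {}

    /** Returns true if the table's latest snapshot carries a non-default watermark. */
    public static boolean hasRealWatermark(FileStoreTable table) {
        SnapshotManager snapshotManager = table.snapshotManager();
        Snapshot snapshot = snapshotManager.latestSnapshot();
        if (snapshot == null) {
            return false; // nothing committed yet
        }
        Long watermark = snapshot.watermark();
        // Before this change, a dedicated compaction job's snapshot had no
        // usable watermark; with MAX_WATERMARK emitted, this should hold.
        return watermark != null && watermark > Long.MIN_VALUE;
    }
}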