diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 8ea120015609..ce88857f1b14 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -138,7 +138,8 @@ private void buildForTraditionalCompaction( } CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(identifier.getFullName(), table); - CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table); + CompactorSinkBuilder sinkBuilder = + new CompactorSinkBuilder(table).withFullCompaction(!isStreaming); sourceBuilder.withPartitionPredicate(getPredicate()); DataStreamSource source = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index fda9ff695e1e..471c6fdd4da6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -259,7 +259,8 @@ private void buildForTraditionalCompaction( CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table) .withPartitionIdleTime(partitionIdleTime); - CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table); + CompactorSinkBuilder sinkBuilder = + new CompactorSinkBuilder(table).withFullCompaction(!isStreaming); DataStreamSource source = sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java index 7dc3ab1150b0..a0c830d73f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java @@ -29,14 +29,17 @@ public class CompactorSink extends FlinkSink { private static final long serialVersionUID = 1L; - public CompactorSink(FileStoreTable table) { + private final boolean fullCompaction; + + public CompactorSink(FileStoreTable table, boolean fullCompaction) { super(table, false); + this.fullCompaction = fullCompaction; } @Override protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new StoreCompactOperator(table, writeProvider, commitUser); + return new StoreCompactOperator(table, writeProvider, commitUser, fullCompaction); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java index 926155cabf29..2173b1d34a72 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java @@ -37,6 +37,8 @@ public class CompactorSinkBuilder { private DataStream input; + private boolean fullCompaction; + public CompactorSinkBuilder(FileStoreTable table) { this.table = table; } @@ -46,6 +48,11 @@ public CompactorSinkBuilder withInput(DataStream input) { return this; } + public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) { + this.fullCompaction = fullCompaction; + return this; + } + public DataStreamSink build() { BucketMode bucketMode = table.bucketMode(); switch (bucketMode) { @@ -66,6 +73,6 @@ private DataStreamSink buildForBucketAware() { .orElse(null); DataStream partitioned = partition(input, new BucketsRowChannelComputer(), parallelism); - return new CompactorSink(table).sinkFrom(partitioned); + return new CompactorSink(table, fullCompaction).sinkFrom(partitioned); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index bc7bb350df21..9b152a81ca22 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -52,6 +52,7 @@ public class StoreCompactOperator extends PrepareCommitOperator prepareCommit(boolean waitCompaction, long checkpoin try { for (Pair partitionBucket : waitToCompact) { - write.compact( - partitionBucket.getKey(), - partitionBucket.getRight(), - !write.streamingMode()); + write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction); } } catch (Exception e) { throw new RuntimeException("Exception happens while executing compaction.", e); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java index a5f260fb25a5..c38ac4b3d685 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java @@ -132,7 +132,7 @@ public void testCompact() throws Exception { .withContinuousMode(false) .withPartitionPredicate(predicate) .build(); - new CompactorSinkBuilder(table).withInput(source).build(); + new CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build(); env.execute(); snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); @@ -182,6 +182,7 @@ public void testCompactParallelism() throws Exception { String.valueOf(sinkParalellism)); } })) + .withFullCompaction(false) .withInput(source) .build(); @@ -267,7 +268,8 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) { false, memoryPool, metricGroup), - "test"); + "test", + true); } protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java index 3f2daedffd48..f8387e1fc41a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -53,7 +53,8 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception { getTableDefault(), (table, commitUser, state, ioManager, memoryPool, metricGroup) -> compactRememberStoreWrite, - "10086"); + "10086", + !streamingMode); TypeSerializer serializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());