From 9c959d0b06349e0833c43d81d95b8f6b31173795 Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Tue, 26 Nov 2024 11:44:46 +0800 Subject: [PATCH] [flink] fix --- .../paimon/flink/sink/MultiTablesStoreCompactOperator.java | 2 +- .../apache/paimon/flink/action/MinorCompactActionITCase.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index e59b3470545bf..801a44300e8c8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -162,13 +162,13 @@ public void processElement(StreamRecord element) throws Exception { if (write.streamingMode()) { write.notifyNewFiles(snapshotId, partition, bucket, files); + // The full compact is not supported in streaming mode. write.compact(partition, bucket, false); } else { Preconditions.checkArgument( files.isEmpty(), "Batch compact job does not concern what files are compacted. " + "They only need to know what buckets are compacted."); - // `minor` compact strategy is supported in batch mode. write.compact(partition, bucket, fullCompaction); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java index 5a32597fcf1ae..8e15a6eb4da75 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java @@ -172,6 +172,6 @@ public void testStreamingFullCompactStrategy() throws Exception { streamExecutionEnvironmentBuilder().streamingMode().build(); Assertions.assertThatThrownBy(() -> action.withStreamExecutionEnvironment(env).build()) .hasMessage( - "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH."); + "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH."); } }