From 898c0ea69686b1cffaee7ee799e1f9d3bb0e043d Mon Sep 17 00:00:00 2001 From: Aitozi <1059789585@qq.com> Date: Tue, 23 Jan 2024 13:22:42 +0800 Subject: [PATCH] [Bug] fix sort compaction failed on empty input (#2766) --- .../paimon/flink/shuffle/RangeShuffle.java | 6 ++++-- .../SortCompactActionForUnawareBucketITCase.java | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java index 4fb5f9217537..b51c1146142a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java @@ -54,6 +54,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -257,9 +258,10 @@ public void endInput() throws Exception { T record = sampledData.get((int) (i * avgRange)); boundaries[i - 1] = record; } + collector.collect(Arrays.asList(boundaries)); + } else { + collector.collect(Collections.emptyList()); } - - collector.collect(Arrays.asList(boundaries)); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java index 752977b0f7ca..dc9c568798d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java @@ -258,6 +258,22 @@ public void testRandomSuffixWorks() throws Exception { Assertions.assertThat(files.size()).isEqualTo(3); } + @Test + public void testSortCompactionOnEmptyData() throws Exception { + createTable(); + SortCompactAction sortCompactAction = + new SortCompactAction( + warehouse, + database, + tableName, + Collections.emptyMap(), + Collections.emptyMap()) + .withOrderStrategy("zorder") + .withOrderColumns(Collections.singletonList("f0")); + + sortCompactAction.run(); + } + private void zorder(List columns) throws Exception { if (RANDOM.nextBoolean()) { createAction("zorder", columns).run();