From 885c2e0919c7b3afa28769dca037ea3e8fd75aad Mon Sep 17 00:00:00 2001
From: Wencong Liu <104502720+WencongLiu@users.noreply.github.com>
Date: Fri, 31 May 2024 14:15:28 +0800
Subject: [PATCH] [flink] Introduce Range Partition And Sort in Append Scalable
Table Batch Writing for Flink (#3384)
---
docs/content/flink/sql-write.md | 32 +++
.../flink_connector_configuration.html | 24 ++
.../paimon/flink/FlinkConnectorOptions.java | 37 +++
.../flink/action/SortCompactAction.java | 30 +-
.../paimon/flink/shuffle/RangeShuffle.java | 22 +-
.../paimon/flink/sink/FlinkSinkBuilder.java | 110 +++++++
.../paimon/flink/sink/FlinkTableSinkBase.java | 11 +-
.../paimon/flink/sorter/HilbertSorter.java | 11 +-
.../paimon/flink/sorter/OrderSorter.java | 12 +-
.../apache/paimon/flink/sorter/SortUtils.java | 148 +++++-----
.../paimon/flink/sorter/TableSortInfo.java | 168 +++++++++++
.../paimon/flink/sorter/TableSorter.java | 15 +-
.../paimon/flink/sorter/ZorderSorter.java | 11 +-
...ionAndSortForUnawareBucketTableITCase.java | 270 ++++++++++++++++++
.../flink/sorter/TableSortInfoTest.java | 160 +++++++++++
15 files changed, 962 insertions(+), 99 deletions(-)
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md
index 8b275dbbce99..f1e305ef4bb6 100644
--- a/docs/content/flink/sql-write.md
+++ b/docs/content/flink/sql-write.md
@@ -49,6 +49,38 @@ snapshot expiration, and even partition expiration in Flink Sink (if it is confi
For multiple jobs to write the same table, you can refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
+### Clustering
+
+In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream
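+tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for bucket-unaware [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+in batch execution mode; in any other case the clustering options are ignored and the data is written without clustering.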
+
+To utilize clustering, you can specify the columns you want to cluster by when creating or writing to a table. Here's a simple example of how to enable clustering:
+
+```sql
+CREATE TABLE my_table (
+ a STRING,
+ b STRING,
+ c STRING
+) WITH (
+ 'sink.clustering.by-columns' = 'a,b'
+);
+```
+
+You can also use SQL hints to dynamically set clustering options:
+
+```sql
+INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
+SELECT * FROM source;
+```
+
+By default the clustering strategy is chosen automatically from the number of clustering columns (ORDER for one column, ZORDER for fewer than five columns, and HILBERT for five or more),
+but you can specify it manually by setting `sink.clustering.strategy`. Clustering relies on sampling and sorting. If the clustering process takes too much time, you can decrease
+the total number of samples by lowering `sink.clustering.sample-factor` (the minimum allowed value is 20) or disable the sorting step by setting `sink.clustering.sort-in-cluster` to false.
+
+You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#FlinkConnectorOptions" >}}) for more info about the configurations above.
+
## Overwriting the Whole Table
For unpartitioned tables, Paimon supports overwriting the whole table.
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 4fb5fc2713fc..6787cee3bcb4 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -170,6 +170,30 @@
Duration |
If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators. |
+
+ sink.clustering.by-columns |
+ (none) |
+ String |
+ Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for bucket unaware table without primary keys and batch execution mode. |
+
+
+ sink.clustering.sample-factor |
+ 100 |
+ Integer |
+ Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20. |
+
+
+ sink.clustering.sort-in-cluster |
+ true |
+ Boolean |
+ Indicates whether to further sort data belonged to each sink task after range partitioning. |
+
+
+ sink.clustering.strategy |
+ "auto" |
+ String |
+ Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' for 5 or more columns. |
+
sink.committer-cpu |
1.0 |
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 275ce836a05e..0256a55007d5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -48,6 +48,8 @@ public class FlinkConnectorOptions {
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+ public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;
+
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption LOG_SYSTEM =
ConfigOptions.key("log.system")
@@ -387,6 +389,41 @@ public class FlinkConnectorOptions {
"Both can be configured at the same time: 'done-partition,success-file'.")
.build());
+ public static final ConfigOption CLUSTERING_COLUMNS =
+ key("sink.clustering.by-columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
+ + "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
+ + "This option will be effective only for bucket unaware table without primary keys and batch execution mode.");
+
+ public static final ConfigOption CLUSTERING_STRATEGY =
+ key("sink.clustering.strategy")
+ .stringType()
+ .defaultValue("auto")
+ .withDescription(
+ "Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
+ + "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
+ + "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+ + "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ + "and 'hilbert' for 5 or more columns.");
+
+ public static final ConfigOption CLUSTERING_SORT_IN_CLUSTER =
+ key("sink.clustering.sort-in-cluster")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Indicates whether to further sort data belonged to each sink task after range partitioning.");
+
+ public static final ConfigOption CLUSTERING_SAMPLE_FACTOR =
+ key("sink.clustering.sample-factor")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, "
+ + "and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.");
+
public static List> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index efa2f386d0b6..1cebe8bc1fa4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -21,7 +21,9 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
+import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,8 +118,32 @@ public void build() {
}
DataStream source = sourceBuilder.env(env).sourceBounded(true).build();
- TableSorter sorter =
- TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
+ int localSampleMagnification =
+ ((FileStoreTable) table).coreOptions().getLocalSampleMagnification();
+ if (localSampleMagnification < 20) {
+ throw new IllegalArgumentException(
+ String.format(
+ "the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
+ CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+ localSampleMagnification));
+ }
+ String sinkParallelismValue =
+ table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ final int sinkParallelism =
+ sinkParallelismValue == null
+ ? source.getParallelism()
+ : Integer.parseInt(sinkParallelismValue);
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(orderColumns)
+ .setSortStrategy(OrderType.of(sortStrategy))
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+
+ TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortInfo);
new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 1883890ea0e7..4db56601a38e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -164,7 +164,7 @@ public static DataStream> rangeShuffleByKey(
new AssignRangeIndexOperator.RangePartitioner(rangeNum),
new AssignRangeIndexOperator.Tuple2KeySelector<>()),
StreamExchangeMode.BATCH),
- "REMOVE KEY",
+ "REMOVE RANGE INDEX",
new RemoveRangeIndexOperator<>(),
input.getOutputType(),
outParallelism));
@@ -373,12 +373,16 @@ public void processElement1(StreamRecord> streamRecord) {
@Override
public void processElement2(StreamRecord> streamRecord) {
- if (keyIndex == null || keyIndex.isEmpty()) {
- throw new RuntimeException(
- "There should be one data from the first input. And boundaries should not be empty.");
+ if (keyIndex == null) {
+ throw new RuntimeException("There should be one data from the first input.");
+ }
+ // If the range number is 1, the range index will be 0 for all records.
+ if (keyIndex.isEmpty()) {
+ collector.collect(new Tuple2<>(0, streamRecord.getValue()));
+ } else {
+ Tuple2 row = streamRecord.getValue();
+ collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
- Tuple2 row = streamRecord.getValue();
- collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
@Override
@@ -408,7 +412,7 @@ private int binarySearch(T key) {
// key not found, but the low index is the target
// bucket, since the boundaries are the upper bound
return low > lastIndex
- ? keyIndex.get(lastIndex).getRight().get()
+ ? (keyIndex.get(lastIndex).getRight().get() + 1)
: keyIndex.get(low).getRight().get();
}
@@ -438,8 +442,8 @@ public RangePartitioner(int totalRangeNum) {
@Override
public int partition(Integer key, int numPartitions) {
Preconditions.checkArgument(
- numPartitions < totalRangeNum,
- "Num of subPartitions should < totalRangeNum: " + totalRangeNum);
+ numPartitions <= totalRangeNum,
+ "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
int partition = key / (totalRangeNum / numPartitions);
return Math.min(numPartitions - 1, partition);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index a22de93160a0..8baee5ac1b91 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -20,7 +20,11 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -34,14 +38,27 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
+import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
/**
* DataStream API for building Flink Sink.
@@ -51,12 +68,15 @@
@Public
public class FlinkSinkBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);
+
private final FileStoreTable table;
private DataStream input;
@Nullable private Map overwritePartition;
@Nullable private Integer parallelism;
private Boolean boundedInput = null;
+ @Nullable private TableSortInfo tableSortInfo;
// ============== for extension ==============
@@ -119,8 +139,88 @@ public FlinkSinkBuilder inputBounded(boolean bounded) {
return this;
}
+ /** Cluster the input data if possible. */
+ public FlinkSinkBuilder clusteringIfPossible(
+ String clusteringColumns,
+ String clusteringStrategy,
+ boolean sortInCluster,
+ int sampleFactor) {
+ // The clustering will be skipped if the clustering columns are empty or the execution
+ // mode is STREAMING or the table type is illegal.
+ if (clusteringColumns == null || clusteringColumns.isEmpty()) {
+ return this;
+ }
+ checkState(input != null, "The input stream should be specified earlier.");
+ if (boundedInput == null) {
+ boundedInput = !FlinkSink.isStreaming(input);
+ }
+ if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+ LOG.warn(
+ "Clustering is enabled; however, it has been skipped as "
+ + "it only supports the bucket unaware table without primary keys and "
+ + "BATCH execution mode.");
+ return this;
+ }
+ // If the clustering is not skipped, check the clustering column names and sample
+ // factor value.
+ List columns = Arrays.asList(clusteringColumns.split(","));
+ List fieldNames = table.schema().fieldNames();
+ checkState(
+ new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
+ String.format(
+ "Field names %s should contains all clustering column names %s.",
+ fieldNames, columns));
+ checkState(
+ sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
+ "The minimum allowed "
+ + CLUSTERING_SAMPLE_FACTOR.key()
+ + " is "
+ + MIN_CLUSTERING_SAMPLE_FACTOR
+ + ".");
+ TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
+ if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
+ if (columns.size() == 1) {
+ sortInfoBuilder.setSortStrategy(ORDER);
+ } else if (columns.size() < 5) {
+ sortInfoBuilder.setSortStrategy(ZORDER);
+ } else {
+ sortInfoBuilder.setSortStrategy(HILBERT);
+ }
+ } else {
+ sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
+ }
+ int upstreamParallelism = input.getParallelism();
+ String sinkParallelismValue =
+ table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ int sinkParallelism =
+ sinkParallelismValue == null
+ ? upstreamParallelism
+ : Integer.parseInt(sinkParallelismValue);
+ sortInfoBuilder
+ .setSortColumns(columns)
+ .setSortInCluster(sortInCluster)
+ .setSinkParallelism(sinkParallelism);
+ int globalSampleSize = sinkParallelism * sampleFactor;
+ // If the adaptive scheduler is not enabled, the local sample size is determined by the
+ // division of global sample size by the upstream parallelism, which limits total
+ // received data of global sample node. If the adaptive scheduler is enabled, the
+ // local sample size will equal to sinkParallelism * minimum sample factor.
+ int localSampleSize =
+ upstreamParallelism > 0
+ ? Math.max(sampleFactor, globalSampleSize / upstreamParallelism)
+ : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
+ this.tableSortInfo =
+ sortInfoBuilder
+ .setRangeNumber(sinkParallelism)
+ .setGlobalSampleSize(globalSampleSize)
+ .setLocalSampleSize(localSampleSize)
+ .build();
+ return this;
+ }
+
/** Build {@link DataStreamSink}. */
public DataStreamSink> build() {
+ input = trySortInput(input);
DataStream input = MapToInternalRow.map(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
input =
@@ -181,4 +281,14 @@ private DataStreamSink> buildUnawareBucketSink(DataStream input)
table, overwritePartition, logSinkFunction, parallelism, boundedInput)
.sinkFrom(input);
}
+
+ private DataStream trySortInput(DataStream input) {
+ if (tableSortInfo != null) {
+ TableSorter sorter =
+ TableSorter.getSorter(
+ input.getExecutionEnvironment(), input, table, tableSortInfo);
+ return sorter.sort();
+ }
+ return input;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index b6a944703469..4202717c8646 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -45,6 +45,10 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
/** Table sink to create sink. */
@@ -131,7 +135,12 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
- .inputBounded(context.isBounded());
+ .inputBounded(context.isBounded())
+ .clusteringIfPossible(
+ conf.get(CLUSTERING_COLUMNS),
+ conf.get(CLUSTERING_STRATEGY),
+ conf.get(CLUSTERING_SORT_IN_CLUSTER),
+ conf.get(CLUSTERING_SAMPLE_FACTOR));
if (overwrite) {
builder.overwrite(staticPartitions);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
index 944e0fc430b0..26c698f5ed71 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -36,7 +36,6 @@
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* This is a table sorter which will sort the records by the hilbert curve of specified columns. It
@@ -49,12 +48,15 @@ public class HilbertSorter extends TableSorter {
private static final RowType KEY_TYPE =
new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
+ private final TableSortInfo tableSortInfo;
+
public HilbertSorter(
StreamExecutionEnvironment batchTEnv,
DataStream origin,
FileStoreTable table,
- List colNames) {
- super(batchTEnv, origin, table, colNames);
+ TableSortInfo tableSortInfo) {
+ super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+ this.tableSortInfo = tableSortInfo;
}
@Override
@@ -99,6 +101,7 @@ public byte[] apply(RowData value) {
return Arrays.copyOf(hilbert, hilbert.length);
}
},
- GenericRow::of);
+ GenericRow::of,
+ tableSortInfo);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index dadd4c413c61..406195b6fce6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -31,19 +31,20 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
-import java.util.List;
-
import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
/** Alphabetical order sorter to sort records by the given `orderColNames`. */
public class OrderSorter extends TableSorter {
+ private final TableSortInfo tableSortInfo;
+
public OrderSorter(
StreamExecutionEnvironment batchTEnv,
DataStream origin,
FileStoreTable table,
- List orderColNames) {
- super(batchTEnv, origin, table, orderColNames);
+ TableSortInfo tableSortInfo) {
+ super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+ this.tableSortInfo = tableSortInfo;
}
@Override
@@ -75,6 +76,7 @@ public InternalRow apply(RowData value) {
return keyProjection.apply(new FlinkRowWrapper(value)).copy();
}
},
- row -> row);
+ row -> row,
+ tableSortInfo);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 9a1dbb729b9e..79e9f1298104 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -21,7 +21,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
@@ -38,7 +37,12 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.ArrayList;
@@ -72,6 +76,7 @@ public class SortUtils {
* @param shuffleKeyAbstract abstract the key from the input `RowData`
* @param convertor convert the `KEY` to the sort key, then we can sort in
* `BinaryExternalSortBuffer`.
+ * @param tableSortInfo the necessary info of table sort.
* @return the global sorted data stream
* @param the KEY type in range shuffle
*/
@@ -82,34 +87,15 @@ public static DataStream sortStreamByKey(
final TypeInformation keyTypeInformation,
final SerializableSupplier> shuffleKeyComparator,
final KeyAbstract shuffleKeyAbstract,
- final ShuffleKeyConvertor convertor) {
+ final ShuffleKeyConvertor convertor,
+ final TableSortInfo tableSortInfo) {
final RowType valueRowType = table.rowType();
- final int parallelism = inputStream.getParallelism();
CoreOptions options = table.coreOptions();
-
- String sinkParallelismValue =
- table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
- final int sinkParallelism =
- sinkParallelismValue == null
- ? inputStream.getParallelism()
- : Integer.parseInt(sinkParallelismValue);
- if (sinkParallelism == -1) {
- throw new UnsupportedOperationException(
- "The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
- + FlinkConnectorOptions.SINK_PARALLELISM.key());
- }
- int localSampleMagnification = options.getLocalSampleMagnification();
- if (localSampleMagnification < 20) {
- throw new IllegalArgumentException(
- String.format(
- "the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
- CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
- localSampleMagnification));
- }
- final int localSampleSize = sinkParallelism * localSampleMagnification;
- final int globalSampleSize = sinkParallelism * 1000;
- final int rangeNum = sinkParallelism * 10;
+ final int sinkParallelism = tableSortInfo.getSinkParallelism();
+ final int localSampleSize = tableSortInfo.getLocalSampleSize();
+ final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
+ final int rangeNum = tableSortInfo.getRangeNumber();
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
@@ -145,10 +131,11 @@ public Tuple2 map(RowData value) {
}
},
new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
- .setParallelism(parallelism);
+ .setParallelism(inputStream.getParallelism());
// range shuffle by key
- return RangeShuffle.rangeShuffleByKey(
+ DataStream> rangeShuffleResult =
+ RangeShuffle.rangeShuffleByKey(
inputWithKey,
shuffleKeyComparator,
keyTypeInformation,
@@ -157,45 +144,52 @@ public Tuple2 map(RowData value) {
rangeNum,
sinkParallelism,
valueRowType,
- options.sortBySize())
- .map(
- a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
- internalRowType)
- .setParallelism(sinkParallelism)
- // sort the output locally by `SortOperator`
- .transform(
- "LOCAL SORT",
- internalRowType,
- new SortOperator(
- sortKeyType,
- longRowType,
- options.writeBufferSize(),
- options.pageSize(),
- options.localSortMaxNumFileHandles(),
- options.spillCompression(),
- sinkParallelism,
- options.writeBufferSpillDiskSize()))
- .setParallelism(sinkParallelism)
- // remove the key column from every row
- .map(
- new RichMapFunction() {
-
- private transient KeyProjectedRow keyProjectedRow;
-
- @Override
- public void open(Configuration parameters) {
- keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
- }
-
- @Override
- public InternalRow map(InternalRow value) {
- return keyProjectedRow.replaceRow(value);
- }
- },
- InternalTypeInfo.fromRowType(valueRowType))
- .setParallelism(sinkParallelism)
- .map(FlinkRowData::new, inputStream.getType())
- .setParallelism(sinkParallelism);
+ options.sortBySize());
+ if (tableSortInfo.isSortInCluster()) {
+ return rangeShuffleResult
+ .map(
+ a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
+ internalRowType)
+ .setParallelism(sinkParallelism)
+ // sort the output locally by `SortOperator`
+ .transform(
+ "LOCAL SORT",
+ internalRowType,
+ new SortOperator(
+ sortKeyType,
+ longRowType,
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles(),
+ options.spillCompression(),
+ sinkParallelism,
+ options.writeBufferSpillDiskSize()))
+ .setParallelism(sinkParallelism)
+ // remove the key column from every row
+ .map(
+ new RichMapFunction() {
+
+ private transient KeyProjectedRow keyProjectedRow;
+
+ @Override
+ public void open(Configuration parameters) {
+ keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(InternalRow value) {
+ return keyProjectedRow.replaceRow(value);
+ }
+ },
+ InternalTypeInfo.fromRowType(valueRowType))
+ .setParallelism(sinkParallelism)
+ .map(FlinkRowData::new, inputStream.getType())
+ .setParallelism(sinkParallelism);
+ } else {
+ return rangeShuffleResult
+ .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
+ .setParallelism(sinkParallelism);
+ }
}
/** Abstract key from a row data. */
@@ -206,4 +200,24 @@ default void open() {}
}
interface ShuffleKeyConvertor extends Function, Serializable {}
+
+ /** Remove the abstract key. */
+ private static class RemoveKeyOperator extends TableStreamOperator
+ implements OneInputStreamOperator, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Collector collector;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.collector = new StreamRecordCollector<>(output);
+ }
+
+ @Override
+ public void processElement(StreamRecord> streamRecord) {
+ collector.collect(streamRecord.getValue().f1);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
new file mode 100644
index 000000000000..16760bd35b11
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * {@link TableSortInfo} is used to indicate the configuration details for table data sorting. This
+ * includes information about which columns to sort by, the sorting strategy (e.g., order, Z-order),
+ * whether to sort within each cluster, and sample sizes for local and global sample nodes.
+ *
+ * <p>Instances are created through {@link Builder}; all settings are validated in {@link
+ * Builder#build()}.
+ */
+public class TableSortInfo {
+
+    private final List<String> sortColumns;
+
+    private final OrderType sortStrategy;
+
+    private final boolean sortInCluster;
+
+    private final int rangeNumber;
+
+    private final int sinkParallelism;
+
+    private final int localSampleSize;
+
+    private final int globalSampleSize;
+
+    private TableSortInfo(
+            List<String> sortColumns,
+            OrderType sortStrategy,
+            boolean sortInCluster,
+            int rangeNumber,
+            int sinkParallelism,
+            int localSampleSize,
+            int globalSampleSize) {
+        this.sortColumns = sortColumns;
+        this.sortStrategy = sortStrategy;
+        this.sortInCluster = sortInCluster;
+        this.rangeNumber = rangeNumber;
+        this.sinkParallelism = sinkParallelism;
+        this.localSampleSize = localSampleSize;
+        this.globalSampleSize = globalSampleSize;
+    }
+
+    /** Returns the columns to sort by. */
+    public List<String> getSortColumns() {
+        return sortColumns;
+    }
+
+    /** Returns the sorting strategy. */
+    public OrderType getSortStrategy() {
+        return sortStrategy;
+    }
+
+    /** Returns whether to sort within each cluster. */
+    public boolean isSortInCluster() {
+        return sortInCluster;
+    }
+
+    /** Returns the range number. */
+    public int getRangeNumber() {
+        return rangeNumber;
+    }
+
+    /** Returns the sample size for local sample nodes. */
+    public int getLocalSampleSize() {
+        return localSampleSize;
+    }
+
+    /** Returns the sample size for global sample nodes. */
+    public int getGlobalSampleSize() {
+        return globalSampleSize;
+    }
+
+    /** Returns the sink parallelism. */
+    public int getSinkParallelism() {
+        return sinkParallelism;
+    }
+
+    /** Builder for {@link TableSortInfo}. */
+    public static class Builder {
+
+        private List<String> sortColumns = Collections.emptyList();
+
+        private OrderType sortStrategy = OrderType.ORDER;
+
+        private boolean sortInCluster = true;
+
+        private int rangeNumber = -1;
+
+        private int sinkParallelism = -1;
+
+        private int localSampleSize = -1;
+
+        private int globalSampleSize = -1;
+
+        public Builder setSortColumns(List<String> sortColumns) {
+            this.sortColumns = sortColumns;
+            return this;
+        }
+
+        public Builder setSortStrategy(OrderType sortStrategy) {
+            this.sortStrategy = sortStrategy;
+            return this;
+        }
+
+        public Builder setSortInCluster(boolean sortInCluster) {
+            this.sortInCluster = sortInCluster;
+            return this;
+        }
+
+        public Builder setRangeNumber(int rangeNumber) {
+            this.rangeNumber = rangeNumber;
+            return this;
+        }
+
+        public Builder setSinkParallelism(int sinkParallelism) {
+            this.sinkParallelism = sinkParallelism;
+            return this;
+        }
+
+        public Builder setLocalSampleSize(int localSampleSize) {
+            this.localSampleSize = localSampleSize;
+            return this;
+        }
+
+        public Builder setGlobalSampleSize(int globalSampleSize) {
+            this.globalSampleSize = globalSampleSize;
+            return this;
+        }
+
+        /**
+         * Validates the collected settings and creates the {@link TableSortInfo}.
+         *
+         * @throws NullPointerException if the sort columns or the sort strategy is null
+         * @throws IllegalArgumentException if the sort columns are empty, or if the sink
+         *     parallelism, range number, local sample size or global sample size is not positive
+         */
+        public TableSortInfo build() {
+            // Fail with a descriptive message instead of a bare NPE raised by isEmpty().
+            checkNotNull(sortColumns, "Sort columns cannot be null");
+            checkArgument(!sortColumns.isEmpty(), "Sort columns cannot be empty");
+            checkNotNull(sortStrategy, "Sort strategy cannot be null");
+            checkArgument(
+                    sinkParallelism > 0,
+                    "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+                    FlinkConnectorOptions.SINK_PARALLELISM.key());
+            checkArgument(rangeNumber > 0, "Range number must be positive");
+            checkArgument(localSampleSize > 0, "Local sample size must be positive");
+            checkArgument(globalSampleSize > 0, "Global sample size must be positive");
+            return new TableSortInfo(
+                    sortColumns,
+                    sortStrategy,
+                    sortInCluster,
+                    rangeNumber,
+                    sinkParallelism,
+                    localSampleSize,
+                    globalSampleSize);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index e4f85ea9d706..a0d4b6af2639 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -70,21 +70,22 @@ public static TableSorter getSorter(
StreamExecutionEnvironment batchTEnv,
DataStream origin,
FileStoreTable fileStoreTable,
- String sortStrategy,
- List orderColumns) {
- switch (OrderType.of(sortStrategy)) {
+ TableSortInfo sortInfo) {
+ OrderType sortStrategy = sortInfo.getSortStrategy();
+ switch (sortStrategy) {
case ORDER:
- return new OrderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new OrderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
case ZORDER:
- return new ZorderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new ZorderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
case HILBERT:
- return new HilbertSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new HilbertSorter(batchTEnv, origin, fileStoreTable, sortInfo);
default:
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
}
}
- enum OrderType {
+ /** The order type of table sort. */
+ public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index bc3c7f5c5e31..fad66f364731 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -36,7 +36,6 @@
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* This is a table sorter which will sort the records by the z-order of specified columns. It works
@@ -49,12 +48,15 @@ public class ZorderSorter extends TableSorter {
private static final RowType KEY_TYPE =
new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+ private final TableSortInfo sortInfo;
+
public ZorderSorter(
StreamExecutionEnvironment batchTEnv,
DataStream origin,
FileStoreTable table,
- List zOrderColNames) {
- super(batchTEnv, origin, table, zOrderColNames);
+ TableSortInfo sortInfo) {
+ super(batchTEnv, origin, table, sortInfo.getSortColumns());
+ this.sortInfo = sortInfo;
}
@Override
@@ -102,6 +104,7 @@ public byte[] apply(RowData value) {
return Arrays.copyOf(zorder, zorder.length);
}
},
- GenericRow::of);
+ GenericRow::of,
+ sortInfo);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
new file mode 100644
index 000000000000..c65f4ac12aa6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AppendOnlyFileStoreScan;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * The IT test for the range partitioning and sort batch writing for append-only bucket unaware
+ * tables.
+ */
+public class RangePartitionAndSortForUnawareBucketTableITCase extends CatalogITCaseBase {
+
+    private static final int SINK_ROW_NUMBER = 1000;
+
+    @Test
+    public void testSortConfigurationChecks() {
+        batchSql(
+                "CREATE TEMPORARY TABLE source1 (col1 INT, col2 INT, col3 INT, col4 INT) "
+                        + "WITH ('connector'='values', 'bounded'='true')");
+        batchSql("CREATE TABLE IF NOT EXISTS sink1 (col1 INT, col2 INT, col3 INT, col4 INT)");
+        streamSqlIter(
+                "CREATE TEMPORARY TABLE source2 (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values')");
+        streamSqlIter("CREATE TABLE IF NOT EXISTS sink2 (col1 INT, col2 INT, col3 INT, col4 INT)");
+        // 1. Check the sort columns.
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1,xx1') */ "
+                                                + "SELECT * FROM source1"))
+                .withMessageContaining("should contains all clustering column names");
+        // 2. Check the sample factor.
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                                                + "'sink.clustering.sample-factor' = '10') */ SELECT * "
+                                                + "FROM source1"))
+                .withMessageContaining("The minimum allowed sink.clustering.sample-factor");
+        // 3. Check the sink parallelism.
+        batchSql(
+                "CREATE TEMPORARY TABLE source3 (col1 INT, col2 INT) WITH ('connector'='values', 'bounded'='true')");
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1') */ "
+                                                + "SELECT source1.col1, source1.col2, source3.col1, "
+                                                + "source3.col2 FROM source1 JOIN source3 ON source1.col1 = source3.col1"))
+                .withMessageContaining(
+                        "The sink parallelism must be specified when sorting the table data");
+    }
+
+    @Test
+    public void testRangePartition() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.sort-in-cluster' = 'false') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        assertRangesDoNotOverlap(collectMinMaxOfEachFile(testStoreTable, files, false));
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithOrderStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'order') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        assertRangesDoNotOverlap(collectMinMaxOfEachFile(testStoreTable, files, true));
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithZOrderStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'zorder') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+        Predicate predicate = predicateBuilder.between(0, 100, 200);
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithHilbertStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1,col2', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'hilbert') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+        Predicate predicate = predicateBuilder.between(0, 100, 200);
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
+    /**
+     * Reads every given data file separately and returns the minimum and maximum value of the
+     * first column found in each of them.
+     *
+     * @param checkSortedInFile whether to also assert that the first column never decreases
+     *     while a single file is read
+     */
+    private List<Tuple2<Integer, Integer>> collectMinMaxOfEachFile(
+            FileStoreTable table, List<ManifestEntry> files, boolean checkSortedInFile)
+            throws Exception {
+        List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>();
+        for (ManifestEntry file : files) {
+            DataSplit dataSplit =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            .withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("/temp/xxx")
+                            .build();
+            final AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
+            final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
+            final AtomicInteger current = new AtomicInteger(Integer.MIN_VALUE);
+            table.newReadBuilder()
+                    .newRead()
+                    .createReader(dataSplit)
+                    .forEachRemaining(
+                            internalRow -> {
+                                int result = internalRow.getInt(0);
+                                min.set(Math.min(min.get(), result));
+                                max.set(Math.max(max.get(), result));
+                                if (checkSortedInFile) {
+                                    Assertions.assertThat(result)
+                                            .isGreaterThanOrEqualTo(current.get());
+                                    current.set(result);
+                                }
+                            });
+            minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
+        }
+        return minMaxOfEachFile;
+    }
+
+    /**
+     * Asserts that, once ordered by their minimum, every file starts at or after the maximum of
+     * the file directly before it, i.e. the value ranges of the files do not overlap.
+     */
+    private void assertRangesDoNotOverlap(List<Tuple2<Integer, Integer>> minMaxOfEachFile) {
+        minMaxOfEachFile.sort(Comparator.comparing(o -> o.f0));
+        Tuple2<Integer, Integer> preResult = minMaxOfEachFile.get(0);
+        for (int index = 1; index < minMaxOfEachFile.size(); ++index) {
+            Tuple2<Integer, Integer> currentResult = minMaxOfEachFile.get(index);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(0);
+            assertThat(currentResult.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(preResult.f1);
+            // Move on to this file so the next one is compared with its direct predecessor.
+            preResult = currentResult;
+        }
+    }
+
+    private List<Row> generateSinkRows() {
+        List<Row> sinkRows = new ArrayList<>();
+        Random random = new Random();
+        for (int round = 0; round < SINK_ROW_NUMBER; round++) {
+            sinkRows.add(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER)));
+        }
+        return sinkRows;
+    }
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS test_table (col1 INT, col2 INT, col3 INT, col4 INT)");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
new file mode 100644
index 000000000000..e9e01baead2c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** The unit test for {@link TableSortInfo}. */
+public class TableSortInfoTest {
+
+    /** Creates a builder whose settings are all valid; each test overrides a single setting. */
+    private static TableSortInfo.Builder validBuilder() {
+        return new TableSortInfo.Builder()
+                .setSortColumns(Arrays.asList("column1", "column2"))
+                .setSortStrategy(OrderType.ORDER)
+                .setSortInCluster(true)
+                .setRangeNumber(10)
+                .setSinkParallelism(5)
+                .setLocalSampleSize(100)
+                .setGlobalSampleSize(500);
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithValidParameters() {
+        TableSortInfo tableSortInfo = validBuilder().build();
+
+        assertThat(tableSortInfo.getSortColumns()).containsExactly("column1", "column2");
+        assertThat(tableSortInfo.getSortStrategy()).isEqualTo(OrderType.ORDER);
+        assertThat(tableSortInfo.isSortInCluster()).isTrue();
+        assertThat(tableSortInfo.getRangeNumber()).isEqualTo(10);
+        assertThat(tableSortInfo.getSinkParallelism()).isEqualTo(5);
+        assertThat(tableSortInfo.getLocalSampleSize()).isEqualTo(100);
+        assertThat(tableSortInfo.getGlobalSampleSize()).isEqualTo(500);
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithEmptySortColumns() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> validBuilder().setSortColumns(Collections.emptyList()).build())
+                .withMessage("Sort columns cannot be empty");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNullSortStrategy() {
+        assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> validBuilder().setSortStrategy(null).build())
+                .withMessage("Sort strategy cannot be null");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNegativeRangeNumber() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> validBuilder().setRangeNumber(-1).build())
+                .withMessage("Range number must be positive");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithZeroSinkParallelism() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> validBuilder().setSinkParallelism(0).build())
+                .withMessageContaining(
+                        "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+                        FlinkConnectorOptions.SINK_PARALLELISM.key());
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithZeroLocalSampleSize() {
+        // A local sample size of zero is not positive and must be rejected.
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> validBuilder().setLocalSampleSize(0).build())
+                .withMessage("Local sample size must be positive");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNegativeGlobalSampleSize() {
+        // A negative global sample size must be rejected.
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> validBuilder().setGlobalSampleSize(-1).build())
+                .withMessage("Global sample size must be positive");
+    }
+}