From 6a6d1243c26fbc41ce224a98ff5769395be2eb06 Mon Sep 17 00:00:00 2001
From: xuzifu666 <1206332514@qq.com>
Date: Wed, 30 Oct 2024 12:50:54 +0800
Subject: [PATCH] [core] Support sequence.field sort order for pk table (#4385)
---
.../generated/core_configuration.html | 6 ++++
.../paimon/codegen/CodeGeneratorImpl.java | 10 +++---
.../java/org/apache/paimon/CoreOptions.java | 36 +++++++++++++++++++
.../apache/paimon/codegen/CodeGenerator.java | 3 +-
.../apache/paimon/codegen/CodeGenUtils.java | 10 ++++--
.../crosspartition/GlobalIndexAssigner.java | 3 +-
.../apache/paimon/lookup/RocksDBState.java | 3 +-
.../mergetree/SortBufferWriteBuffer.java | 2 +-
.../compact/PartialUpdateMergeFunction.java | 4 +--
.../paimon/operation/MergeFileSplitRead.java | 4 ++-
.../paimon/sort/BinaryExternalSortBuffer.java | 12 ++++---
.../paimon/utils/KeyComparatorSupplier.java | 2 +-
.../utils/UserDefinedSeqComparator.java | 14 +++++---
.../paimon/codegen/CodeGenUtilsTest.java | 9 ++---
.../flink/lookup/FullCacheLookupTable.java | 8 +++--
.../paimon/flink/sorter/SortOperator.java | 8 +++--
.../apache/paimon/flink/sorter/SortUtils.java | 3 +-
.../paimon/flink/SchemaChangeITCase.java | 30 ++++++++++++++++
.../paimon/flink/sorter/SortOperatorTest.java | 6 ++--
19 files changed, 137 insertions(+), 36 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9bbed03bd948..84ba86124f8b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -713,6 +713,12 @@
String |
The field that generates the sequence number for primary key table, the sequence number determines which data is the most recent. |
+
+ sequence.field.sort-order |
+ ascending |
+ Enum |
+ Specify the order of sequence.field.
Possible values:- "ascending": specifies sequence.field sort order is ascending.
- "descending": specifies sequence.field sort order is descending.
|
+
sink.watermark-time-zone |
"UTC" |
diff --git a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
index 29096e96b206..5a6a27738b50 100644
--- a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
+++ b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
@@ -39,17 +39,17 @@ public GeneratedClass generateNormalizedKeyComputer(
List inputTypes, int[] sortFields) {
return new SortCodeGenerator(
RowType.builder().fields(inputTypes).build(),
- getAscendingSortSpec(sortFields))
+ getAscendingSortSpec(sortFields, true))
.generateNormalizedKeyComputer("NormalizedKeyComputer");
}
@Override
public GeneratedClass generateRecordComparator(
- List inputTypes, int[] sortFields) {
+ List inputTypes, int[] sortFields, boolean isAscendingOrder) {
return ComparatorCodeGenerator.gen(
"RecordComparator",
RowType.builder().fields(inputTypes).build(),
- getAscendingSortSpec(sortFields));
+ getAscendingSortSpec(sortFields, isAscendingOrder));
}
@Override
@@ -59,10 +59,10 @@ public GeneratedClass generateRecordEqualiser(
.generateRecordEqualiser("RecordEqualiser");
}
- private SortSpec getAscendingSortSpec(int[] sortFields) {
+ private SortSpec getAscendingSortSpec(int[] sortFields, boolean isAscendingOrder) {
SortSpec.SortSpecBuilder builder = SortSpec.builder();
for (int sortField : sortFields) {
- builder.addField(sortField, true, false);
+ builder.addField(sortField, isAscendingOrder, false);
}
return builder.build();
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 18dae0d38416..39eda336b033 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -609,6 +609,13 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");
+ @Immutable
+ public static final ConfigOption SEQUENCE_FIELD_SORT_ORDER =
+ key("sequence.field.sort-order")
+ .enumType(SortOrder.class)
+ .defaultValue(SortOrder.ASCENDING)
+ .withDescription("Specify the order of sequence.field.");
+
@Immutable
public static final ConfigOption PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
@@ -2043,6 +2050,10 @@ public List sequenceField() {
.orElse(Collections.emptyList());
}
+ public boolean sequenceFieldSortOrderIsAscending() {
+ return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
+ }
+
public boolean partialUpdateRemoveRecordOnDelete() {
return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
}
@@ -2382,6 +2393,31 @@ public InlineElement getDescription() {
}
}
+ /** Specifies the sort order for field sequence id. */
+ public enum SortOrder implements DescribedEnum {
+ ASCENDING("ascending", "specifies sequence.field sort order is ascending."),
+
+ DESCENDING("descending", "specifies sequence.field sort order is descending.");
+
+ private final String value;
+ private final String description;
+
+ SortOrder(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
/** Specifies the log consistency mode for table. */
public enum LogConsistency implements DescribedEnum {
TRANSACTIONAL(
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/CodeGenerator.java b/paimon-common/src/main/java/org/apache/paimon/codegen/CodeGenerator.java
index e137619143a3..324a8d726fb2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/CodeGenerator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/CodeGenerator.java
@@ -44,9 +44,10 @@ GeneratedClass generateNormalizedKeyComputer(
* @param inputTypes input types.
* @param sortFields the sort key fields. Records are compared by the first field, then the
* second field, then the third field and so on. All fields are compared in ascending order.
+ * @param isAscendingOrder decide the sort key fields order whether is ascending
*/
GeneratedClass generateRecordComparator(
- List inputTypes, int[] sortFields);
+ List inputTypes, int[] sortFields, boolean isAscendingOrder);
/** Generate a {@link RecordEqualiser} with fields. */
GeneratedClass generateRecordEqualiser(
diff --git a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
index 76aeae54732b..d452df5fb65e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
@@ -70,16 +70,20 @@ public static NormalizedKeyComputer newNormalizedKeyComputer(
}
public static RecordComparator newRecordComparator(List inputTypes) {
- return newRecordComparator(inputTypes, IntStream.range(0, inputTypes.size()).toArray());
+ return newRecordComparator(
+ inputTypes, IntStream.range(0, inputTypes.size()).toArray(), true);
}
public static RecordComparator newRecordComparator(
- List inputTypes, int[] sortFields) {
+ List inputTypes, int[] sortFields, boolean isAscendingOrder) {
return generate(
RecordComparator.class,
inputTypes,
sortFields,
- () -> getCodeGenerator().generateRecordComparator(inputTypes, sortFields));
+ () ->
+ getCodeGenerator()
+ .generateRecordComparator(
+ inputTypes, sortFields, isAscendingOrder));
}
public static RecordEqualiser newRecordEqualiser(List fieldTypes) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 311b997a4bc8..4700e7399802 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -306,7 +306,8 @@ private void bulkLoadBootstrapRecords() {
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompressOptions(),
- coreOptions.writeBufferSpillDiskSize());
+ coreOptions.writeBufferSpillDiskSize(),
+ coreOptions.sequenceFieldSortOrderIsAscending());
Function iteratorFunction =
sortOrder -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index 25e58984edbe..0181917a7a26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -113,7 +113,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter(
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
- options.writeBufferSpillDiskSize());
+ options.writeBufferSpillDiskSize(),
+ options.sequenceFieldSortOrderIsAscending());
}
/** A class wraps byte[] to implement equals and hashCode. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 76c84fd4c937..433ff2158bc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -101,7 +101,7 @@ public SortBufferWriteBuffer(
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
RecordComparator keyComparator =
- CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);
+ CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, true);
if (memoryPool.freePages() < 3) {
throw new IllegalArgumentException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 11661851cf4a..b15d9388aa35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -301,7 +301,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) {
.collect(Collectors.toList());
Supplier userDefinedSeqComparator =
- () -> UserDefinedSeqComparator.create(rowType, sequenceFields);
+ () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
Arrays.stream(v.split(FIELDS_SEPARATOR))
.map(
fieldName ->
@@ -390,7 +390,7 @@ public MergeFunction create(@Nullable int[][] projection) {
projectedSeqComparators.put(
newField,
UserDefinedSeqComparator.create(
- newRowType, newSequenceFields));
+ newRowType, newSequenceFields, true));
}
});
for (int i = 0; i < projects.length; i++) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index d1f332a0384a..c21c3683ce08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -77,6 +77,7 @@ public class MergeFileSplitRead implements SplitRead {
private final MergeFunctionFactory mfFactory;
private final MergeSorter mergeSorter;
private final List sequenceFields;
+ private final boolean sequenceOrder;
@Nullable private RowType readKeyType;
@@ -106,6 +107,7 @@ public MergeFileSplitRead(
new MergeSorter(
CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);
this.sequenceFields = options.sequenceField();
+ this.sequenceOrder = options.sequenceFieldSortOrderIsAscending();
}
public Comparator keyComparator() {
@@ -338,6 +340,6 @@ private RecordReader projectOuter(RecordReader reader) {
@Nullable
public UserDefinedSeqComparator createUdsComparator() {
return UserDefinedSeqComparator.create(
- readerFactoryBuilder.readValueType(), sequenceFields);
+ readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index 1ae45354646b..4bfbcd5ec71f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -101,7 +101,8 @@ public static BinaryExternalSortBuffer create(
int pageSize,
int maxNumFileHandles,
CompressOptions compression,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ boolean sequenceOrder) {
return create(
ioManager,
rowType,
@@ -109,7 +110,8 @@ public static BinaryExternalSortBuffer create(
new HeapMemorySegmentPool(bufferSize, pageSize),
maxNumFileHandles,
compression,
- maxDiskSize);
+ maxDiskSize,
+ sequenceOrder);
}
public static BinaryExternalSortBuffer create(
@@ -119,8 +121,10 @@ public static BinaryExternalSortBuffer create(
MemorySegmentPool pool,
int maxNumFileHandles,
CompressOptions compression,
- MemorySize maxDiskSize) {
- RecordComparator comparator = newRecordComparator(rowType.getFieldTypes(), keyFields);
+ MemorySize maxDiskSize,
+ boolean sequenceOrder) {
+ RecordComparator comparator =
+ newRecordComparator(rowType.getFieldTypes(), keyFields, sequenceOrder);
BinaryInMemorySortBuffer sortBuffer =
BinaryInMemorySortBuffer.createBuffer(
newNormalizedKeyComputer(rowType.getFieldTypes(), keyFields),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
index 6f90cef01a20..25fd07b7cad2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
@@ -45,6 +45,6 @@ public KeyComparatorSupplier(RowType keyType) {
@Override
public RecordComparator get() {
- return newRecordComparator(inputTypes, sortFields);
+ return newRecordComparator(inputTypes, sortFields, true);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
index ec7a00bcb3b2..5e5eb293d0de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -51,11 +51,13 @@ public int compare(InternalRow o1, InternalRow o2) {
@Nullable
public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) {
- return create(rowType, options.sequenceField());
+ return create(
+ rowType, options.sequenceField(), options.sequenceFieldSortOrderIsAscending());
}
@Nullable
- public static UserDefinedSeqComparator create(RowType rowType, List sequenceFields) {
+ public static UserDefinedSeqComparator create(
+ RowType rowType, List sequenceFields, boolean isAscendingOrder) {
if (sequenceFields.isEmpty()) {
return null;
}
@@ -63,17 +65,19 @@ public static UserDefinedSeqComparator create(RowType rowType, List sequ
List fieldNames = rowType.getFieldNames();
int[] fields = sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();
- return create(rowType, fields);
+ return create(rowType, fields, isAscendingOrder);
}
@Nullable
- public static UserDefinedSeqComparator create(RowType rowType, int[] sequenceFields) {
+ public static UserDefinedSeqComparator create(
+ RowType rowType, int[] sequenceFields, boolean isAscendingOrder) {
if (sequenceFields.length == 0) {
return null;
}
RecordComparator comparator =
- CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), sequenceFields);
+ CodeGenUtils.newRecordComparator(
+ rowType.getFieldTypes(), sequenceFields, isAscendingOrder);
return new UserDefinedSeqComparator(sequenceFields, comparator);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
index b7e416af137b..c0da56f54454 100644
--- a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
@@ -71,14 +71,15 @@ public void testNormalizedKeyComputerCodegenCacheMiss() {
@Test
public void testRecordComparatorCodegenCache() {
assertClassEquals(
- () -> newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}));
+ () -> newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true));
}
@Test
public void testRecordComparatorCodegenCacheMiss() {
assertClassNotEquals(
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}),
- newRecordComparator(Arrays.asList(STRING(), INT(), DOUBLE()), new int[] {0, 1, 2}));
+ newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true),
+ newRecordComparator(
+ Arrays.asList(STRING(), INT(), DOUBLE()), new int[] {0, 1, 2}, true));
}
@Test
@@ -96,7 +97,7 @@ public void testRecordEqualiserCodegenCacheMiss() {
@Test
public void testHybridNotEqual() {
assertClassNotEquals(
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}),
+ newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true),
newNormalizedKeyComputer(Arrays.asList(STRING(), INT()), new int[] {0, 1}));
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 89134585b9af..de69c67a4c44 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -91,8 +91,9 @@ public abstract class FullCacheLookupTable implements LookupTable {
public FullCacheLookupTable(Context context) {
this.table = context.table;
List sequenceFields = new ArrayList<>();
+ CoreOptions coreOptions = new CoreOptions(table.options());
if (table.primaryKeys().size() > 0) {
- sequenceFields = new CoreOptions(table.options()).sequenceField();
+ sequenceFields = coreOptions.sequenceField();
}
RowType projectedType = TypeUtils.project(table.rowType(), context.projection);
if (sequenceFields.size() > 0) {
@@ -111,7 +112,10 @@ public FullCacheLookupTable(Context context) {
projectedType = builder.build();
context = context.copy(table.rowType().getFieldIndices(projectedType.getFieldNames()));
this.userDefinedSeqComparator =
- UserDefinedSeqComparator.create(projectedType, sequenceFields);
+ UserDefinedSeqComparator.create(
+ projectedType,
+ sequenceFields,
+ coreOptions.sequenceFieldSortOrderIsAscending());
this.appendUdsFieldNumber = appendUdsFieldNumber.get();
} else {
this.userDefinedSeqComparator = null;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 52ad896892a7..d4d5dd741681 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -48,6 +48,7 @@ public class SortOperator extends TableStreamOperator
private final CompressOptions spillCompression;
private final int sinkParallelism;
private final MemorySize maxDiskSize;
+ private final boolean sequenceOrder;
private transient BinaryExternalSortBuffer buffer;
private transient IOManager ioManager;
@@ -60,7 +61,8 @@ public SortOperator(
int spillSortMaxNumFiles,
CompressOptions spillCompression,
int sinkParallelism,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ boolean sequenceOrder) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
@@ -70,6 +72,7 @@ public SortOperator(
this.spillCompression = spillCompression;
this.sinkParallelism = sinkParallelism;
this.maxDiskSize = maxDiskSize;
+ this.sequenceOrder = sequenceOrder;
}
@Override
@@ -100,7 +103,8 @@ void initBuffer() {
pageSize,
spillSortMaxNumFiles,
spillCompression,
- maxDiskSize);
+ maxDiskSize,
+ sequenceOrder);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index e163ac364026..f590c2fb7fff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -163,7 +163,8 @@ public Tuple2 map(RowData value) {
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
- options.writeBufferSpillDiskSize()))
+ options.writeBufferSpillDiskSize(),
+ options.sequenceFieldSortOrderIsAscending()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index e1655bcb6ba9..08f79efccb76 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1007,6 +1007,36 @@ public void testAlterTableNonPhysicalColumn() {
.doesNotContain("schema");
}
+ @Test
+ public void testSequenceFieldSortOrder() {
+ // test default condition which get the largest record
+ sql(
+ "CREATE TABLE T1 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING) WITH ('sequence.field'='c')");
+ sql("INSERT INTO T1 VALUES ('a', 'b', 'l')");
+ sql("INSERT INTO T1 VALUES ('a', 'd', 'n')");
+ sql("INSERT INTO T1 VALUES ('a', 'e', 'm')");
+ List sql = sql("select * from T1");
+ assertThat(sql("select * from T1").toString()).isEqualTo("[+I[a, d, n]]");
+
+ // test for get small record
+ sql(
+ "CREATE TABLE T2 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c BIGINT) WITH ('sequence.field'='c', 'sequence.field.sort-order'='descending')");
+ sql("INSERT INTO T2 VALUES ('a', 'b', 1)");
+ sql("INSERT INTO T2 VALUES ('a', 'd', 3)");
+ sql("INSERT INTO T2 VALUES ('a', 'e', 2)");
+ sql = sql("select * from T2");
+ assertThat(sql("select * from T2").toString()).isEqualTo("[+I[a, b, 1]]");
+
+ // test for get largest record
+ sql(
+ "CREATE TABLE T3 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c DOUBLE) WITH ('sequence.field'='c', 'sequence.field.sort-order'='ascending')");
+ sql("INSERT INTO T3 VALUES ('a', 'b', 1.0)");
+ sql("INSERT INTO T3 VALUES ('a', 'd', 3.0)");
+ sql("INSERT INTO T3 VALUES ('a', 'e', 2.0)");
+ sql = sql("select * from T3");
+ assertThat(sql("select * from T3").toString()).isEqualTo("[+I[a, d, 3.0]]");
+ }
+
@Test
public void testAlterTableMetadataComment() {
sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
index 155e259e02bb..c74c1c2c17a4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
@@ -65,7 +65,8 @@ public void testSort() throws Exception {
128,
CompressOptions.defaultOptions(),
1,
- MemorySize.MAX_VALUE) {};
+ MemorySize.MAX_VALUE,
+ true) {};
OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
@@ -114,7 +115,8 @@ public void testCloseSortOperator() throws Exception {
128,
CompressOptions.defaultOptions(),
1,
- MemorySize.MAX_VALUE) {};
+ MemorySize.MAX_VALUE,
+ true) {};
OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();