Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Oct 30, 2024
1 parent 8908506 commit 373dd68
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 26 deletions.
8 changes: 4 additions & 4 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,10 @@
<td>The field that generates the sequence number for primary key table, the sequence number determines which data is the most recent.</td>
</tr>
<tr>
<td><h5>sequence.field.sort.is.ascending</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Specify the order of sequence.field is ascending.</td>
<td><h5>sequence.field.sort-order</h5></td>
<td style="word-wrap: break-word;">ascending</td>
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
</tr>
<tr>
<td><h5>sink.watermark-time-zone</h5></td>
Expand Down
39 changes: 32 additions & 7 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -610,11 +610,11 @@ public class CoreOptions implements Serializable {
+ " the sequence number determines which data is the most recent.");

@Immutable
public static final ConfigOption<Boolean> SEQUENCE_FIELD_SORT_IS_ASCENDING =
key("sequence.field.sort.is.ascending")
.booleanType()
.defaultValue(true)
.withDescription("Specify the order of sequence.field is ascending.");
public static final ConfigOption<SortOrder> SEQUENCE_FIELD_SORT_ORDER =
key("sequence.field.sort-order")
.enumType(SortOrder.class)
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
Expand Down Expand Up @@ -2050,8 +2050,8 @@ public List<String> sequenceField() {
.orElse(Collections.emptyList());
}

public Boolean sequenceFieldSortOrder() {
return options.get(SEQUENCE_FIELD_SORT_IS_ASCENDING);
public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}

public boolean partialUpdateRemoveRecordOnDelete() {
Expand Down Expand Up @@ -2393,6 +2393,31 @@ public InlineElement getDescription() {
}
}

/** Specifies the sort order for field sequence id. */
public enum SortOrder implements DescribedEnum {
ASCENDING("ascending", "specifies sequence.field sort order is ascending."),

DESCENDING("descending", "specifies sequence.field sort order is descending.");

private final String value;
private final String description;

SortOrder(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the log consistency mode for table. */
public enum LogConsistency implements DescribedEnum {
TRANSACTIONAL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private void bulkLoadBootstrapRecords() {
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompressOptions(),
coreOptions.writeBufferSpillDiskSize());
coreOptions.writeBufferSpillDiskSize(),
coreOptions.sequenceFieldSortOrderIsAscending());

Function<SortOrder, RowIterator> iteratorFunction =
sortOrder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter(
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
options.writeBufferSpillDiskSize());
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending());
}

/** A class wraps byte[] to implement equals and hashCode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final MergeFunctionFactory<KeyValue> mfFactory;
private final MergeSorter mergeSorter;
private final List<String> sequenceFields;
private final boolean sequenceOrder;

@Nullable private RowType readKeyType;

Expand Down Expand Up @@ -106,6 +107,7 @@ public MergeFileSplitRead(
new MergeSorter(
CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);
this.sequenceFields = options.sequenceField();
this.sequenceOrder = options.sequenceFieldSortOrderIsAscending();
}

public Comparator<InternalRow> keyComparator() {
Expand Down Expand Up @@ -338,6 +340,6 @@ private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) {
@Nullable
public UserDefinedSeqComparator createUdsComparator() {
return UserDefinedSeqComparator.create(
readerFactoryBuilder.readValueType(), sequenceFields, true);
readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,17 @@ public static BinaryExternalSortBuffer create(
int pageSize,
int maxNumFileHandles,
CompressOptions compression,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
boolean sequenceOrder) {
return create(
ioManager,
rowType,
keyFields,
new HeapMemorySegmentPool(bufferSize, pageSize),
maxNumFileHandles,
compression,
maxDiskSize);
maxDiskSize,
sequenceOrder);
}

public static BinaryExternalSortBuffer create(
Expand All @@ -119,8 +121,10 @@ public static BinaryExternalSortBuffer create(
MemorySegmentPool pool,
int maxNumFileHandles,
CompressOptions compression,
MemorySize maxDiskSize) {
RecordComparator comparator = newRecordComparator(rowType.getFieldTypes(), keyFields, true);
MemorySize maxDiskSize,
boolean sequenceOrder) {
RecordComparator comparator =
newRecordComparator(rowType.getFieldTypes(), keyFields, sequenceOrder);
BinaryInMemorySortBuffer sortBuffer =
BinaryInMemorySortBuffer.createBuffer(
newNormalizedKeyComputer(rowType.getFieldTypes(), keyFields),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public int compare(InternalRow o1, InternalRow o2) {

@Nullable
public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) {
return create(rowType, options.sequenceField(), options.sequenceFieldSortOrder());
return create(
rowType, options.sequenceField(), options.sequenceFieldSortOrderIsAscending());
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SortOperator extends TableStreamOperator<InternalRow>
private final CompressOptions spillCompression;
private final int sinkParallelism;
private final MemorySize maxDiskSize;
private final boolean sequenceOrder;

private transient BinaryExternalSortBuffer buffer;
private transient IOManager ioManager;
Expand All @@ -60,7 +61,8 @@ public SortOperator(
int spillSortMaxNumFiles,
CompressOptions spillCompression,
int sinkParallelism,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
boolean sequenceOrder) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
Expand All @@ -70,6 +72,7 @@ public SortOperator(
this.spillCompression = spillCompression;
this.sinkParallelism = sinkParallelism;
this.maxDiskSize = maxDiskSize;
this.sequenceOrder = sequenceOrder;
}

@Override
Expand Down Expand Up @@ -100,7 +103,8 @@ void initBuffer() {
pageSize,
spillSortMaxNumFiles,
spillCompression,
maxDiskSize);
maxDiskSize,
sequenceOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public Tuple2<KEY, RowData> map(RowData value) {
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
options.writeBufferSpillDiskSize()))
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ public void testSequenceFieldSortOrder() {

// test for get small record
sql(
"CREATE TABLE T2 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c BIGINT) WITH ('sequence.field'='c', 'sequence.field.sort.is.ascending'='false')");
"CREATE TABLE T2 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c BIGINT) WITH ('sequence.field'='c', 'sequence.field.sort-order'='descending')");
sql("INSERT INTO T2 VALUES ('a', 'b', 1)");
sql("INSERT INTO T2 VALUES ('a', 'd', 3)");
sql("INSERT INTO T2 VALUES ('a', 'e', 2)");
Expand All @@ -1029,7 +1029,7 @@ public void testSequenceFieldSortOrder() {

// test for get largest record
sql(
"CREATE TABLE T3 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c DOUBLE) WITH ('sequence.field'='c', 'sequence.field.sort.is.ascending'='true')");
"CREATE TABLE T3 (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c DOUBLE) WITH ('sequence.field'='c', 'sequence.field.sort-order'='ascending')");
sql("INSERT INTO T3 VALUES ('a', 'b', 1.0)");
sql("INSERT INTO T3 VALUES ('a', 'd', 3.0)");
sql("INSERT INTO T3 VALUES ('a', 'e', 2.0)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public void testSort() throws Exception {
128,
CompressOptions.defaultOptions(),
1,
MemorySize.MAX_VALUE) {};
MemorySize.MAX_VALUE,
true) {};

OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
Expand Down Expand Up @@ -114,7 +115,8 @@ public void testCloseSortOperator() throws Exception {
128,
CompressOptions.defaultOptions(),
1,
MemorySize.MAX_VALUE) {};
MemorySize.MAX_VALUE,
true) {};
OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();
Expand Down

0 comments on commit 373dd68

Please sign in to comment.