Skip to content

Commit

Permalink
[core] Support sequence.field sort order for pk table (apache#4385)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Oct 30, 2024
1 parent 0797e81 commit 6a6d124
Show file tree
Hide file tree
Showing 19 changed files with 137 additions and 36 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,12 @@
<td>String</td>
<td>The field that generates the sequence number for primary key table, the sequence number determines which data is the most recent.</td>
</tr>
<tr>
<td><h5>sequence.field.sort-order</h5></td>
<td style="word-wrap: break-word;">ascending</td>
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
</tr>
<tr>
<td><h5>sink.watermark-time-zone</h5></td>
<td style="word-wrap: break-word;">"UTC"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ public GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer(
List<DataType> inputTypes, int[] sortFields) {
return new SortCodeGenerator(
RowType.builder().fields(inputTypes).build(),
getAscendingSortSpec(sortFields))
getAscendingSortSpec(sortFields, true))
.generateNormalizedKeyComputer("NormalizedKeyComputer");
}

@Override
public GeneratedClass<RecordComparator> generateRecordComparator(
List<DataType> inputTypes, int[] sortFields) {
List<DataType> inputTypes, int[] sortFields, boolean isAscendingOrder) {
return ComparatorCodeGenerator.gen(
"RecordComparator",
RowType.builder().fields(inputTypes).build(),
getAscendingSortSpec(sortFields));
getAscendingSortSpec(sortFields, isAscendingOrder));
}

@Override
Expand All @@ -59,10 +59,10 @@ public GeneratedClass<RecordEqualiser> generateRecordEqualiser(
.generateRecordEqualiser("RecordEqualiser");
}

private SortSpec getAscendingSortSpec(int[] sortFields) {
private SortSpec getAscendingSortSpec(int[] sortFields, boolean isAscendingOrder) {
SortSpec.SortSpecBuilder builder = SortSpec.builder();
for (int sortField : sortFields) {
builder.addField(sortField, true, false);
builder.addField(sortField, isAscendingOrder, false);
}
return builder.build();
}
Expand Down
36 changes: 36 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,13 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

/**
 * Sort order applied to {@code sequence.field}: an enum option over {@link SortOrder} whose
 * default is {@link SortOrder#ASCENDING}.
 */
@Immutable
public static final ConfigOption<SortOrder> SEQUENCE_FIELD_SORT_ORDER =
key("sequence.field.sort-order")
.enumType(SortOrder.class)
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
Expand Down Expand Up @@ -2043,6 +2050,10 @@ public List<String> sequenceField() {
.orElse(Collections.emptyList());
}

/**
 * Reads the {@link #SEQUENCE_FIELD_SORT_ORDER} option.
 *
 * @return {@code true} if the configured order is {@link SortOrder#ASCENDING}, {@code false}
 *     otherwise.
 */
public boolean sequenceFieldSortOrderIsAscending() {
    SortOrder configuredOrder = options.get(SEQUENCE_FIELD_SORT_ORDER);
    return SortOrder.ASCENDING == configuredOrder;
}

public boolean partialUpdateRemoveRecordOnDelete() {
return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
}
Expand Down Expand Up @@ -2382,6 +2393,31 @@ public InlineElement getDescription() {
}
}

/** Specifies the sort order of the {@code sequence.field}. */
public enum SortOrder implements DescribedEnum {
    ASCENDING("ascending", "specifies sequence.field sort order is ascending."),

    DESCENDING("descending", "specifies sequence.field sort order is descending.");

    /** Lower-case name of the constant; this is what {@link #toString()} returns. */
    private final String optionValue;

    /** Plain help text that {@link #getDescription()} wraps into an {@link InlineElement}. */
    private final String helpText;

    SortOrder(String optionValue, String helpText) {
        this.optionValue = optionValue;
        this.helpText = helpText;
    }

    /** Returns the lower-case option value rather than the constant name. */
    @Override
    public String toString() {
        return optionValue;
    }

    /** Returns the help text of this constant as a text element. */
    @Override
    public InlineElement getDescription() {
        return text(helpText);
    }
}

/** Specifies the log consistency mode for table. */
public enum LogConsistency implements DescribedEnum {
TRANSACTIONAL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer(
* @param inputTypes input types.
* @param sortFields the sort key fields. Records are compared by the first field, then the
* second field, then the third field and so on. All fields share the same sort direction.
* @param isAscendingOrder whether the sort key fields are compared in ascending order; {@code
* false} means descending order.
*/
GeneratedClass<RecordComparator> generateRecordComparator(
List<DataType> inputTypes, int[] sortFields);
List<DataType> inputTypes, int[] sortFields, boolean isAscendingOrder);

/** Generate a {@link RecordEqualiser} with fields. */
GeneratedClass<RecordEqualiser> generateRecordEqualiser(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,20 @@ public static NormalizedKeyComputer newNormalizedKeyComputer(
}

public static RecordComparator newRecordComparator(List<DataType> inputTypes) {
return newRecordComparator(inputTypes, IntStream.range(0, inputTypes.size()).toArray());
return newRecordComparator(
inputTypes, IntStream.range(0, inputTypes.size()).toArray(), true);
}

public static RecordComparator newRecordComparator(
List<DataType> inputTypes, int[] sortFields) {
List<DataType> inputTypes, int[] sortFields, boolean isAscendingOrder) {
return generate(
RecordComparator.class,
inputTypes,
sortFields,
() -> getCodeGenerator().generateRecordComparator(inputTypes, sortFields));
() ->
getCodeGenerator()
.generateRecordComparator(
inputTypes, sortFields, isAscendingOrder));
}

public static RecordEqualiser newRecordEqualiser(List<DataType> fieldTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private void bulkLoadBootstrapRecords() {
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompressOptions(),
coreOptions.writeBufferSpillDiskSize());
coreOptions.writeBufferSpillDiskSize(),
coreOptions.sequenceFieldSortOrderIsAscending());

Function<SortOrder, RowIterator> iteratorFunction =
sortOrder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter(
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
options.writeBufferSpillDiskSize());
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending());
}

/** A class wraps byte[] to implement equals and hashCode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public SortBufferWriteBuffer(
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
RecordComparator keyComparator =
CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);
CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, true);

if (memoryPool.freePages() < 3) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
.collect(Collectors.toList());

Supplier<FieldsComparator> userDefinedSeqComparator =
() -> UserDefinedSeqComparator.create(rowType, sequenceFields);
() -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
Arrays.stream(v.split(FIELDS_SEPARATOR))
.map(
fieldName ->
Expand Down Expand Up @@ -390,7 +390,7 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
projectedSeqComparators.put(
newField,
UserDefinedSeqComparator.create(
newRowType, newSequenceFields));
newRowType, newSequenceFields, true));
}
});
for (int i = 0; i < projects.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final MergeFunctionFactory<KeyValue> mfFactory;
private final MergeSorter mergeSorter;
private final List<String> sequenceFields;
private final boolean sequenceOrder;

@Nullable private RowType readKeyType;

Expand Down Expand Up @@ -106,6 +107,7 @@ public MergeFileSplitRead(
new MergeSorter(
CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);
this.sequenceFields = options.sequenceField();
this.sequenceOrder = options.sequenceFieldSortOrderIsAscending();
}

public Comparator<InternalRow> keyComparator() {
Expand Down Expand Up @@ -338,6 +340,6 @@ private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) {
@Nullable
public UserDefinedSeqComparator createUdsComparator() {
return UserDefinedSeqComparator.create(
readerFactoryBuilder.readValueType(), sequenceFields);
readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,17 @@ public static BinaryExternalSortBuffer create(
int pageSize,
int maxNumFileHandles,
CompressOptions compression,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
boolean sequenceOrder) {
return create(
ioManager,
rowType,
keyFields,
new HeapMemorySegmentPool(bufferSize, pageSize),
maxNumFileHandles,
compression,
maxDiskSize);
maxDiskSize,
sequenceOrder);
}

public static BinaryExternalSortBuffer create(
Expand All @@ -119,8 +121,10 @@ public static BinaryExternalSortBuffer create(
MemorySegmentPool pool,
int maxNumFileHandles,
CompressOptions compression,
MemorySize maxDiskSize) {
RecordComparator comparator = newRecordComparator(rowType.getFieldTypes(), keyFields);
MemorySize maxDiskSize,
boolean sequenceOrder) {
RecordComparator comparator =
newRecordComparator(rowType.getFieldTypes(), keyFields, sequenceOrder);
BinaryInMemorySortBuffer sortBuffer =
BinaryInMemorySortBuffer.createBuffer(
newNormalizedKeyComputer(rowType.getFieldTypes(), keyFields),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public KeyComparatorSupplier(RowType keyType) {

@Override
public RecordComparator get() {
return newRecordComparator(inputTypes, sortFields);
return newRecordComparator(inputTypes, sortFields, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,33 @@ public int compare(InternalRow o1, InternalRow o2) {

@Nullable
public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) {
return create(rowType, options.sequenceField());
return create(
rowType, options.sequenceField(), options.sequenceFieldSortOrderIsAscending());
}

@Nullable
public static UserDefinedSeqComparator create(RowType rowType, List<String> sequenceFields) {
public static UserDefinedSeqComparator create(
RowType rowType, List<String> sequenceFields, boolean isAscendingOrder) {
if (sequenceFields.isEmpty()) {
return null;
}

List<String> fieldNames = rowType.getFieldNames();
int[] fields = sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();

return create(rowType, fields);
return create(rowType, fields, isAscendingOrder);
}

@Nullable
public static UserDefinedSeqComparator create(RowType rowType, int[] sequenceFields) {
public static UserDefinedSeqComparator create(
RowType rowType, int[] sequenceFields, boolean isAscendingOrder) {
if (sequenceFields.length == 0) {
return null;
}

RecordComparator comparator =
CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), sequenceFields);
CodeGenUtils.newRecordComparator(
rowType.getFieldTypes(), sequenceFields, isAscendingOrder);
return new UserDefinedSeqComparator(sequenceFields, comparator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ public void testNormalizedKeyComputerCodegenCacheMiss() {
@Test
public void testRecordComparatorCodegenCache() {
assertClassEquals(
() -> newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}));
() -> newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true));
}

@Test
public void testRecordComparatorCodegenCacheMiss() {
assertClassNotEquals(
newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}),
newRecordComparator(Arrays.asList(STRING(), INT(), DOUBLE()), new int[] {0, 1, 2}));
newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true),
newRecordComparator(
Arrays.asList(STRING(), INT(), DOUBLE()), new int[] {0, 1, 2}, true));
}

@Test
Expand All @@ -96,7 +97,7 @@ public void testRecordEqualiserCodegenCacheMiss() {
@Test
public void testHybridNotEqual() {
assertClassNotEquals(
newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}),
newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true),
newNormalizedKeyComputer(Arrays.asList(STRING(), INT()), new int[] {0, 1}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ public abstract class FullCacheLookupTable implements LookupTable {
public FullCacheLookupTable(Context context) {
this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
CoreOptions coreOptions = new CoreOptions(table.options());
if (table.primaryKeys().size() > 0) {
sequenceFields = new CoreOptions(table.options()).sequenceField();
sequenceFields = coreOptions.sequenceField();
}
RowType projectedType = TypeUtils.project(table.rowType(), context.projection);
if (sequenceFields.size() > 0) {
Expand All @@ -111,7 +112,10 @@ public FullCacheLookupTable(Context context) {
projectedType = builder.build();
context = context.copy(table.rowType().getFieldIndices(projectedType.getFieldNames()));
this.userDefinedSeqComparator =
UserDefinedSeqComparator.create(projectedType, sequenceFields);
UserDefinedSeqComparator.create(
projectedType,
sequenceFields,
coreOptions.sequenceFieldSortOrderIsAscending());
this.appendUdsFieldNumber = appendUdsFieldNumber.get();
} else {
this.userDefinedSeqComparator = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SortOperator extends TableStreamOperator<InternalRow>
private final CompressOptions spillCompression;
private final int sinkParallelism;
private final MemorySize maxDiskSize;
private final boolean sequenceOrder;

private transient BinaryExternalSortBuffer buffer;
private transient IOManager ioManager;
Expand All @@ -60,7 +61,8 @@ public SortOperator(
int spillSortMaxNumFiles,
CompressOptions spillCompression,
int sinkParallelism,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
boolean sequenceOrder) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
Expand All @@ -70,6 +72,7 @@ public SortOperator(
this.spillCompression = spillCompression;
this.sinkParallelism = sinkParallelism;
this.maxDiskSize = maxDiskSize;
this.sequenceOrder = sequenceOrder;
}

@Override
Expand Down Expand Up @@ -100,7 +103,8 @@ void initBuffer() {
pageSize,
spillSortMaxNumFiles,
spillCompression,
maxDiskSize);
maxDiskSize,
sequenceOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public Tuple2<KEY, RowData> map(RowData value) {
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
options.writeBufferSpillDiskSize()))
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
Expand Down
Loading

0 comments on commit 6a6d124

Please sign in to comment.