Skip to content

Commit

Permalink
Showing 3 changed files with 177 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.NormalizedKeyComputer;
import org.apache.paimon.codegen.RecordComparator;
@@ -43,14 +44,21 @@
public class SortOperator extends TableStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {

private final RowType keyType;
private final RowType rowType;
private final long maxMemory;
private final int pageSize;
private final int arity;
private final int spillSortMaxNumFiles;
private transient BinaryExternalSortBuffer buffer;

public SortOperator(RowType rowType, long maxMemory, int pageSize, int spillSortMaxNumFiles) {
public SortOperator(
RowType keyType,
RowType rowType,
long maxMemory,
int pageSize,
int spillSortMaxNumFiles) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
this.pageSize = pageSize;
@@ -61,13 +69,17 @@ public SortOperator(RowType rowType, long maxMemory, int pageSize, int spillSort
/**
 * Opens the operator: delegates to {@code super.open()} first and then sets up the sort
 * buffer by calling {@link #initBuffer()}.
 *
 * @throws Exception propagated from {@code super.open()}
 */
@Override
public void open() throws Exception {
super.open();
initBuffer();
}

@VisibleForTesting
void initBuffer() {
InternalRowSerializer serializer = InternalSerializers.create(rowType);
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(
rowType.getFieldTypes(), "MemTableKeyComputer");
keyType.getFieldTypes(), "MemTableKeyComputer");
RecordComparator keyComparator =
CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
CodeGenUtils.newRecordComparator(keyType.getFieldTypes(), "MemTableComparator");

MemorySegmentPool memoryPool = new HeapMemorySegmentPool(maxMemory, pageSize);

@@ -100,8 +112,19 @@ public void endInput() throws Exception {
}
}

/**
 * Closes the operator and clears the sort buffer.
 *
 * <p>The buffer is cleared in a {@code finally} block so that it is still cleared when
 * {@code super.close()} throws. The {@code null} check covers the case where this method is
 * invoked although {@link #open()} never got as far as creating the buffer.
 *
 * @throws Exception propagated from {@code super.close()} or from clearing the buffer
 */
@Override
public void close() throws Exception {
    try {
        super.close();
    } finally {
        if (buffer != null) {
            buffer.clear();
        }
    }
}

/**
 * Writes the row carried by the incoming stream record into the sort buffer.
 *
 * @param element stream record whose value is written to the buffer
 * @throws Exception propagated from the buffer write
 */
@Override
public void processElement(StreamRecord<InternalRow> element) throws Exception {
    InternalRow row = element.getValue();
    buffer.write(row);
}

/**
 * Returns the {@code buffer} field as-is so tests can inspect it.
 *
 * <p>NOTE(review): presumably {@code null} until {@link #initBuffer()} has run; the
 * assignment is not visible in this hunk, so confirm.
 */
@VisibleForTesting
BinaryExternalSortBuffer getBuffer() {
return buffer;
}
}
Original file line number Diff line number Diff line change
@@ -155,6 +155,7 @@ public Tuple2<KEY, RowData> map(RowData value) {
"LOCAL SORT",
internalRowType,
new SortOperator(
sortKeyType,
longRowType,
options.writeBufferSize(),
options.pageSize(),
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sorter;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.MutableObjectIterator;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;

/** Test for {@link SortOperator}. */
public class SortOperatorTest {

@Test
public void testSort() throws Exception {
RowType keyRowType =
new RowType(
Collections.singletonList(
new DataField(0, "a", new BigIntType(), "Someone's desc.")));

RowType rowType =
new RowType(
Arrays.asList(
new DataField(0, "a", new BigIntType()),
new DataField(1, "b", new VarCharType(), "Someone's desc."),
new DataField(2, "c", new VarCharType(), "Someone's desc.")));

SortOperator sortOperator =
new SortOperator(
keyRowType,
rowType,
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128) {};

OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();

for (int i = 0; i < 10000; i++) {
harness.processElement(
new StreamRecord<>(
GenericRow.of(
(long) 10000 - i,
BinaryString.fromString(""),
BinaryString.fromString(""))));
}

BinaryExternalSortBuffer externalSortBuffer = sortOperator.getBuffer();
MutableObjectIterator<BinaryRow> iterator = externalSortBuffer.sortedIterator();
BinaryRow row;
BinaryRow reuse = new BinaryRow(3);
long i = 1;
while ((row = iterator.next(reuse)) != null) {
Assertions.assertThat(row.getLong(0)).isEqualTo(i++);
}

harness.close();
}

@Test
public void testCloseSortOprator() throws Exception {
RowType keyRowType =
new RowType(
Collections.singletonList(
new DataField(0, "a", new VarCharType(), "Someone's desc.")));

RowType rowType =
new RowType(
Arrays.asList(
new DataField(0, "a", new VarCharType(), "Someone's desc."),
new DataField(0, "b", new VarCharType(), "Someone's desc."),
new DataField(1, "c", new BigIntType())));

SortOperator sortOperator =
new SortOperator(
keyRowType,
rowType,
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128) {};
OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();

char[] data = new char[1024];
for (int i = 0; i < 1024; i++) {
data[i] = (char) ('a' + i % 26);
}

for (int i = 0; i < 10000; i++) {
harness.processElement(
new StreamRecord<>(
GenericRow.of(
BinaryString.fromString(String.valueOf(data)),
BinaryString.fromString(String.valueOf(data)),
(long) i)));
}

harness.close();
for (File file : files) {
assertNoDataFile(file);
}
}

private void assertNoDataFile(File fileDir) {
if (fileDir.exists()) {
Assertions.assertThat(fileDir.isDirectory()).isTrue();
for (File file : fileDir.listFiles()) {
assertNoDataFile(file);
}
}
}

private OneInputStreamOperatorTestHarness createTestHarness(SortOperator operator)
throws Exception {
OneInputStreamOperatorTestHarness testHarness =
new OneInputStreamOperatorTestHarness(operator);
return testHarness;
}
}

0 comments on commit e1175b8

Please sign in to comment.