Skip to content

Commit

Permalink
add case
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 11, 2024
1 parent 9605534 commit 1f5b276
Showing 1 changed file with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
Expand All @@ -44,6 +43,7 @@
import java.util.Random;
import java.util.function.Consumer;

import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE;
import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.types.RowKind.DELETE;
Expand All @@ -57,15 +57,7 @@ class LocalMergeOperatorTest {
public void testHashNormal() throws Exception {
prepareHashOperator();
List<String> result = new ArrayList<>();
operator.setOutput(
new TestOutput(
row ->
result.add(
row.getRowKind().shortString()
+ ":"
+ row.getString(0)
+ "->"
+ row.getInt(1))));
setOutput(result);

// first test
processElement("a", 1);
Expand Down Expand Up @@ -105,15 +97,7 @@ public void testUserDefineSequence() throws Exception {
prepareHashOperator(options);

List<String> result = new ArrayList<>();
operator.setOutput(
new TestOutput(
row ->
result.add(
row.getRowKind().shortString()
+ ":"
+ row.getString(0)
+ "->"
+ row.getInt(1))));
setOutput(result);

processElement("a", 2);
processElement("b", 1);
Expand All @@ -123,12 +107,34 @@ public void testUserDefineSequence() throws Exception {
result.clear();
}

@Test
public void testHashSpill() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "2 m");
prepareHashOperator(options);
List<String> result = new ArrayList<>();
setOutput(result);

Map<String, String> expected = new HashMap<>();
for (int i = 0; i < 30_000; i++) {
String key = i + "";
expected.put(key, "+I:" + key + "->" + i);
processElement(key, i);
}

operator.prepareSnapshotPreBarrier(0);
assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
result.clear();
}

private void prepareHashOperator() throws Exception {
prepareHashOperator(new HashMap<>());
}

private void prepareHashOperator(Map<String, String> options) throws Exception {
options.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
if (!options.containsKey(LOCAL_MERGE_BUFFER_SIZE.key())) {
options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
}
RowType rowType =
RowType.of(
DataTypes.STRING(),
Expand All @@ -150,6 +156,18 @@ private void prepareHashOperator(Map<String, String> options) throws Exception {
assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
}

private void setOutput(List<String> result) {
operator.setOutput(
new TestOutput(
row ->
result.add(
row.getRowKind().shortString()
+ ":"
+ row.getString(0)
+ "->"
+ row.getInt(1))));
}

private void processElement(String key, int value) throws Exception {
processElement(RowKind.INSERT, key, value);
}
Expand Down

0 comments on commit 1f5b276

Please sign in to comment.