Skip to content

Commit

Permalink
[core] Add reader benchmark for sorted file store (apache#4064)
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored Aug 27, 2024
1 parent d373a86 commit ed48b6b
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,90 @@

package org.apache.paimon.benchmark.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Abstract benchmark class for lookup.
 *
 * <p>Holds the parameters shared by the lookup benchmarks and the helpers that generate
 * serialized integer keys and write them into a lookup store file.
 */
abstract class AbstractLookupBenchmark {
    /** Value sizes, in bytes, that benchmark cases iterate over. */
    protected static final int[] VALUE_LENGTHS = {0, 64, 500, 1000, 2000};

    /** Record counts used to parameterize the benchmarks. */
    protected static final List<Integer> RECORD_COUNT_LIST =
            Arrays.asList(100000, 1000000, 5000000, 10000000, 15000000);

    /** Serializer for single-int-column rows; also supplies the key comparator for the store. */
    private final RowCompactedSerializer keySerializer =
            new RowCompactedSerializer(RowType.of(new IntType()));

    /** Row instance reused by {@link #intToByteArray(int)} instead of allocating one per key. */
    private final GenericRow reusedKey = new GenericRow(1);

    /**
     * Generates the serialized keys for the consecutive integers {@code start} (inclusive) to
     * {@code end} (exclusive), in ascending order.
     *
     * @param start first integer key, inclusive
     * @param end last integer key, exclusive
     * @return one serialized key per integer in the range
     */
    protected byte[][] generateSequenceInputs(int start, int end) {
        int count = end - start;
        // Every slot is assigned below, so only the outer array is allocated here.
        byte[][] result = new byte[count][];
        for (int i = 0; i < count; i++) {
            // Offset by start so that a non-zero start yields the requested range.
            result[i] = intToByteArray(start + i);
        }
        return result;
    }

    /**
     * Generates {@code end - start} serialized keys drawn at random from the range {@code start}
     * (inclusive) to {@code end} (exclusive).
     *
     * @param start lower bound of the random keys, inclusive
     * @param end upper bound of the random keys, exclusive
     * @return the serialized random keys
     */
    protected byte[][] generateRandomInputs(int start, int end) {
        return generateRandomInputs(start, end, end - start);
    }

    /**
     * Generates {@code count} serialized keys drawn at random from the range {@code start}
     * (inclusive) to {@code end} (exclusive). Keys may repeat.
     *
     * @param start lower bound of the random keys, inclusive
     * @param end upper bound of the random keys, exclusive
     * @param count number of keys to generate
     * @return the serialized random keys
     */
    protected byte[][] generateRandomInputs(int start, int end, int count) {
        // Obtain the generator at the use site rather than caching it in a field.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        byte[][] result = new byte[count][];
        for (int i = 0; i < count; i++) {
            result[i] = intToByteArray(random.nextInt(start, end));
        }
        return result;
    }

    /**
     * Serializes {@code value} as a single-field row using the key serializer.
     *
     * <p>Mutates the shared {@code reusedKey} row on every call.
     *
     * @param value integer key to serialize
     * @return the bytes produced by the key serializer for that row
     */
    protected byte[] intToByteArray(int value) {
        reusedKey.setField(0, value);
        return keySerializer.serializeToBytes(reusedKey);
    }

    /**
     * Writes every key in {@code inputs} to a new lookup store file created under {@code tempDir}
     * with a random UUID as its name.
     *
     * <p>Values are byte arrays of {@code valueLength} bytes. When {@code sameValue} is true every
     * key gets the value filled with {@code 1}; otherwise keys alternate between the value filled
     * with {@code 1} and the value filled with {@code 2}, starting with the former.
     *
     * @param tempDir directory in which the store file is created
     * @param options options passed to {@link LookupStoreFactory#create}
     * @param inputs serialized keys, written in array order
     * @param valueLength size in bytes of each value
     * @param sameValue whether all keys share one value
     * @return the absolute path of the written file and the context returned when the writer is
     *     closed
     * @throws IOException if writing to the store fails
     */
    protected Pair<String, LookupStoreFactory.Context> writeData(
            Path tempDir, CoreOptions options, byte[][] inputs, int valueLength, boolean sameValue)
            throws IOException {
        byte[] value1 = new byte[valueLength];
        byte[] value2 = new byte[valueLength];
        Arrays.fill(value1, (byte) 1);
        Arrays.fill(value2, (byte) 2);
        LookupStoreFactory factory =
                LookupStoreFactory.create(
                        options,
                        new CacheManager(MemorySize.ofMebiBytes(10)),
                        keySerializer.createSliceComparator());

        File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
        // NOTE(review): the second argument is passed as null; confirm what it configures.
        LookupStoreWriter writer = factory.createWriter(file, null);
        boolean useFirst = true;
        for (byte[] input : inputs) {
            writer.put(input, useFirst ? value1 : value2);
            if (!sameValue) {
                useFirst = !useFirst;
            }
        }
        LookupStoreFactory.Context context = writer.close();
        return Pair.of(file.getAbsolutePath(), context);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/** Benchmark for measuring the bloom filter for lookup. */
public class LookupBloomFilterBenchmark extends AbstractLookupBenchmark {

@TempDir Path tempDir;
ThreadLocalRandom rnd = ThreadLocalRandom.current();

@Test
public void testHighMatch() throws Exception {
Expand All @@ -58,15 +56,6 @@ public void testLowMatch() throws Exception {
"lookup", generateSequenceInputs(0, 100000), generateRandomInputs(100000, 200000));
}

/**
 * Generates {@code end - start} serialized keys drawn at random from the range {@code start}
 * (inclusive) to {@code end} (exclusive).
 *
 * @param start lower bound of the random keys, inclusive
 * @param end upper bound of the random keys, exclusive
 * @return the serialized random keys
 */
private byte[][] generateRandomInputs(int start, int end) {
    int count = end - start;
    // Every slot is assigned below, so only the outer array is allocated here.
    byte[][] result = new byte[count][];
    for (int i = 0; i < count; i++) {
        result[i] = intToByteArray(rnd.nextInt(start, end));
    }
    return result;
}

public void innerTest(String name, byte[][] inputs, byte[][] probe) throws Exception {
Benchmark benchmark =
new Benchmark(name, probe.length).setNumWarmupIters(1).setOutputPerIteration(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.benchmark.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.benchmark.Benchmark;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.assertj.core.api.Assertions.assertThat;

/** Benchmark for measuring the throughput of reading for lookup. */
@ExtendWith(ParameterizedTestExtension.class)
public class LookupReaderBenchmark extends AbstractLookupBenchmark {
    /** Number of random keys looked up in each benchmark case. */
    private static final int QUERY_KEY_COUNT = 10000;

    /** Number of records written to the store before it is read. */
    private final int recordCount;

    @TempDir Path tempDir;

    public LookupReaderBenchmark(int recordCount) {
        this.recordCount = recordCount;
    }

    /** Supplies the record counts with which this benchmark is parameterized. */
    @Parameters(name = "record-count-{0}")
    public static List<Integer> getVarSeg() {
        return RECORD_COUNT_LIST;
    }

    /**
     * Writes {@code recordCount} sequential keys, then benchmarks looking up {@link
     * #QUERY_KEY_COUNT} random keys drawn from the same range.
     */
    @TestTemplate
    void testLookupReader() throws IOException {
        readLookupDataBenchmark(
                generateSequenceInputs(0, recordCount),
                generateRandomInputs(0, recordCount, QUERY_KEY_COUNT));
    }

    /**
     * Registers one benchmark case per value length and lookup file type, then runs them all.
     *
     * <p>The store file for each case is written up front, outside the benchmarked closure; only
     * {@link #readData} runs inside it.
     *
     * @param inputs keys written into each store file
     * @param randomInputs keys looked up by each case
     * @throws IOException if writing a store file fails
     */
    private void readLookupDataBenchmark(byte[][] inputs, byte[][] randomInputs)
            throws IOException {
        Benchmark benchmark =
                new Benchmark("reader-" + randomInputs.length, randomInputs.length)
                        .setNumWarmupIters(1)
                        .setOutputPerIteration(true);
        for (int valueLength : VALUE_LENGTHS) {
            for (CoreOptions.LookupLocalFileType fileType :
                    CoreOptions.LookupLocalFileType.values()) {
                CoreOptions options =
                        CoreOptions.fromMap(
                                Collections.singletonMap(
                                        LOOKUP_LOCAL_FILE_TYPE.key(), fileType.name()));
                Pair<String, LookupStoreFactory.Context> pair =
                        writeData(tempDir, options, inputs, valueLength, false);
                benchmark.addCase(
                        String.format(
                                "%s-read-%dB-value-%d-num",
                                fileType.name(), valueLength, randomInputs.length),
                        5,
                        () -> {
                            try {
                                readData(options, randomInputs, pair.getLeft(), pair.getRight());
                            } catch (IOException e) {
                                // The benchmark closure cannot throw checked exceptions.
                                throw new RuntimeException(e);
                            }
                        });
            }
        }

        benchmark.run();
    }

    /**
     * Opens the store file at {@code filePath} and looks up every key in {@code randomInputs},
     * asserting that each lookup returns a non-null value.
     *
     * @param options options passed to {@link LookupStoreFactory#create}
     * @param randomInputs serialized keys to look up
     * @param filePath path of the store file to read
     * @param context context returned when the store file was written
     * @throws IOException if opening, reading or closing the store fails
     */
    private void readData(
            CoreOptions options,
            byte[][] randomInputs,
            String filePath,
            LookupStoreFactory.Context context)
            throws IOException {
        LookupStoreFactory factory =
                LookupStoreFactory.create(
                        options,
                        new CacheManager(MemorySize.ofMebiBytes(10)),
                        new RowCompactedSerializer(RowType.of(new IntType()))
                                .createSliceComparator());

        File file = new File(filePath);
        // try-with-resources closes the reader even when a lookup or assertion fails.
        try (LookupStoreReader reader = factory.createReader(file, context)) {
            for (byte[] input : randomInputs) {
                assertThat(reader.lookup(input)).isNotNull();
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,23 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.benchmark.Benchmark;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;

/** Benchmark for measuring the throughput of writing for lookup. */
@ExtendWith(ParameterizedTestExtension.class)
public class LookupWriterBenchmark extends AbstractLookupBenchmark {

private final int recordCount;
@TempDir Path tempDir;

Expand All @@ -57,7 +46,7 @@ public LookupWriterBenchmark(int recordCount) {

/** Supplies the record counts with which this benchmark is parameterized. */
@Parameters(name = "record-count-{0}")
public static List<Integer> getVarSeg() {
    // Use the list shared with the other lookup benchmarks; a second return here is unreachable.
    return RECORD_COUNT_LIST;
}

@TestTemplate
Expand Down Expand Up @@ -89,7 +78,7 @@ private void writeLookupDataBenchmark(byte[][] inputs, boolean sameValue) {
5,
() -> {
try {
writeData(options, inputs, valueLength, sameValue);
writeData(tempDir, options, inputs, valueLength, sameValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -99,33 +88,4 @@ private void writeLookupDataBenchmark(byte[][] inputs, boolean sameValue) {

benchmark.run();
}

/**
 * Writes every key in {@code inputs} to a new lookup store file created under {@code tempDir}
 * with a random UUID as its name, then closes the writer.
 *
 * <p>Values are byte arrays of {@code valueLength} bytes. When {@code sameValue} is true every
 * key gets the value filled with {@code 1}; otherwise keys alternate between the value filled
 * with {@code 1} and the value filled with {@code 2}, starting with the former.
 *
 * @param options options passed to {@link LookupStoreFactory#create}
 * @param inputs serialized keys, written in array order
 * @param valueLength size in bytes of each value
 * @param sameValue whether all keys share one value
 * @throws IOException if writing to the store fails
 */
private void writeData(CoreOptions options, byte[][] inputs, int valueLength, boolean sameValue)
        throws IOException {
    byte[] onesValue = new byte[valueLength];
    byte[] twosValue = new byte[valueLength];
    Arrays.fill(onesValue, (byte) 1);
    Arrays.fill(twosValue, (byte) 2);

    RowCompactedSerializer intKeySerializer =
            new RowCompactedSerializer(RowType.of(new IntType()));
    LookupStoreFactory storeFactory =
            LookupStoreFactory.create(
                    options,
                    new CacheManager(MemorySize.ofMebiBytes(10)),
                    intKeySerializer.createSliceComparator());

    File target = new File(tempDir.toFile(), UUID.randomUUID().toString());
    LookupStoreWriter storeWriter = storeFactory.createWriter(target, null);
    for (int idx = 0; idx < inputs.length; idx++) {
        // Even positions (or every position when sameValue is set) take the first value.
        boolean pickOnes = sameValue || idx % 2 == 0;
        storeWriter.put(inputs[idx], pickOnes ? onesValue : twosValue);
    }
    storeWriter.close();
}
}

0 comments on commit ed48b6b

Please sign in to comment.