diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java index 652142941f70..430a207f5c36 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java @@ -18,22 +18,90 @@ package org.apache.paimon.benchmark.lookup; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.lookup.LookupStoreFactory; +import org.apache.paimon.lookup.LookupStoreWriter; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + /** Abstract benchmark class for lookup. */ abstract class AbstractLookupBenchmark { - protected static final int[] VALUE_LENGTHS = {0, 500, 1000, 2000, 4000}; + protected static final int[] VALUE_LENGTHS = {0, 64, 500, 1000, 2000}; + protected static final List RECORD_COUNT_LIST = + Arrays.asList(100000, 1000000, 5000000, 10000000, 15000000); + private final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + private final RowCompactedSerializer keySerializer = + new RowCompactedSerializer(RowType.of(new IntType())); + private final GenericRow reusedKey = new GenericRow(1); protected byte[][] generateSequenceInputs(int start, int end) { int count = end - start; - byte[][] result = new byte[count][4]; + byte[][] result = new byte[count][6]; for (int i = 0; i < count; i++) { result[i] = intToByteArray(i); } return result; } + protected byte[][] generateRandomInputs(int start, int end) { + return generateRandomInputs(start, end, end - start); + } + + protected byte[][] generateRandomInputs(int start, int end, int count) { + byte[][] result = new byte[count][6]; + for (int i = 0; i < count; i++) { + result[i] = intToByteArray(rnd.nextInt(start, end)); + } + return result; + } + protected byte[] intToByteArray(int value) { - return new byte[] { - (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8), (byte) value - }; + reusedKey.setField(0, value); + return keySerializer.serializeToBytes(reusedKey); + } + + protected Pair writeData( + Path tempDir, CoreOptions options, byte[][] inputs, int valueLength, boolean sameValue) + throws IOException { + byte[] value1 = new byte[valueLength]; + byte[] value2 = new byte[valueLength]; + Arrays.fill(value1, (byte) 1); + Arrays.fill(value2, (byte) 2); + LookupStoreFactory factory = + LookupStoreFactory.create( + options, + new CacheManager(MemorySize.ofMebiBytes(10)), + keySerializer.createSliceComparator()); + + File file = new File(tempDir.toFile(), UUID.randomUUID().toString()); + LookupStoreWriter writer = factory.createWriter(file, null); + int i = 0; + for (byte[] input : inputs) { + if (sameValue) { + writer.put(input, value1); + } else { + if (i == 0) { + writer.put(input, value1); + } else { + writer.put(input, value2); + } + i = (i + 1) % 2; + } + } + LookupStoreFactory.Context context = writer.close(); + return Pair.of(file.getAbsolutePath(), context); } } diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java index 68f9b38ff771..db237f28f9f7 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java @@ -34,13 +34,11 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; /** Benchmark for measure the bloom filter for lookup. */ public class LookupBloomFilterBenchmark extends AbstractLookupBenchmark { @TempDir Path tempDir; - ThreadLocalRandom rnd = ThreadLocalRandom.current(); @Test public void testHighMatch() throws Exception { @@ -58,15 +56,6 @@ public void testLowMatch() throws Exception { "lookup", generateSequenceInputs(0, 100000), generateRandomInputs(100000, 200000)); } - private byte[][] generateRandomInputs(int start, int end) { - int count = end - start; - byte[][] result = new byte[count][4]; - for (int i = 0; i < count; i++) { - result[i] = intToByteArray(rnd.nextInt(start, end)); - } - return result; - } - public void innerTest(String name, byte[][] inputs, byte[][] probe) throws Exception { Benchmark benchmark = new Benchmark(name, probe.length).setNumWarmupIters(1).setOutputPerIteration(true); diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java new file mode 100644 index 000000000000..6327d703afe0 --- /dev/null +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.benchmark.lookup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.benchmark.Benchmark; +import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.lookup.LookupStoreFactory; +import org.apache.paimon.lookup.LookupStoreReader; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Benchmark for measuring the throughput of writing for lookup. */ +@ExtendWith(ParameterizedTestExtension.class) +public class LookupReaderBenchmark extends AbstractLookupBenchmark { + private static final int QUERY_KEY_COUNT = 10000; + private final int recordCount; + @TempDir Path tempDir; + + public LookupReaderBenchmark(int recordCount) { + this.recordCount = recordCount; + } + + @Parameters(name = "record-count-{0}") + public static List getVarSeg() { + return RECORD_COUNT_LIST; + } + + @TestTemplate + void testLookupReader() throws IOException { + readLookupDataBenchmark( + generateSequenceInputs(0, recordCount), + generateRandomInputs(0, recordCount, QUERY_KEY_COUNT)); + } + + private void readLookupDataBenchmark(byte[][] inputs, byte[][] randomInputs) + throws IOException { + Benchmark benchmark = + new Benchmark("reader-" + randomInputs.length, randomInputs.length) + .setNumWarmupIters(1) + .setOutputPerIteration(true); + for (int valueLength : VALUE_LENGTHS) { + for (CoreOptions.LookupLocalFileType fileType : + CoreOptions.LookupLocalFileType.values()) { + CoreOptions options = + CoreOptions.fromMap( + Collections.singletonMap( + LOOKUP_LOCAL_FILE_TYPE.key(), fileType.name())); + Pair pair = + writeData(tempDir, options, inputs, valueLength, false); + benchmark.addCase( + String.format( + "%s-read-%dB-value-%d-num", + fileType.name(), valueLength, randomInputs.length), + 5, + () -> { + try { + readData(options, randomInputs, pair.getLeft(), pair.getRight()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + + benchmark.run(); + } + + private void readData( + CoreOptions options, + byte[][] randomInputs, + String filePath, + LookupStoreFactory.Context context) + throws IOException { + LookupStoreFactory factory = + LookupStoreFactory.create( + options, + new CacheManager(MemorySize.ofMebiBytes(10)), + new RowCompactedSerializer(RowType.of(new IntType())) + .createSliceComparator()); + + File file = new File(filePath); + LookupStoreReader reader = factory.createReader(file, context); + for (byte[] input : randomInputs) { + assertThat(reader.lookup(input)).isNotNull(); + } + reader.close(); + } +} diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java index d5dfe31f35ce..fe4f1cd2493d 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java @@ -20,34 +20,23 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.benchmark.Benchmark; -import org.apache.paimon.data.serializer.RowCompactedSerializer; -import org.apache.paimon.io.cache.CacheManager; -import org.apache.paimon.lookup.LookupStoreFactory; -import org.apache.paimon.lookup.LookupStoreWriter; -import org.apache.paimon.options.MemorySize; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; import org.apache.paimon.testutils.junit.parameterized.Parameters; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import java.io.File; import java.io.IOException; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.UUID; import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE; /** Benchmark for measuring the throughput of writing for lookup. */ @ExtendWith(ParameterizedTestExtension.class) public class LookupWriterBenchmark extends AbstractLookupBenchmark { - private final int recordCount; @TempDir Path tempDir; @@ -57,7 +46,7 @@ public LookupWriterBenchmark(int recordCount) { @Parameters(name = "record-count-{0}") public static List getVarSeg() { - return Arrays.asList(1000000, 5000000, 10000000, 15000000, 20000000); + return RECORD_COUNT_LIST; } @TestTemplate @@ -89,7 +78,7 @@ private void writeLookupDataBenchmark(byte[][] inputs, boolean sameValue) { 5, () -> { try { - writeData(options, inputs, valueLength, sameValue); + writeData(tempDir, options, inputs, valueLength, sameValue); } catch (IOException e) { throw new RuntimeException(e); } @@ -99,33 +88,4 @@ private void writeLookupDataBenchmark(byte[][] inputs, boolean sameValue) { benchmark.run(); } - - private void writeData(CoreOptions options, byte[][] inputs, int valueLength, boolean sameValue) - throws IOException { - byte[] value1 = new byte[valueLength]; - byte[] value2 = new byte[valueLength]; - Arrays.fill(value1, (byte) 1); - Arrays.fill(value2, (byte) 2); - LookupStoreFactory factory = - LookupStoreFactory.create( - options, - new CacheManager(MemorySize.ofMebiBytes(10)), - new RowCompactedSerializer(RowType.of(new IntType())) - .createSliceComparator()); - - File file = new File(tempDir.toFile(), UUID.randomUUID().toString()); - LookupStoreWriter writer = factory.createWriter(file, null); - boolean first = true; - for (byte[] input : inputs) { - if (first) { - writer.put(input, value1); - } else { - writer.put(input, value2); - } - if (!sameValue) { - first = !first; - } - } - writer.close(); - } }