From 439eee3e31f950dbe69e2fd4f6854c06c68ff5cc Mon Sep 17 00:00:00 2001 From: Fang Yong Date: Mon, 5 Aug 2024 12:49:53 +0800 Subject: [PATCH] [core] Calculate header size from field count for slice comparator (#3887) --- .../serializer/RowCompactedSerializer.java | 5 +- .../sort/SortLookupStoreFactoryTest.java | 60 ++++++++++++++++--- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java index 91c1c1c7c30f..3529349588ee 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java @@ -655,8 +655,9 @@ private static class SliceComparator implements Comparator { private final FieldReader[] fieldReaders; public SliceComparator(RowType rowType) { - this.reader1 = new RowReader(rowType.getFieldCount()); - this.reader2 = new RowReader(rowType.getFieldCount()); + int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount()); + this.reader1 = new RowReader(bitSetInBytes); + this.reader2 = new RowReader(bitSetInBytes); this.fieldReaders = new FieldReader[rowType.getFieldCount()]; for (int i = 0; i < rowType.getFieldCount(); i++) { fieldReaders[i] = createFieldReader(rowType.getTypeAt(i)); diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java index 804b8ed09c09..f8c84b858f9d 100644 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java @@ -18,9 +18,13 @@ package org.apache.paimon.lookup.sort; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.LookupStoreFactory.Context; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,11 +36,16 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link SortLookupStoreFactory}. */ public class SortLookupStoreFactoryTest { + private static final int VALUE_COUNT = 10_000_000; + private static final int QUERY_COUNT = 10_000; + + private final ThreadLocalRandom rnd = ThreadLocalRandom.current(); @TempDir Path tempDir; @@ -60,26 +69,63 @@ public void testNormal() throws IOException { "zstd"); SortLookupStoreWriter writer = factory.createWriter(file, null); - int valueCount = 10_000; - for (int i = 0; i < valueCount; i++) { + for (int i = 0; i < VALUE_COUNT; i++) { byte[] bytes = toBytes(i); writer.put(bytes, bytes); } Context context = writer.close(); SortLookupStoreReader reader = factory.createReader(file, context); - for (int i = 0; i < valueCount; i++) { - byte[] bytes = toBytes(i); - assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(i); + for (int i = 0; i < QUERY_COUNT; i++) { + int query = rnd.nextInt(VALUE_COUNT); + byte[] bytes = toBytes(query); + assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query); + } + reader.close(); + } + + @Test + public void testIntKey() throws IOException { + RowCompactedSerializer keySerializer = + new RowCompactedSerializer(RowType.of(new IntType())); + GenericRow row = new GenericRow(1); + SortLookupStoreFactory factory = + new SortLookupStoreFactory( + keySerializer.createSliceComparator(), + new CacheManager(MemorySize.ofMebiBytes(1)), + 64 * 1024, + "zstd"); + SortLookupStoreWriter writer = factory.createWriter(file, null); + for (int i = 0; i < VALUE_COUNT; i++) { + byte[] bytes = toBytes(keySerializer, row, i); + writer.put(bytes, toBytes(i)); + } + Context context = writer.close(); + + SortLookupStoreReader reader = factory.createReader(file, context); + for (int i = 0; i < QUERY_COUNT; i++) { + int query = rnd.nextInt(VALUE_COUNT); + byte[] bytes = toBytes(keySerializer, row, query); + try { + assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query); + } catch (Exception e) { + System.out.println("query value " + query + " failed"); + throw new RuntimeException(e); + } } reader.close(); } + private byte[] toBytes(RowCompactedSerializer serializer, GenericRow row, int i) { + row.setField(0, i); + return serializer.serializeToBytes(row); + } + private byte[] toBytes(int i) { - return String.valueOf(10_000 + i).getBytes(StandardCharsets.UTF_8); + return String.valueOf(VALUE_COUNT + i).getBytes(StandardCharsets.UTF_8); } private int fromBytes(byte[] bytes) { - return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - 10_000; + return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - VALUE_COUNT; } }