Skip to content

Commit

Permalink
[core] Calculate header size from field count for slice comparator
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed Aug 4, 2024
1 parent 8f69480 commit 1c18e30
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,9 @@ private static class SliceComparator implements Comparator<MemorySlice> {
private final FieldReader[] fieldReaders;

public SliceComparator(RowType rowType) {
this.reader1 = new RowReader(rowType.getFieldCount());
this.reader2 = new RowReader(rowType.getFieldCount());
int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount());
this.reader1 = new RowReader(bitSetInBytes);
this.reader2 = new RowReader(bitSetInBytes);
this.fieldReaders = new FieldReader[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package org.apache.paimon.lookup.sort;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory.Context;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -32,11 +36,16 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link SortLookupStoreFactory}. */
public class SortLookupStoreFactoryTest {
private static final int VALUE_COUNT = 10_000_000;
private static final int QUERY_COUNT = 10_000;

private final ThreadLocalRandom rnd = ThreadLocalRandom.current();

@TempDir Path tempDir;

Expand All @@ -60,26 +69,63 @@ public void testNormal() throws IOException {
"zstd");

SortLookupStoreWriter writer = factory.createWriter(file, null);
int valueCount = 10_000;
for (int i = 0; i < valueCount; i++) {
for (int i = 0; i < VALUE_COUNT; i++) {
byte[] bytes = toBytes(i);
writer.put(bytes, bytes);
}
Context context = writer.close();

SortLookupStoreReader reader = factory.createReader(file, context);
for (int i = 0; i < valueCount; i++) {
byte[] bytes = toBytes(i);
assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(i);
for (int i = 0; i < QUERY_COUNT; i++) {
int query = rnd.nextInt(VALUE_COUNT);
byte[] bytes = toBytes(query);
assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query);
}
reader.close();
}

@Test
public void testIntKey() throws IOException {
RowCompactedSerializer keySerializer =
new RowCompactedSerializer(RowType.of(new IntType()));
GenericRow row = new GenericRow(1);
SortLookupStoreFactory factory =
new SortLookupStoreFactory(
keySerializer.createSliceComparator(),
new CacheManager(MemorySize.ofMebiBytes(1)),
64 * 1024,
"zstd");
SortLookupStoreWriter writer = factory.createWriter(file, null);
for (int i = 0; i < VALUE_COUNT; i++) {
byte[] bytes = toBytes(keySerializer, row, i);
writer.put(bytes, toBytes(i));
}
Context context = writer.close();

SortLookupStoreReader reader = factory.createReader(file, context);
for (int i = 0; i < QUERY_COUNT; i++) {
int query = rnd.nextInt(VALUE_COUNT);
byte[] bytes = toBytes(keySerializer, row, query);
try {
assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query);
} catch (Exception e) {
System.out.println("query value " + query + " failed");
throw new RuntimeException(e);
}
}
reader.close();
}

private byte[] toBytes(RowCompactedSerializer serializer, GenericRow row, int i) {
row.setField(0, i);
return serializer.serializeToBytes(row);
}

private byte[] toBytes(int i) {
return String.valueOf(10_000 + i).getBytes(StandardCharsets.UTF_8);
return String.valueOf(VALUE_COUNT + i).getBytes(StandardCharsets.UTF_8);
}

private int fromBytes(byte[] bytes) {
return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - 10_000;
return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - VALUE_COUNT;
}
}

0 comments on commit 1c18e30

Please sign in to comment.