Skip to content

Commit

Permalink
[core] Allow FileIndexer implementations to seek and skip if they need to. (#3406)
Browse files: browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Jun 17, 2024
1 parent 61d1945 commit 773b7b5
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,10 @@ private FileIndexReader getFileIndexReader(
indexType,
FileIndexCommon.getFieldType(fields, columnName),
new Options())
.createReader(getBytesWithStartAndLength(startAndLength));
.createReader(
seekableInputStream,
startAndLength.getLeft(),
startAndLength.getRight());
}

private byte[] getBytesWithStartAndLength(Pair<Integer, Integer> startAndLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.fileindex;

import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;

Expand All @@ -31,7 +32,7 @@ public interface FileIndexer {

FileIndexWriter createWriter();

FileIndexReader createReader(byte[] serializedBytes);
FileIndexReader createReader(SeekableInputStream inputStream, int start, int length);

static FileIndexer create(String type, DataType dataType, Options options) {
FileIndexerFactory fileIndexerFactory = FileIndexerFactoryUtils.load(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.BloomFilter64.BitSet;
import org.apache.paimon.utils.IOUtils;

import org.apache.hadoop.util.bloom.HashFunction;

import java.io.IOException;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;

Expand Down Expand Up @@ -65,8 +69,15 @@ public FileIndexWriter createWriter() {
}

@Override
public FileIndexReader createReader(byte[] serializedBytes) {
return new Reader(dataType, serializedBytes);
// Builds a reader from `length` bytes of `inputStream`, starting at offset `start`.
public FileIndexReader createReader(SeekableInputStream inputStream, int start, int length) {
try {
// Position the stream at the offset where the serialized bytes begin.
inputStream.seek(start);
// Buffer sized to exactly `length` bytes.
byte[] serializedBytes = new byte[length];
// NOTE(review): readFully presumably fills the whole buffer or throws; confirm against IOUtils.
IOUtils.readFully(inputStream, serializedBytes);
return new Reader(dataType, serializedBytes);
} catch (IOException e) {
// FileIndexer#createReader declares no checked exception, so the I/O failure is
// rethrown unchecked with the original exception kept as the cause.
throw new RuntimeException(e);
}
}

private static class Writer extends FileIndexWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fs.ByteArraySeekableStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataTypes;

Expand Down Expand Up @@ -60,7 +61,10 @@ public void testAddFindByRandom() {

testData.forEach(writer::write);

FileIndexReader reader = filter.createReader(writer.serializedBytes());
byte[] serializedBytes = writer.serializedBytes();
FileIndexReader reader =
filter.createReader(
new ByteArraySeekableStream(serializedBytes), 0, serializedBytes.length);

for (byte[] bytes : testData) {
Assertions.assertThat(reader.visitEqual(null, bytes).remain()).isTrue();
Expand Down Expand Up @@ -100,7 +104,10 @@ public void testAddFindByRandomLong() {

testData.forEach(writer::write);

FileIndexReader reader = filter.createReader(writer.serializedBytes());
byte[] serializedBytes = writer.serializedBytes();
FileIndexReader reader =
filter.createReader(
new ByteArraySeekableStream(serializedBytes), 0, serializedBytes.length);

for (Long value : testData) {
Assertions.assertThat(reader.visitEqual(null, value).remain()).isTrue();
Expand Down

0 comments on commit 773b7b5

Please sign in to comment.