Skip to content

Commit

Permalink
[parquet] Merge the file index and the deletion vector and push down …
Browse files Browse the repository at this point in the history
…filtering (#4812)
  • Loading branch information
Tan-JiaLiang authored Jan 6, 2025
1 parent 5cf2338 commit 3dab1b8
Show file tree
Hide file tree
Showing 15 changed files with 213 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.paimon.format;

import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.RoaringBitmap32;

import javax.annotation.Nullable;

Expand All @@ -31,18 +31,18 @@ public class FormatReaderContext implements FormatReaderFactory.Context {
private final FileIO fileIO;
private final Path file;
private final long fileSize;
@Nullable private final FileIndexResult fileIndexResult;
@Nullable private final RoaringBitmap32 selection;

public FormatReaderContext(FileIO fileIO, Path file, long fileSize) {
this(fileIO, file, fileSize, null);
}

public FormatReaderContext(
FileIO fileIO, Path file, long fileSize, @Nullable FileIndexResult fileIndexResult) {
FileIO fileIO, Path file, long fileSize, @Nullable RoaringBitmap32 selection) {
this.fileIO = fileIO;
this.file = file;
this.fileSize = fileSize;
this.fileIndexResult = fileIndexResult;
this.selection = selection;
}

@Override
Expand All @@ -62,7 +62,7 @@ public long fileSize() {

@Nullable
@Override
public FileIndexResult fileIndex() {
return fileIndexResult;
public RoaringBitmap32 selection() {
return selection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.format;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.RoaringBitmap32;

import javax.annotation.Nullable;

Expand All @@ -44,6 +44,6 @@ interface Context {
long fileSize();

@Nullable
FileIndexResult fileIndex();
RoaringBitmap32 selection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ public long getCardinality() {
return roaringBitmap.getLongCardinality();
}

public long rangeCardinality(long start, long end) {
return roaringBitmap.rangeCardinality(start, end);
}

public int first() {
return roaringBitmap.first();
}
Expand All @@ -80,6 +76,18 @@ public int last() {
return roaringBitmap.last();
}

public long nextValue(int fromValue) {
return roaringBitmap.nextValue(fromValue);
}

public long previousValue(int fromValue) {
return roaringBitmap.previousValue(fromValue);
}

public boolean intersects(long minimum, long supremum) {
return roaringBitmap.intersects(minimum, supremum);
}

public RoaringBitmap32 clone() {
return new RoaringBitmap32(roaringBitmap.clone());
}
Expand Down Expand Up @@ -142,10 +150,6 @@ public static RoaringBitmap32 bitmapOf(int... dat) {
return roaringBitmap32;
}

public static RoaringBitmap32 bitmapOfRange(long min, long max) {
return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
}

public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) {
return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ public byte[] serializeToBytes() {
}
}

/**
* Note: the result is read only, do not call any modify operation outside.
*
* @return the deleted position
*/
public RoaringBitmap32 get() {
return roaringBitmap;
}

public static DeletionVector deserializeFromByteBuffer(ByteBuffer buffer) throws IOException {
RoaringBitmap32 bitmap = new RoaringBitmap32();
bitmap.deserialize(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.BitmapDeletionVector;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fileindex.FileIndexResult;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
import org.apache.paimon.utils.IOExceptionSupplier;
import org.apache.paimon.utils.RoaringBitmap32;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -208,9 +210,29 @@ private FileRecordReader<InternalRow> createFileReader(
}
}

RoaringBitmap32 selection = null;
if (fileIndexResult instanceof BitmapIndexResult) {
selection = ((BitmapIndexResult) fileIndexResult).get();
}

RoaringBitmap32 deletion = null;
DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get();
if (deletionVector instanceof BitmapDeletionVector) {
deletion = ((BitmapDeletionVector) deletionVector).get();
}

if (selection != null) {
if (deletion != null) {
selection = RoaringBitmap32.andNot(selection, deletion);
}
if (selection.isEmpty()) {
return new EmptyFileRecordReader<>();
}
}

FormatReaderContext formatReaderContext =
new FormatReaderContext(
fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult);
fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
formatReaderMapping.getReaderFactory(),
Expand All @@ -225,7 +247,6 @@ private FileRecordReader<InternalRow> createFileReader(
fileRecordReader, (BitmapIndexResult) fileIndexResult);
}

DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get();
if (deletionVector != null && !deletionVector.isEmpty()) {
return new ApplyDeletionVectorReader(fileRecordReader, deletionVector);
}
Expand Down
Loading

0 comments on commit 3dab1b8

Please sign in to comment.