Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[common][core][format] Support using file index result for RowGroup filtering. #4473

Merged
merged 8 commits into from
Nov 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,20 @@ public FileIndexPredicate(SeekableInputStream inputStream, RowType fileRowType)
this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
}

public boolean testPredicate(@Nullable Predicate filePredicate) {
if (filePredicate == null) {
return true;
public FileIndexResult evaluate(@Nullable Predicate predicate) {
if (predicate == null) {
return REMAIN;
}

Set<String> requiredFieldNames = getRequiredNames(filePredicate);

Set<String> requiredFieldNames = getRequiredNames(predicate);
Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
FileIndexResult result = new FileIndexPredicateTest(indexReaders).test(predicate);
if (!result.remain()) {
LOG.debug(
"One file has been filtered: "
+ (path == null ? "in scan stage" : path.toString()));
return false;
}
return true;
return result;
}

private Set<String> getRequiredNames(Predicate filePredicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,31 @@

package org.apache.paimon.format;

import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

/** The context for creating a {@link RecordReader}. */
public class FormatReaderContext implements FormatReaderFactory.Context {

private final FileIO fileIO;
private final Path file;
private final long fileSize;
@Nullable private final FileIndexResult fileIndexResult;

/**
 * Creates a reader context without a file index result.
 *
 * <p>Delegates to the four-argument constructor, passing {@code null} as the file index
 * result.
 */
public FormatReaderContext(FileIO fileIO, Path file, long fileSize) {
this(fileIO, file, fileSize, null);
}

/**
 * Creates a reader context that additionally carries a file index result.
 *
 * @param fileIO the {@link FileIO} stored in this context
 * @param file the {@link Path} stored in this context
 * @param fileSize the value later returned by {@link #fileSize()}
 * @param fileIndexResult the value later returned by {@link #fileIndex()}; may be {@code null}
 */
public FormatReaderContext(
        FileIO fileIO,
        Path file,
        long fileSize,
        @Nullable FileIndexResult fileIndexResult) {
    // Plain field captures; the assignments are independent of one another.
    this.fileIndexResult = fileIndexResult;
    this.fileSize = fileSize;
    this.file = file;
    this.fileIO = fileIO;
}

@Override
Expand All @@ -49,4 +59,9 @@ public Path filePath() {
public long fileSize() {
return fileSize;
}

/**
 * Returns the file index result this context was constructed with.
 *
 * @return the stored {@link FileIndexResult}; {@code null} when the context was created
 *     through the three-argument constructor or with an explicit {@code null}
 */
@Override
public FileIndexResult fileIndex() {
    return this.fileIndexResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
Expand All @@ -38,5 +39,7 @@ interface Context {
Path filePath();

long fileSize();

FileIndexResult fileIndex();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public long getCardinality() {
return roaringBitmap.getLongCardinality();
}

/**
 * Returns the underlying bitmap's {@code rangeCardinality(start, end)}.
 *
 * <p>NOTE(review): presumably the number of set values between {@code start} and {@code end};
 * confirm the inclusive/exclusive bounds against the RoaringBitmap API.
 *
 * @param start lower bound forwarded to the underlying bitmap
 * @param end upper bound forwarded to the underlying bitmap
 * @return the cardinality reported by the underlying bitmap for the given bounds
 */
public long rangeCardinality(long start, long end) {
    final long cardinalityInRange = roaringBitmap.rangeCardinality(start, end);
    return cardinalityInRange;
}

/**
 * Creates a new {@code RoaringBitmap32} that wraps the result of calling {@code clone()} on
 * the underlying bitmap.
 *
 * @return a new instance built from a clone of the underlying bitmap
 */
public RoaringBitmap32 clone() {
return new RoaringBitmap32(roaringBitmap.clone());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.io;

import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
Expand All @@ -28,10 +29,10 @@
import java.util.List;
import java.util.stream.Collectors;

/** File index reader, do the filter in the constructor. */
public class FileIndexSkipper {
/** Evaluate file index result. */
public class FileIndexEvaluator {

public static boolean skip(
public static FileIndexResult evaluate(
FileIO fileIO,
TableSchema dataSchema,
List<Predicate> dataFilter,
Expand All @@ -55,14 +56,11 @@ public static boolean skip(
dataFilePathFactory.toPath(indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
if (!predicate.testPredicate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
return true;
}
return predicate.evaluate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])));
}
}
}

return false;
return FileIndexResult.REMAIN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry

try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
return predicate.testPredicate(dataPredicate);
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
throw new RuntimeException("Exception happens while checking predicate.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE
id ->
fieldValueStatsConverters.convertFilter(
entry.file().schemaId(), valueFilter));
return predicate.testPredicate(dataPredicate);
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
throw new RuntimeException("Exception happens while checking fileIndex predicate.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.FileIndexSkipper;
import org.apache.paimon.io.FileIndexEvaluator;
import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
Expand Down Expand Up @@ -191,26 +192,30 @@ private RecordReader<InternalRow> createFileReader(
BulkFormatMapping bulkFormatMapping,
IOExceptionSupplier<DeletionVector> dvFactory)
throws IOException {
FileIndexResult fileIndexResult = null;
if (fileIndexReadEnabled) {
boolean skip =
FileIndexSkipper.skip(
hang8929201 marked this conversation as resolved.
Show resolved Hide resolved
fileIndexResult =
FileIndexEvaluator.evaluate(
fileIO,
bulkFormatMapping.getDataSchema(),
bulkFormatMapping.getDataFilters(),
dataFilePathFactory,
file);
if (skip) {
if (!fileIndexResult.remain()) {
return new EmptyRecordReader<>();
}
}

FormatReaderContext formatReaderContext =
new FormatReaderContext(
fileIO,
dataFilePathFactory.toPath(file.fileName()),
file.fileSize(),
fileIndexResult);
FileRecordReader fileRecordReader =
new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
new FormatReaderContext(
fileIO,
dataFilePathFactory.toPath(file.fileName()),
file.fileSize()),
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
Expand Down Expand Up @@ -80,6 +81,11 @@ public Path filePath() {
public long fileSize() {
return length;
}

// Delegates to context.fileIndex() and returns its result unchanged.
@Override
public FileIndexResult fileIndex() {
    final FileIndexResult delegated = context.fileIndex();
    return delegated;
}
});
}

Expand Down
Loading
Loading