[core] Optimization of Parquet Predicate Pushdown Capability #4608

Merged 7 commits on Dec 3, 2024
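
The changes below teach Paimon's Parquet reader to consume only the row groups and pages that survive predicate pushdown: the factory asks parquet-mr for the filtered record count and filtered row groups, and a new ParquetReadState tracks the row ranges that remain after page-level filtering. For context, here is a minimal, self-contained sketch of the parquet-mr API the change builds on (getFilteredRecordCount, readNextFilteredRowGroup, row-index offsets and indexes). The class name SketchFilteredScan, the file path argument, and the column name "f1" are illustrative only and are not part of this PR.

import java.util.Optional;
import java.util.PrimitiveIterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class SketchFilteredScan {

    public static void main(String[] args) throws Exception {
        // Push "f1 = 42" down to parquet-mr; row groups and pages whose statistics
        // cannot contain 42 are skipped before any value is decoded.
        FilterPredicate predicate = FilterApi.eq(FilterApi.intColumn("f1"), 42);
        ParquetReadOptions options =
                ParquetReadOptions.builder()
                        .withRecordFilter(FilterCompat.get(predicate))
                        .useStatsFilter(true) // row-group pruning via min/max statistics
                        .useColumnIndexFilter(true) // page pruning via column indexes
                        .build();

        try (ParquetFileReader reader =
                ParquetFileReader.open(
                        HadoopInputFile.fromPath(new Path(args[0]), new Configuration()),
                        options)) {
            // Rows surviving row-group pruning: what the factory below now passes to
            // ParquetReader instead of reader.getRecordCount().
            long filteredRecordCount = reader.getFilteredRecordCount();
            System.out.println("rows after row-group pruning: " + filteredRecordCount);

            PageReadStore rowGroup;
            while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
                // Absolute index of this row group's first row in the file.
                Optional<Long> firstRowIndex = rowGroup.getRowIndexOffset();
                // Offsets (within the row group) of rows that survive page pruning;
                // empty when every page of the group is kept.
                Optional<PrimitiveIterator.OfLong> rowIndexes = rowGroup.getRowIndexes();
                System.out.println(
                        "row group: rows=" + rowGroup.getRowCount()
                                + ", firstRowIndex=" + firstRowIndex
                                + ", pageFiltered=" + rowIndexes.isPresent());
            }
        }
    }
}

With column-index filtering enabled, a selective equality predicate such as the one in the new test below only needs to decode the pages whose statistics can contain the value, instead of scanning the whole file.
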
@@ -84,6 +84,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -810,6 +811,68 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception {
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithParquetFilter() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(FILE_FORMAT, "parquet");
conf.set("parquet.block.size", "1048576");
conf.set("parquet.page.size", "1024");
});

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

BatchTableWrite write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));

for (int i = 0; i < 200000; i++) {
write.write(rowData(1, i, i * 100L));
}

List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));
for (int i = 180000; i < 200000; i++) {
write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L));
}

messages = write.prepareCommit();
commit = writeBuilder.newCommit();
commit.commit(messages);

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
Random random = new Random();

for (int i = 0; i < 10; i++) {
int value = random.nextInt(180000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
String.format(
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
1, value, value * 100L)));
}

for (int i = 0; i < 10; i++) {
int value = 180000 + random.nextInt(20000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
}
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
@@ -28,6 +28,7 @@
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetReadState;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
@@ -130,7 +131,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields);
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
}

private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -336,6 +337,10 @@ private class ParquetReader implements FileRecordReader<InternalRow> {

private long nextRowPosition;

private ParquetReadState currentRowGroupReadState;

private long currentRowGroupFirstRowIndex;

        /**
         * For each requested column, the reader used to read that column. This is NULL if the
         * column is missing from the file, in which case we populate the attribute with NULL.
@@ -359,6 +364,7 @@ private ParquetReader(
this.totalCountLoadedSoFar = 0;
this.currentRowPosition = 0;
this.nextRowPosition = 0;
this.currentRowGroupFirstRowIndex = 0;
this.fields = fields;
}

@@ -390,7 +396,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
currentRowPosition = nextRowPosition;
}

int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
int num = getBachSize();

for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) {
batch.writableVectors[i].fillWithNulls();
@@ -400,13 +407,13 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
}
}
rowsReturned += num;
nextRowPosition = currentRowPosition + num;
nextRowPosition = getNextRowPosition(num);
batch.columnarBatch.setNumRows(num);
return true;
}

private void readNextRowGroup() throws IOException {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
@@ -415,6 +422,9 @@ private void readNextRowGroup() throws IOException {
+ totalRowCount);
}

this.currentRowGroupReadState =
new ParquetReadState(rowGroup.getRowIndexes().orElse(null));

List<Type> types = requestedSchema.getFields();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
@@ -429,18 +439,62 @@ private void readNextRowGroup() throws IOException {
0);
}
}

totalCountLoadedSoFar += rowGroup.getRowCount();
if (rowGroup.getRowIndexOffset().isPresent()) {
currentRowPosition = rowGroup.getRowIndexOffset().get();

if (rowGroup.getRowIndexOffset().isPresent()) { // filter
currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get();
long pageIndex = 0;
if (!this.currentRowGroupReadState.isMaxRange()) {
pageIndex = this.currentRowGroupReadState.currentRangeStart();
}
currentRowPosition = currentRowGroupFirstRowIndex + pageIndex;
} else {
if (reader.rowGroupsFiltered()) {
throw new RuntimeException(
"There is a bug, rowIndexOffset must be present when row groups are filtered.");
}
currentRowGroupFirstRowIndex = nextRowPosition;
currentRowPosition = nextRowPosition;
}
}

private int getBachSize() throws IOException {

long rangeBatchSize = Long.MAX_VALUE;
if (this.currentRowGroupReadState.isFinished()) {
throw new IOException(
"expecting more rows but reached last page block. Read "
+ rowsReturned
+ " out of "
+ totalRowCount);
} else if (!this.currentRowGroupReadState.isMaxRange()) {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
rangeBatchSize = this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1;
}

return (int)
Math.min(
batchSize,
Math.min(rangeBatchSize, totalCountLoadedSoFar - rowsReturned));
}

private long getNextRowPosition(int num) {
if (this.currentRowGroupReadState.isMaxRange()) {
return this.currentRowPosition + num;
} else {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
long nextIndex = pageIndex + num;

if (this.currentRowGroupReadState.currentRangeEnd() < nextIndex) {
this.currentRowGroupReadState.nextRange();
nextIndex = this.currentRowGroupReadState.currentRangeStart();
}

return nextIndex;
}
}

private ParquetReaderBatch getCachedEntry() throws IOException {
try {
return pool.pollEntry();
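
The new getBachSize() and getNextRowPosition() walk the surviving row ranges of the current row group so that a batch never crosses a range boundary. Below is a simplified, self-contained model of that bookkeeping, assuming a single row group whose surviving rows are given as inclusive [start, end] offsets (the role ParquetReadState plays above); RangeBatcher and the demo values are illustrative only, not part of this PR.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Illustrative stand-in for ParquetReadState: walks inclusive row ranges of one row group. */
final class RangeBatcher {

    /** Inclusive range of surviving row offsets within the row group. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final Iterator<Range> ranges;
    private Range current;
    private long position; // next row offset to read, relative to the row group

    RangeBatcher(List<Range> surviving) {
        // Assumes at least one surviving range; an empty list would mean the group was pruned.
        this.ranges = surviving.iterator();
        this.current = ranges.next();
        this.position = current.start;
    }

    boolean hasNext() {
        return current != null;
    }

    /** Same idea as getBachSize(): never read past the current range or the batch capacity. */
    int nextBatchSize(int batchCapacity) {
        long leftInRange = current.end - position + 1;
        return (int) Math.min(batchCapacity, leftInRange);
    }

    /** Same idea as getNextRowPosition(): advance, hopping to the next range once one is drained. */
    void advance(int rowsRead) {
        position += rowsRead;
        if (position > current.end) {
            current = ranges.hasNext() ? ranges.next() : null;
            if (current != null) {
                position = current.start;
            }
        }
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        // Two surviving ranges (e.g. pages kept by the column-index filter): rows 0-99 and 500-649.
        RangeBatcher batcher =
                new RangeBatcher(Arrays.asList(new Range(0, 99), new Range(500, 649)));
        while (batcher.hasNext()) {
            int num = batcher.nextBatchSize(64); // yields batches of 64, 36, 64, 64, 22 rows
            System.out.println("read " + num + " rows starting at row-group offset " + batcher.position());
            batcher.advance(num);
        }
    }
}

The real reader additionally caps each batch by totalCountLoadedSoFar - rowsReturned and maps these row-group-relative offsets back to absolute file positions using currentRowGroupFirstRowIndex.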