
[core] Optimization of Parquet Predicate Pushdown Capability #4608

Merged · 7 commits · Dec 3, 2024
@@ -810,6 +810,65 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception {
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithParquetFilter() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(FILE_FORMAT, "parquet");
conf.set("parquet.block.size", "1048576");
conf.set("parquet.page.size", "1024");
});

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

BatchTableWrite write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));

for (int i = 0; i < 2000; i++) {
write.write(rowData(i, i, i * 100L));
Contributor:

If you want the test data in one bucket with deletion vectors, just use rowData(0, i, i * 100L); the first column, "pt", is the partition.

Contributor:

for (int i = 0; i < 2000; i++) {
    write.write(rowData(i, i, i * 100L));

The code above will write 2000 records in 2000 partitions.

}
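
Per the two reviewer comments above, a corrected loop would pin the first column (the partition column "pt") to 0 so that all 2000 records land in a single partition and bucket. A minimal sketch, assuming rowData(pt, key, value) as in this test:

for (int i = 0; i < 2000; i++) {
    // pt = 0: every record goes to the same partition, so the deletion vector
    // and the Parquet filter are exercised within one bucket's files
    write.write(rowData(0, i, i * 100L));
}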

List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));
for (int i = 1000; i < 2000; i++) {
write.write(rowDataWithKind(RowKind.DELETE, i, i, i * 100L));
Contributor:


Same here: rowDataWithKind(RowKind.DELETE, 0, i, i * 100L).

}

messages = write.prepareCommit();
commit = writeBuilder.newCommit();
commit.commit(messages);

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());

for (int i = 500; i < 510; i++) {
TableRead read = table.newRead().withFilter(builder.equal(0, i)).executeFilter();
Contributor:


Maybe builder.equal(1, i) is what you want; a partition predicate will not be pushed down.

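A sketch of that suggestion, mirroring the loop above: field index 0 is the partition column "pt", whose predicate is not pushed down to Parquet, while field index 1 is a data column, so an equality on it does reach the Parquet reader.

// Sketch only, not part of the diff: filter on the data column instead of the partition.
TableRead read = table.newRead().withFilter(builder.equal(1, i)).executeFilter();
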
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
String.format(
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
i, i, i * 100L)));
}

for (int i = 1500; i < 1510; i++) {
TableRead read = table.newRead().withFilter(builder.equal(0, i)).executeFilter();
Contributor:


ditto

Contributor Author:


I misunderstood the first column as the primary key... I'll make the correction.

assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
}
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
@@ -28,6 +28,7 @@
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetReadState;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
@@ -130,7 +131,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields);
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
}

private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -336,6 +337,10 @@ private class ParquetReader implements FileRecordReader<InternalRow> {

private long nextRowPosition;

private ParquetReadState currentRowGroupReadState;

private long currentRowGroupFirstRowIndex;

/**
* For each request column, the reader to read this column. This is NULL if this column is
* missing from the file, in which case we populate the attribute with NULL.
@@ -359,6 +364,7 @@ private ParquetReader(
this.totalCountLoadedSoFar = 0;
this.currentRowPosition = 0;
this.nextRowPosition = 0;
this.currentRowGroupFirstRowIndex = 0;
this.fields = fields;
}

@@ -390,7 +396,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
currentRowPosition = nextRowPosition;
}

int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
int num = getBachSize();

for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) {
batch.writableVectors[i].fillWithNulls();
@@ -400,13 +407,13 @@ }
}
}
rowsReturned += num;
nextRowPosition = currentRowPosition + num;
nextRowPosition = getNextRowPosition(num);
batch.columnarBatch.setNumRows(num);
return true;
}

private void readNextRowGroup() throws IOException {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
@@ -415,6 +422,9 @@ private void readNextRowGroup() throws IOException {
+ totalRowCount);
}

this.currentRowGroupReadState =
new ParquetReadState(rowGroup.getRowIndexes().orElse(null));

List<Type> types = requestedSchema.getFields();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
@@ -429,18 +439,62 @@ private void readNextRowGroup() throws IOException {
0);
}
}

totalCountLoadedSoFar += rowGroup.getRowCount();
if (rowGroup.getRowIndexOffset().isPresent()) {
currentRowPosition = rowGroup.getRowIndexOffset().get();

if (rowGroup.getRowIndexOffset().isPresent()) { // filter
currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get();
long pageIndex = 0;
if (!this.currentRowGroupReadState.isMaxRange()) {
pageIndex = this.currentRowGroupReadState.currentRangeStart();
}
currentRowPosition = currentRowGroupFirstRowIndex + pageIndex;
} else {
if (reader.rowGroupsFiltered()) {
throw new RuntimeException(
"There is a bug, rowIndexOffset must be present when row groups are filtered.");
}
currentRowGroupFirstRowIndex = nextRowPosition;
currentRowPosition = nextRowPosition;
}
}

private int getBachSize() throws IOException {

long rangeBatchSize = Long.MAX_VALUE;
if (this.currentRowGroupReadState.isFinished()) {
throw new IOException(
"expecting more rows but reached last page block. Read "
+ rowsReturned
+ " out of "
+ totalRowCount);
} else if (!this.currentRowGroupReadState.isMaxRange()) {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
rangeBatchSize = this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1;
}

return (int)
Math.min(
batchSize,
Math.min(rangeBatchSize, totalCountLoadedSoFar - rowsReturned));
}

private long getNextRowPosition(int num) {
if (this.currentRowGroupReadState.isMaxRange()) {
return this.currentRowPosition + num;
} else {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
long nextIndex = pageIndex + num;

if (this.currentRowGroupReadState.currentRangeEnd() < nextIndex) {
this.currentRowGroupReadState.nextRange();
nextIndex = this.currentRowGroupReadState.currentRangeStart();
}

return nextIndex;
}
}

private ParquetReaderBatch getCachedEntry() throws IOException {
try {
return pool.pollEntry();
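
To make the new read path concrete: readNextFilteredRowGroup() returns only the row ranges kept by the page-level filter (exposed through getRowIndexes()), so each batch has to stop at the end of the current range, and the next read position has to jump to the start of the following range; that is what getBachSize() and getNextRowPosition() above implement. Below is a minimal, self-contained sketch of that clamping arithmetic. The class name and the two ranges are hypothetical; this is an illustration of the technique, not the PR code.

// Illustration only: batching over filtered row ranges, with hypothetical ranges.
public class RangeBatchingSketch {
    public static void main(String[] args) {
        long[][] ranges = {{200, 399}, {700, 799}}; // kept row ranges inside one row group
        int batchSize = 128;
        int rangeIdx = 0;
        long position = ranges[0][0]; // start at the first surviving row
        while (rangeIdx < ranges.length) {
            long rowsLeftInRange = ranges[rangeIdx][1] - position + 1;
            int num = (int) Math.min(batchSize, rowsLeftInRange); // clamp the batch to the range
            System.out.printf("read %d rows starting at row %d%n", num, position);
            position += num;
            if (position > ranges[rangeIdx][1]) { // range exhausted: jump to the next one
                rangeIdx++;
                if (rangeIdx < ranges.length) {
                    position = ranges[rangeIdx][0];
                }
            }
        }
    }
}

With a batch size of 128 this prints batches of 128, 72, and 100 rows, which is how the reader would split the ranges 200..399 and 700..799 instead of reading the whole row group.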