Skip to content

Commit

Permalink
[parquet] Support using file index result to filter row ranges (#4780)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang authored and JingsongLi committed Dec 30, 2024
1 parent 005fb14 commit 049a5ae
Show file tree
Hide file tree
Showing 5 changed files with 732 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public long rangeCardinality(long start, long end) {
return roaringBitmap.rangeCardinality(start, end);
}

public int first() {
return roaringBitmap.first();
}

public int last() {
return roaringBitmap.last();
}
Expand Down Expand Up @@ -138,6 +142,10 @@ public static RoaringBitmap32 bitmapOf(int... dat) {
return roaringBitmap32;
}

public static RoaringBitmap32 bitmapOfRange(long min, long max) {
return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
}

public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) {
return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -70,6 +71,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -79,8 +81,11 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
Expand Down Expand Up @@ -722,6 +727,94 @@ public void testBSIAndBitmapIndexInDisk() throws Exception {
});
}

@Test
public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {
RowType rowType =
RowType.builder()
.field("id", DataTypes.INT())
.field("event", DataTypes.STRING())
.field("price", DataTypes.INT())
.build();
// in unaware-bucket mode, we split files into splits all the time
FileStoreTable table =
createUnawareBucketFileStoreTable(
rowType,
options -> {
options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
options.set(WRITE_ONLY, true);
options.set(
FileIndexOptions.FILE_INDEX
+ "."
+ BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+ "."
+ CoreOptions.COLUMNS,
"price");
options.set(
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
});

int bound = 3000;
Random random = new Random();
Map<Integer, Integer> expectedMap = new HashMap<>();
for (int i = 0; i < 5; i++) {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
for (int j = 0; j < 10000; j++) {
int next = random.nextInt(bound);
expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1);
write.write(GenericRow.of(1, BinaryString.fromString("A"), next));
}
commit.commit(i, write.prepareCommit(true, i));
write.close();
commit.close();
}

// test eq
for (int i = 0; i < 10; i++) {
int key = random.nextInt(bound);
Predicate predicate = new PredicateBuilder(rowType).equal(2, key);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead().withFilter(predicate).createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
row -> {
cnt.incrementAndGet();
assertThat(row.getInt(2)).isEqualTo(key);
});
assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
reader.close();
}

// test between
for (int i = 0; i < 10; i++) {
int max = random.nextInt(bound);
int min = random.nextInt(max);
Predicate predicate =
PredicateBuilder.and(
new PredicateBuilder(rowType).greaterOrEqual(2, min),
new PredicateBuilder(rowType).lessOrEqual(2, max));
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead().withFilter(predicate).createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
row -> {
cnt.addAndGet(1);
assertThat(row.getInt(2)).isGreaterThanOrEqualTo(min);
assertThat(row.getInt(2)).isLessThanOrEqualTo(max);
});
Optional<Integer> reduce =
expectedMap.entrySet().stream()
.filter(x -> x.getKey() >= min && x.getKey() <= max)
.map(Map.Entry::getValue)
.reduce(Integer::sum);
assertThat(cnt.get()).isEqualTo(reduce.orElse(0));
reader.close();
}
}

@Test
public void testWithShardAppendTable() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,8 @@ private RowRanges getRowRanges(int blockIndex) {
options.getRecordFilter(),
getColumnIndexStore(blockIndex),
paths.keySet(),
blocks.get(blockIndex).getRowCount());
blocks.get(blockIndex).getRowCount(),
fileIndexResult);
blockRowRanges.set(blockIndex, rowRanges);
}
return rowRanges;
Expand Down
Loading

0 comments on commit 049a5ae

Please sign in to comment.