[parquet] Support using file index result to filter row ranges #4780
Conversation
Hi @Tan-JiaLiang, can you show some test results after the index push-down?
I tested on a table with 20 fields, generated 1,000,000 rows, and used the BSI index to test the EQ predicate.
This result was measured after #4765 was fixed.
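For context on how a BSI EQ lookup works: a bit-sliced index keeps one bitmap per bit position of the encoded value, and an equality predicate intersects those slices. Below is a rough sketch of the idea using the RoaringBitmap library, not Paimon's actual implementation; the `eq` helper and `existence` bitmap are illustrative names.

```java
import org.roaringbitmap.RoaringBitmap;

/** Rough sketch of an EQ lookup on a bit-sliced index (BSI); not Paimon's implementation. */
public final class BsiEqSketch {

    /**
     * slices[i] marks rows whose value has bit i set; existence marks rows that
     * have a value at all (non-null). Returns the rows whose value equals {@code value}.
     */
    public static RoaringBitmap eq(RoaringBitmap[] slices, RoaringBitmap existence, long value) {
        RoaringBitmap result = existence.clone();
        for (int i = 0; i < slices.length; i++) {
            if (((value >> i) & 1) == 1) {
                result.and(slices[i]);    // bit i of the value must be 1
            } else {
                result.andNot(slices[i]); // bit i of the value must be 0
            }
        }
        return result;
    }
}
```

An EQ probe therefore costs one bitmap operation per bit of the value domain, which is why the lookup stays cheap even for large files.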
It occurred to me that we could use the bitmap result to narrow down the row ranges, which might skip some deserialization overhead. Be aware that this may cause fragmentation of the row ranges, but the row ranges themselves are limited, so they don't take up much memory.
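To make that concrete, here is a minimal sketch of collapsing a bitmap of matched row positions into contiguous row ranges that a reader could seek over; the `RowRange` holder and `toRanges` helper are hypothetical names, not the API introduced by this PR:

```java
import org.roaringbitmap.IntIterator;
import org.roaringbitmap.RoaringBitmap;

import java.util.ArrayList;
import java.util.List;

/** Sketch: collapse a bitmap of matched row positions into contiguous [start, end) ranges. */
public final class RowRangesSketch {

    /** A half-open range of row positions within one file. */
    public static final class RowRange {
        public final long start; // inclusive
        public final long end;   // exclusive

        RowRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Merge consecutive positions so the reader can seek once per range. */
    public static List<RowRange> toRanges(RoaringBitmap matchedRows) {
        List<RowRange> ranges = new ArrayList<>();
        IntIterator it = matchedRows.getIntIterator(); // ascending order
        if (!it.hasNext()) {
            return ranges;
        }
        long start = Integer.toUnsignedLong(it.next());
        long prev = start;
        while (it.hasNext()) {
            long pos = Integer.toUnsignedLong(it.next());
            if (pos != prev + 1) {
                ranges.add(new RowRange(start, prev + 1));
                start = pos;
            }
            prev = pos;
        }
        ranges.add(new RowRange(start, prev + 1));
        return ranges;
    }

    public static void main(String[] args) {
        RoaringBitmap bitmap = RoaringBitmap.bitmapOf(1, 2, 3, 7, 8, 100);
        // Prints: [[1, 4), [7, 9), [100, 101)]
        System.out.println(toRanges(bitmap));
    }
}
```

Fragmentation shows up here as many short ranges when matches are scattered; each range is just two longs, so even a pathological bitmap stays small in memory.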
@JingsongLi Can you help trigger the build again? The container startup timed out in the E2E test.
Hi @Tan-JiaLiang, thanks for your benchmark. What did you compare: before and after this PR, or with and without the index?
+1
Yes, I compared the EQ predicate with the `file-index.read.enabled` option enabled vs. disabled.
This is the benchmark code:

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.benchmark.Benchmark;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;

import org.apache.commons.math3.random.RandomDataGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Benchmark for table read. */
public class BitmapIndexPushDownBenchmark {

    private static final int VALUE_COUNT = 20;
    private final int rowCount = 1000000;

    @TempDir java.nio.file.Path tempFile;

    private final RandomDataGenerator random = new RandomDataGenerator();

    @Test
    public void testParquetRead() throws Exception {
        System.out.println(tempFile);
        int[] bounds = new int[] {1000, 10000, 50000, 100000};
        for (int bound : bounds) {
            Table table = prepareData(bound, parquet(), "parquet_" + bound);
            // Same data, read once with and once without the file index.
            Map<String, Table> tables = new LinkedHashMap<>();
            tables.put(
                    "without-bsi-index",
                    table.copy(Collections.singletonMap("file-index.read.enabled", "false")));
            tables.put(
                    "with-bsi-index",
                    table.copy(Collections.singletonMap("file-index.read.enabled", "true")));
            int[] values = new int[5];
            for (int i = 0; i < values.length; i++) {
                values[i] = random.nextInt(0, bound);
            }
            innerTest(tables, bound, values);
        }
    }

    private Options parquet() {
        Options options = new Options();
        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
        options.set("file-index.bsi.columns", "k");
        return options;
    }

    private void innerTest(Map<String, Table> tables, int bound, int[] values) {
        int readTime = 3;
        Benchmark benchmark =
                new Benchmark("read", readTime * rowCount)
                        .setNumWarmupIters(1)
                        .setOutputPerIteration(true);
        for (String name : tables.keySet()) {
            for (int value : values) {
                benchmark.addCase(
                        "read-" + name + "-" + bound + "-" + value,
                        3,
                        () -> {
                            Table table = tables.get(name);
                            // EQ predicate on the indexed column "k" (field 0).
                            Predicate predicate =
                                    new PredicateBuilder(table.rowType()).equal(0, value);
                            for (int i = 0; i < readTime; i++) {
                                List<Split> splits =
                                        table.newReadBuilder().newScan().plan().splits();
                                AtomicLong readCount = new AtomicLong(0);
                                try {
                                    for (Split split : splits) {
                                        RecordReader<InternalRow> reader =
                                                table.newReadBuilder()
                                                        .withFilter(predicate)
                                                        .newRead()
                                                        .createReader(split);
                                        reader.forEachRemaining(
                                                row -> readCount.incrementAndGet());
                                    }
                                    System.out.printf("Finish read %d rows.\n", readCount.get());
                                } catch (Exception e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        });
            }
        }
        benchmark.run();
    }

    private Table prepareData(int bound, Options options, String tableName) throws Exception {
        Table table = createTable(options, tableName);
        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
        StreamTableWrite write = writeBuilder.newWrite();
        StreamTableCommit commit = writeBuilder.newCommit();
        AtomicInteger writeCount = new AtomicInteger(0);
        for (int i = 0; i < rowCount; i++) {
            try {
                write.write(newRandomRow(bound));
                writeCount.incrementAndGet();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        List<CommitMessage> commitMessages = write.prepareCommit(true, 1);
        commit.commit(1, commitMessages);
        write.close();
        return table;
    }

    protected Table createTable(Options tableOptions, String tableName) throws Exception {
        Options catalogOptions = new Options();
        catalogOptions.set(CatalogOptions.WAREHOUSE, tempFile.toUri().toString());
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
        String database = "default";
        catalog.createDatabase(database, true);
        // One int key column "k" plus VALUE_COUNT string payload columns.
        List<DataField> fields = new ArrayList<>();
        fields.add(new DataField(0, "k", new IntType()));
        for (int i = 1; i <= VALUE_COUNT; i++) {
            fields.add(new DataField(i, "f" + i, DataTypes.STRING()));
        }
        Schema schema =
                new Schema(
                        fields,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        tableOptions.toMap(),
                        "");
        Identifier identifier = Identifier.create(database, tableName);
        catalog.createTable(identifier, schema, false);
        return catalog.getTable(identifier);
    }

    protected InternalRow newRandomRow(int bound) {
        GenericRow row = new GenericRow(1 + VALUE_COUNT);
        row.setField(0, random.nextInt(0, bound));
        for (int i = 1; i <= VALUE_COUNT; i++) {
            row.setField(i, BinaryString.fromString(random.nextHexString(32)));
        }
        return row;
    }
}
```
Purpose
Support using file index result to filter row ranges.
Tests
org.apache.paimon.table.AppendOnlyFileStoreTableTest#testBitmapIndexResultFilterParquetRowRanges
API and Format
No.
Documentation
No.