Skip to content

Commit

Permalink
[core] Introduce bitmap index record reader (#4502)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang authored Nov 13, 2024
1 parent 787a981 commit 2b94a33
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex.bitmap;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.utils.RoaringBitmap32;

import javax.annotation.Nullable;

import java.io.IOException;

/**
 * A {@link FileRecordIterator} that wraps another {@link FileRecordIterator} and only returns the
 * rows whose position is contained in the bitmap of a {@link BitmapIndexResult}.
 *
 * <p>Iteration ends (by returning {@code null}) as soon as the wrapped iterator reports a position
 * greater than the value returned by {@code bitmap.last()}, so rows after that position are never
 * tested against the bitmap.
 */
public class ApplyBitmapIndexFileRecordIterator implements FileRecordIterator<InternalRow> {

    private final FileRecordIterator<InternalRow> iterator;
    private final RoaringBitmap32 bitmap;
    private final int last;

    /**
     * Creates a filtering view over {@code iterator}.
     *
     * <p>NOTE(review): {@code bitmap.last()} is evaluated eagerly here; presumably callers never
     * pass a result whose bitmap is empty - confirm how {@code last()} behaves in that case.
     *
     * @param iterator the iterator whose rows are filtered
     * @param fileIndexResult the index result providing the bitmap of positions to keep
     */
    public ApplyBitmapIndexFileRecordIterator(
            FileRecordIterator<InternalRow> iterator, BitmapIndexResult fileIndexResult) {
        this.iterator = iterator;
        this.bitmap = fileIndexResult.get();
        this.last = bitmap.last();
    }

    /** Returns the position reported by the wrapped iterator. */
    @Override
    public long returnedPosition() {
        return iterator.returnedPosition();
    }

    /** Returns the file path reported by the wrapped iterator. */
    @Override
    public Path filePath() {
        return iterator.filePath();
    }

    /**
     * Returns the next row of the wrapped iterator whose position is contained in the bitmap.
     *
     * @return the next matching row, or {@code null} when the wrapped iterator is exhausted or
     *     its position has moved past {@code last}
     * @throws IOException if the wrapped iterator fails to read
     */
    @Nullable
    @Override
    public InternalRow next() throws IOException {
        while (true) {
            InternalRow next = iterator.next();
            if (next == null) {
                return null;
            }
            // Compare as long: narrowing to int before the bounds check would wrap positions
            // above Integer.MAX_VALUE and defeat the early exit.
            long position = returnedPosition();
            if (position > last) {
                return null;
            }
            // position <= last here, so it fits into an int.
            if (bitmap.contains((int) position)) {
                return next;
            }
        }
    }

    /** Releases the current batch of the wrapped iterator. */
    @Override
    public void releaseBatch() {
        iterator.releaseBatch();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex.bitmap;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * A {@link RecordReader} that applies a {@link BitmapIndexResult} to filter the records produced
 * by another reader.
 */
public class ApplyBitmapIndexRecordReader implements RecordReader<InternalRow> {

    private final RecordReader<InternalRow> reader;

    private final BitmapIndexResult fileIndexResult;

    /**
     * @param reader the reader whose batches are filtered
     * @param fileIndexResult the index result handed to every wrapped batch iterator
     */
    public ApplyBitmapIndexRecordReader(
            RecordReader<InternalRow> reader, BitmapIndexResult fileIndexResult) {
        this.reader = reader;
        this.fileIndexResult = fileIndexResult;
    }

    /**
     * Reads the next batch from the wrapped reader and wraps it in an {@link
     * ApplyBitmapIndexFileRecordIterator}.
     *
     * @return the filtering iterator, or {@code null} when the wrapped reader has no more batches
     * @throws IOException if the wrapped reader fails to read
     */
    @Nullable
    @Override
    public RecordIterator<InternalRow> readBatch() throws IOException {
        RecordIterator<InternalRow> rawBatch = reader.readBatch();
        if (rawBatch != null) {
            // The filtering iterator is built on a FileRecordIterator, so reject anything else.
            checkArgument(
                    rawBatch instanceof FileRecordIterator,
                    "There is a bug, RecordIterator in ApplyBitmapIndexRecordReader must be FileRecordIterator");

            FileRecordIterator<InternalRow> fileBatch = (FileRecordIterator<InternalRow>) rawBatch;
            return new ApplyBitmapIndexFileRecordIterator(fileBatch, fileIndexResult);
        }
        return null;
    }

    /** Closes the wrapped reader. */
    @Override
    public void close() throws IOException {
        reader.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public long rangeCardinality(long start, long end) {
return roaringBitmap.rangeCardinality(start, end);
}

/**
 * Delegates to {@code last()} of the wrapped bitmap.
 *
 * @return the value reported by the wrapped bitmap's {@code last()} (presumably the largest value
 *     stored in the bitmap - confirm against the RoaringBitmap documentation)
 */
public int last() {
    final int lastValue = roaringBitmap.last();
    return lastValue;
}

/**
 * Creates a new {@code RoaringBitmap32} built from a clone of the wrapped bitmap.
 *
 * @return a new instance wrapping the cloned bitmap
 */
public RoaringBitmap32 clone() {
// Clone the wrapped bitmap first, then wrap the clone in a fresh RoaringBitmap32.
return new RoaringBitmap32(roaringBitmap.clone());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public RecordIterator<InternalRow> readBatch() throws IOException {

checkArgument(
batch instanceof FileRecordIterator,
"There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");
"There is a bug, RecordIterator in ApplyDeletionVectorReader must be FileRecordIterator");

return new ApplyDeletionFileRecordIterator(
(FileRecordIterator<InternalRow>) batch, deletionVector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public static FileIndexResult evaluate(
DataFileMeta file)
throws IOException {
if (dataFilter != null && !dataFilter.isEmpty()) {
byte[] embeddedIndex = file.embeddedIndex();
if (embeddedIndex != null) {
try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndex, dataSchema.logicalRowType())) {
return predicate.evaluate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])));
}
}

List<String> indexFiles =
file.extraFiles().stream()
.filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.bitmap.ApplyBitmapIndexRecordReader;
import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
Expand Down Expand Up @@ -212,14 +214,20 @@ private RecordReader<InternalRow> createFileReader(
dataFilePathFactory.toPath(file.fileName()),
file.fileSize(),
fileIndexResult);
FileRecordReader fileRecordReader =
RecordReader<InternalRow> fileRecordReader =
new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));

if (fileIndexResult instanceof BitmapIndexResult) {
fileRecordReader =
new ApplyBitmapIndexRecordReader(
fileRecordReader, (BitmapIndexResult) fileIndexResult);
}

DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get();
if (deletionVector != null && !deletionVector.isEmpty()) {
return new ApplyDeletionVectorReader(fileRecordReader, deletionVector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
Expand Down Expand Up @@ -574,6 +575,7 @@ public void testBSIAndBitmapIndexInMemory() throws Exception {
createUnawareBucketFileStoreTable(
rowType,
options -> {
options.set(METADATA_STATS_MODE, "NONE");
options.set(
FileIndexOptions.FILE_INDEX
+ "."
Expand All @@ -600,7 +602,11 @@ public void testBSIAndBitmapIndexInMemory() throws Exception {
write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L));
result.addAll(write.prepareCommit(true, 0));
write.write(GenericRow.of(1, BinaryString.fromString("A"), 4L));
write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
write.write(GenericRow.of(1, BinaryString.fromString("C"), 4L));
write.write(GenericRow.of(1, BinaryString.fromString("D"), 2L));
write.write(GenericRow.of(1, BinaryString.fromString("D"), 4L));
result.addAll(write.prepareCommit(true, 0));
commit.commit(0, result);
result.clear();
Expand Down Expand Up @@ -639,6 +645,7 @@ public void testBSIAndBitmapIndexInDisk() throws Exception {
createUnawareBucketFileStoreTable(
rowType,
options -> {
options.set(METADATA_STATS_MODE, "NONE");
options.set(
FileIndexOptions.FILE_INDEX
+ "."
Expand All @@ -665,7 +672,11 @@ public void testBSIAndBitmapIndexInDisk() throws Exception {
write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L));
result.addAll(write.prepareCommit(true, 0));
write.write(GenericRow.of(1, BinaryString.fromString("A"), 4L));
write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
write.write(GenericRow.of(1, BinaryString.fromString("C"), 4L));
write.write(GenericRow.of(1, BinaryString.fromString("D"), 2L));
write.write(GenericRow.of(1, BinaryString.fromString("D"), 4L));
result.addAll(write.prepareCommit(true, 0));
commit.commit(0, result);
result.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine;
Expand Down Expand Up @@ -862,6 +863,92 @@ public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
}

/**
 * Writes three commits into a single-bucket table with deletion vectors enabled and a bitmap file
 * index on column {@code b}, then reads with an equality filter on field index 2.
 *
 * <p>The manifest threshold is set to 1 byte here; judging by the test name, that presumably
 * keeps the index in a separate file rather than in the manifest - confirm.
 */
@Test
public void testDeletionVectorsWithBitmapFileIndexInFile() throws Exception {
    FileStoreTable table =
            createFileStoreTable(
                    options -> {
                        options.set(BUCKET, 1);
                        options.set(DELETION_VECTORS_ENABLED, true);
                        options.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
                        options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofBytes(1));
                        options.set("file-index.bitmap.columns", "b");
                    });

    StreamTableWrite writer =
            table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
    StreamTableCommit committer = table.newCommit(commitUser);

    // First commit: four rows.
    writer.write(rowData(1, 1, 300L));
    writer.write(rowData(1, 2, 400L));
    writer.write(rowData(1, 3, 100L));
    writer.write(rowData(1, 4, 100L));
    committer.commit(0, writer.prepareCommit(true, 0));

    // Second commit: presumably supersedes the rows with second field 1, 2 and 3, and adds 5.
    writer.write(rowData(1, 1, 100L));
    writer.write(rowData(1, 2, 100L));
    writer.write(rowData(1, 3, 300L));
    writer.write(rowData(1, 5, 100L));
    committer.commit(1, writer.prepareCommit(true, 1));

    // Third commit: a single row with second field 4.
    writer.write(rowData(1, 4, 200L));
    committer.commit(2, writer.prepareCommit(true, 2));

    PredicateBuilder predicates = new PredicateBuilder(ROW_TYPE);
    List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
    DataSplit firstSplit = (DataSplit) splits.get(0);
    assertThat(firstSplit.dataFiles().size()).isEqualTo(2);

    // Only the rows whose third field is 100 in the final state are expected.
    List<String> expectedRows =
            Arrays.asList(
                    "1|1|100|binary|varbinary|mapKey:mapVal|multiset",
                    "1|2|100|binary|varbinary|mapKey:mapVal|multiset",
                    "1|5|100|binary|varbinary|mapKey:mapVal|multiset");
    TableRead filteredRead = table.newRead().withFilter(predicates.equal(2, 100L));
    assertThat(getResult(filteredRead, splits, BATCH_ROW_TO_STRING))
            .hasSameElementsAs(expectedRows);
}

/**
 * Writes three commits into a single-bucket table with deletion vectors enabled and a bitmap file
 * index on column {@code b}, then reads with an equality filter on field index 2.
 *
 * <p>The manifest threshold is set to 1 MiB here; judging by the test name, that presumably
 * keeps the index embedded in the file metadata - confirm.
 */
@Test
public void testDeletionVectorsWithBitmapFileIndexInMeta() throws Exception {
    FileStoreTable table =
            createFileStoreTable(
                    options -> {
                        options.set(BUCKET, 1);
                        options.set(DELETION_VECTORS_ENABLED, true);
                        options.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
                        options.set(
                                FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofMebiBytes(1));
                        options.set("file-index.bitmap.columns", "b");
                    });

    StreamTableWrite writer =
            table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
    StreamTableCommit committer = table.newCommit(commitUser);

    // First commit: four rows.
    writer.write(rowData(1, 1, 300L));
    writer.write(rowData(1, 2, 400L));
    writer.write(rowData(1, 3, 100L));
    writer.write(rowData(1, 4, 100L));
    committer.commit(0, writer.prepareCommit(true, 0));

    // Second commit: presumably supersedes the rows with second field 1, 2 and 3, and adds 5.
    writer.write(rowData(1, 1, 100L));
    writer.write(rowData(1, 2, 100L));
    writer.write(rowData(1, 3, 300L));
    writer.write(rowData(1, 5, 100L));
    committer.commit(1, writer.prepareCommit(true, 1));

    // Third commit: a single row with second field 4.
    writer.write(rowData(1, 4, 200L));
    committer.commit(2, writer.prepareCommit(true, 2));

    PredicateBuilder predicates = new PredicateBuilder(ROW_TYPE);
    List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
    DataSplit firstSplit = (DataSplit) splits.get(0);
    assertThat(firstSplit.dataFiles().size()).isEqualTo(2);

    // Only the rows whose third field is 100 in the final state are expected.
    List<String> expectedRows =
            Arrays.asList(
                    "1|1|100|binary|varbinary|mapKey:mapVal|multiset",
                    "1|2|100|binary|varbinary|mapKey:mapVal|multiset",
                    "1|5|100|binary|varbinary|mapKey:mapVal|multiset");
    TableRead filteredRead = table.newRead().withFilter(predicates.equal(2, 100L));
    assertThat(getResult(filteredRead, splits, BATCH_ROW_TO_STRING))
            .hasSameElementsAs(expectedRows);
}

@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =
Expand Down

0 comments on commit 2b94a33

Please sign in to comment.