[core] Enable file index for DV table #4310

Merged 1 commit on Nov 4, 2024
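
This change wires a DataFileIndexWriter into KeyValueDataFileWriter so primary-key data files can also produce a file index, and passes options.fileIndexReadEnabled() && options.deletionVectorsEnabled() into KeyValueFileStoreScan so that, for tables with deletion vectors, the scan can additionally prune manifest entries by the file index embedded in DataFileMeta.

Below is a minimal, hypothetical sketch (not part of this PR) of the table options the new tests exercise. The file-index.bloom-filter.* keys are taken from the tests in this diff; the bucket and deletion-vectors.enabled keys are assumed names for the corresponding CoreOptions.

import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of this PR: table options exercised by the new tests. */
public class DvFileIndexOptionsSketch {

    public static Map<String, String> dvTableWithFileIndex() {
        Map<String, String> options = new HashMap<>();
        // Fixed-bucket primary-key table (assumed option key).
        options.put("bucket", "1");
        // Enable deletion vectors (assumed key for DELETION_VECTORS_ENABLED).
        options.put("deletion-vectors.enabled", "true");
        // Bloom-filter file index on column "b" (key used by the new tests).
        options.put("file-index.bloom-filter.columns", "b");
        // A small expected item count keeps the index small enough to embed into
        // DataFileMeta, which is what the new scan-side filterByFileIndex reads.
        options.put("file-index.bloom-filter.b.items", "20");
        return options;
    }
}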
@@ -238,7 +238,8 @@ private KeyValueFileStoreScan newScan(boolean forWrite) {
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
options.mergeEngine(),
options.changelogProducer());
options.changelogProducer(),
options.fileIndexReadEnabled() && options.deletionVectorsEnabled());
Contributor:
Add "File index support Append Only tables and pk table with DV" in fileIndex doc

}

@Override
37 changes: 37 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -152,6 +152,43 @@ public static DataFileMeta forAppend(
valueStatsCols);
}

public DataFileMeta(
String fileName,
long fileSize,
long rowCount,
BinaryRow minKey,
BinaryRow maxKey,
SimpleStats keyStats,
SimpleStats valueStats,
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
int level,
List<String> extraFiles,
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
this(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
}

public DataFileMeta(
String fileName,
long fileSize,
KeyValueDataFileWriter.java
@@ -23,6 +23,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -42,9 +43,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;

/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing {@link KeyValue}s. Also
* produces {@link DataFileMeta} after writing a file.
@@ -66,6 +70,7 @@ public class KeyValueDataFileWriter
private final SimpleStatsConverter valueStatsConverter;
private final InternalRowSerializer keySerializer;
private final FileSource fileSource;
@Nullable private final DataFileIndexWriter dataFileIndexWriter;

private BinaryRow minKey = null;
private InternalRow maxKey = null;
@@ -85,7 +90,8 @@ public KeyValueDataFileWriter(
int level,
String compression,
CoreOptions options,
FileSource fileSource) {
FileSource fileSource,
FileIndexOptions fileIndexOptions) {
super(
fileIO,
factory,
@@ -107,12 +113,19 @@
this.valueStatsConverter = new SimpleStatsConverter(valueType, options.statsDenseStore());
this.keySerializer = new InternalRowSerializer(keyType);
this.fileSource = fileSource;
this.dataFileIndexWriter =
DataFileIndexWriter.create(
fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
}

@Override
public void write(KeyValue kv) throws IOException {
super.write(kv);

if (dataFileIndexWriter != null) {
dataFileIndexWriter.write(kv.value());
}

updateMinKey(kv);
updateMaxKey(kv);

@@ -165,6 +178,11 @@ public DataFileMeta result() throws IOException {
Pair<List<String>, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);

DataFileIndexWriter.FileIndexResult indexResult =
dataFileIndexWriter == null
? DataFileIndexWriter.EMPTY_RESULT
: dataFileIndexWriter.result();

return new DataFileMeta(
path.getName(),
fileIO.getFileSize(path),
@@ -177,10 +195,20 @@
maxSeqNumber,
schemaId,
level,
indexResult.independentIndexFile() == null
? Collections.emptyList()
: Collections.singletonList(indexResult.independentIndexFile()),
deleteRecordCount,
// TODO: enable file filter for primary key table (e.g. deletion table).
null,
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
}

@Override
public void close() throws IOException {
if (dataFileIndexWriter != null) {
dataFileIndexWriter.close();
}
super.close();
}
}
KeyValueFileWriterFactory.java
@@ -23,6 +23,7 @@
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -52,6 +53,7 @@ public class KeyValueFileWriterFactory {
private final WriteFormatContext formatContext;
private final long suggestedFileSize;
private final CoreOptions options;
private final FileIndexOptions fileIndexOptions;

private KeyValueFileWriterFactory(
FileIO fileIO,
@@ -68,6 +70,7 @@ private KeyValueFileWriterFactory(
this.formatContext = formatContext;
this.suggestedFileSize = suggestedFileSize;
this.options = options;
this.fileIndexOptions = options.indexColumnsOptions();
}

public RowType keyType() {
@@ -117,7 +120,8 @@ private KeyValueDataFileWriter createDataFileWriter(
level,
formatContext.compression(level),
options,
fileSource);
fileSource,
fileIndexOptions);
}

public void deleteFile(String filename, int level) {
KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -31,11 +32,17 @@
import org.apache.paimon.stats.SimpleStatsEvolution;
import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
@@ -54,6 +61,10 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
private final MergeEngine mergeEngine;
private final ChangelogProducer changelogProducer;

private final boolean fileIndexReadEnabled;
// Cache of the value filter converted per data file schema id.
private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();

public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -65,7 +76,8 @@ public KeyValueFileStoreScan(
Integer scanManifestParallelism,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine,
ChangelogProducer changelogProducer) {
ChangelogProducer changelogProducer,
boolean fileIndexReadEnabled) {
super(
manifestsReader,
snapshotManager,
@@ -85,6 +97,7 @@
this.deletionVectorsEnabled = deletionVectorsEnabled;
this.mergeEngine = mergeEngine;
this.changelogProducer = changelogProducer;
this.fileIndexReadEnabled = fileIndexReadEnabled;
}

public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -118,6 +131,28 @@ protected boolean filterByStats(ManifestEntry entry) {
return true;
}

private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
Contributor:

If the file index is not stored in the embedded index bytes but in a separate index file, what is the filter-by-file-index logic for a pk table with DV?

}

RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();

Predicate dataPredicate =
dataFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id ->
fieldValueStatsConverters.convertFilter(
entry.file().schemaId(), valueFilter));

try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
return predicate.testPredicate(dataPredicate);
} catch (IOException e) {
throw new RuntimeException("Exception happens while checking predicate.", e);
Contributor:

Exception happens while checking fileIndex predicate.

}
}

private boolean isValueFilterEnabled(ManifestEntry entry) {
if (valueFilter == null) {
return false;
@@ -181,7 +216,12 @@ private boolean filterByValueFilter(ManifestEntry entry) {
.getOrCreate(file.schemaId())
.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
return valueFilter.test(
file.rowCount(), result.minValues(), result.maxValues(), result.nullCounts());
file.rowCount(),
result.minValues(),
result.maxValues(),
result.nullCounts())
&& (!fileIndexReadEnabled
|| filterByFileIndex(entry.file().embeddedIndex(), entry));
}

private static boolean noOverlapping(List<ManifestEntry> entries) {
@@ -672,10 +672,6 @@ private void writeBranchData(FileStoreTable table) throws Exception {
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
if (table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) {
// TODO support parquet reader filter push down
return;
}

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -791,6 +787,81 @@ public void testWithShardDeletionVectors() throws Exception {
innerTestWithShard(table);
}

@Test
public void testDeletionVectorsWithFileIndexInFile() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
conf.set("file-index.bloom-filter.columns", "b");
});

StreamTableWrite write =
table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
StreamTableCommit commit = table.newCommit(commitUser);

write.write(rowData(1, 1, 300L));
write.write(rowData(1, 2, 400L));
write.write(rowData(1, 3, 200L));
write.write(rowData(1, 4, 500L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(1, 5, 100L));
write.write(rowData(1, 6, 600L));
write.write(rowData(1, 7, 400L));
commit.commit(1, write.prepareCommit(true, 1));

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
"1|1|300|binary|varbinary|mapKey:mapVal|multiset",
"1|2|400|binary|varbinary|mapKey:mapVal|multiset",
"1|3|200|binary|varbinary|mapKey:mapVal|multiset",
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
conf.set("file-index.bloom-filter.columns", "b");
conf.set("file-index.bloom-filter.b.items", "20");
});

StreamTableWrite write =
table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
StreamTableCommit commit = table.newCommit(commitUser);

write.write(rowData(1, 1, 300L));
write.write(rowData(1, 2, 400L));
write.write(rowData(1, 3, 200L));
write.write(rowData(1, 4, 500L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(1, 5, 100L));
write.write(rowData(1, 6, 600L));
write.write(rowData(1, 7, 400L));
commit.commit(1, write.prepareCommit(true, 1));

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
Predicate predicate = builder.equal(2, 300L);

List<Split> splits =
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());

assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
}

@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =