Fix conflict
leaves12138 committed Oct 21, 2024
1 parent 87321e1 commit 5463266
Showing 6 changed files with 192 additions and 11 deletions.
@@ -238,7 +238,8 @@ private KeyValueFileStoreScan newScan(boolean forWrite) {
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
options.mergeEngine(),
options.changelogProducer());
options.changelogProducer(),
options.fileIndexReadEnabled() && options.deletionVectorsEnabled());
}

@Override
37 changes: 37 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -152,6 +152,43 @@ public static DataFileMeta forAppend(
valueStatsCols);
}

public DataFileMeta(
String fileName,
long fileSize,
long rowCount,
BinaryRow minKey,
BinaryRow maxKey,
SimpleStats keyStats,
SimpleStats valueStats,
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
int level,
List<String> extraFiles,
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
this(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
}

public DataFileMeta(
String fileName,
long fileSize,
@@ -23,6 +23,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -42,9 +43,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;

/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing {@link KeyValue}s. Also
* produces {@link DataFileMeta} after writing a file.
@@ -66,6 +70,7 @@ public class KeyValueDataFileWriter
private final SimpleStatsConverter valueStatsConverter;
private final InternalRowSerializer keySerializer;
private final FileSource fileSource;
@Nullable private final DataFileIndexWriter dataFileIndexWriter;

private BinaryRow minKey = null;
private InternalRow maxKey = null;
@@ -85,7 +90,8 @@ public KeyValueDataFileWriter(
int level,
String compression,
CoreOptions options,
FileSource fileSource) {
FileSource fileSource,
FileIndexOptions fileIndexOptions) {
super(
fileIO,
factory,
@@ -107,12 +113,19 @@
this.valueStatsConverter = new SimpleStatsConverter(valueType, options.statsDenseStore());
this.keySerializer = new InternalRowSerializer(keyType);
this.fileSource = fileSource;
this.dataFileIndexWriter =
DataFileIndexWriter.create(
fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
}

@Override
public void write(KeyValue kv) throws IOException {
super.write(kv);

if (dataFileIndexWriter != null) {
dataFileIndexWriter.write(kv.value());
}

updateMinKey(kv);
updateMaxKey(kv);

@@ -165,6 +178,11 @@ public DataFileMeta result() throws IOException {
Pair<List<String>, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);

DataFileIndexWriter.FileIndexResult indexResult =
dataFileIndexWriter == null
? DataFileIndexWriter.EMPTY_RESULT
: dataFileIndexWriter.result();

return new DataFileMeta(
path.getName(),
fileIO.getFileSize(path),
@@ -177,10 +195,20 @@
maxSeqNumber,
schemaId,
level,
indexResult.independentIndexFile() == null
? Collections.emptyList()
: Collections.singletonList(indexResult.independentIndexFile()),
deleteRecordCount,
// TODO: enable file filter for primary key table (e.g. deletion table).
null,
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
}

@Override
public void close() throws IOException {
if (dataFileIndexWriter != null) {
dataFileIndexWriter.close();
}
super.close();
}
}
@@ -23,6 +23,7 @@
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -52,6 +53,7 @@ public class KeyValueFileWriterFactory {
private final WriteFormatContext formatContext;
private final long suggestedFileSize;
private final CoreOptions options;
private final FileIndexOptions fileIndexOptions;

private KeyValueFileWriterFactory(
FileIO fileIO,
@@ -68,6 +70,7 @@ private KeyValueFileWriterFactory(
this.formatContext = formatContext;
this.suggestedFileSize = suggestedFileSize;
this.options = options;
this.fileIndexOptions = options.indexColumnsOptions();
}

public RowType keyType() {
@@ -117,7 +120,8 @@ private KeyValueDataFileWriter createDataFileWriter(
level,
formatContext.compression(level),
options,
fileSource);
fileSource,
fileIndexOptions);
}

public void deleteFile(String filename, int level) {
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -31,11 +32,17 @@
import org.apache.paimon.stats.SimpleStatsEvolution;
import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
@@ -54,6 +61,10 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
private final MergeEngine mergeEngine;
private final ChangelogProducer changelogProducer;

private final boolean fileIndexReadEnabled;
// Cache of value filters converted to each data file's schema, keyed by schema id.
private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();

public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -65,7 +76,8 @@ public KeyValueFileStoreScan(
Integer scanManifestParallelism,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine,
ChangelogProducer changelogProducer) {
ChangelogProducer changelogProducer,
boolean fileIndexReadEnabled) {
super(
manifestsReader,
snapshotManager,
@@ -85,6 +97,7 @@
this.deletionVectorsEnabled = deletionVectorsEnabled;
this.mergeEngine = mergeEngine;
this.changelogProducer = changelogProducer;
this.fileIndexReadEnabled = fileIndexReadEnabled;
}

public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -118,6 +131,28 @@ protected boolean filterByStats(ManifestEntry entry) {
return true;
}

private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
}

RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();

Predicate dataPredicate =
dataFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id ->
fieldValueStatsConverters.convertFilter(
entry.file().schemaId(), valueFilter));

try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
return predicate.testPredicate(dataPredicate);
} catch (IOException e) {
throw new RuntimeException("Exception occurred while checking predicate.", e);
}
}

private boolean isValueFilterEnabled(ManifestEntry entry) {
if (valueFilter == null) {
return false;
@@ -181,7 +216,12 @@ private boolean filterByValueFilter(ManifestEntry entry) {
.getOrCreate(file.schemaId())
.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
return valueFilter.test(
file.rowCount(), result.minValues(), result.maxValues(), result.nullCounts());
file.rowCount(),
result.minValues(),
result.maxValues(),
result.nullCounts())
&& (!fileIndexReadEnabled
|| filterByFileIndex(entry.file().embeddedIndex(), entry));
}

private static boolean noOverlapping(List<ManifestEntry> entries) {
@@ -672,10 +672,6 @@ private void writeBranchData(FileStoreTable table) throws Exception {
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
if (table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) {
// TODO support parquet reader filter push down
return;
}

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -791,6 +787,81 @@ public void testWithShardDeletionVectors() throws Exception {
innerTestWithShard(table);
}

@Test
public void testDeletionVectorsWithFileIndexInFile() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
conf.set("file-index.bloom-filter.columns", "b");
});

StreamTableWrite write =
table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
StreamTableCommit commit = table.newCommit(commitUser);

write.write(rowData(1, 1, 300L));
write.write(rowData(1, 2, 400L));
write.write(rowData(1, 3, 200L));
write.write(rowData(1, 4, 500L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(1, 5, 100L));
write.write(rowData(1, 6, 600L));
write.write(rowData(1, 7, 400L));
commit.commit(1, write.prepareCommit(true, 1));

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
"1|1|300|binary|varbinary|mapKey:mapVal|multiset",
"1|2|400|binary|varbinary|mapKey:mapVal|multiset",
"1|3|200|binary|varbinary|mapKey:mapVal|multiset",
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
conf.set("file-index.bloom-filter.columns", "b");
conf.set("file-index.bloom-filter.b.items", "20");
});

StreamTableWrite write =
table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
StreamTableCommit commit = table.newCommit(commitUser);

write.write(rowData(1, 1, 300L));
write.write(rowData(1, 2, 400L));
write.write(rowData(1, 3, 200L));
write.write(rowData(1, 4, 500L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(1, 5, 100L));
write.write(rowData(1, 6, 600L));
write.write(rowData(1, 7, 400L));
commit.commit(1, write.prepareCommit(true, 1));

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
Predicate predicate = builder.equal(2, 300L);

List<Split> splits =
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());

assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
}

@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =
