diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index d80bd6d3979d..956b615d7884 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -229,7 +229,8 @@ public void pushdown(Predicate keyFilter) { forWrite, options.scanManifestParallelism(), branchName, - options.deletionVectorsEnabled()); + options.deletionVectorsEnabled(), + options.mergeEngine()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index 0e115fdddc87..b0ab4338f117 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -213,7 +213,7 @@ public RecordReader createReader(DataSplit split) throws IOException { private RecordReader createReaderWithoutOuterProjection(DataSplit split) throws IOException { if (split.beforeFiles().isEmpty()) { - if (split.isStreaming() || split.deletionFiles().isPresent()) { + if (split.isStreaming() || split.convertToRawFiles().isPresent()) { return noMergeRead( split.partition(), split.bucket(), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 0f34cac5a138..b4c4909aed3f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -36,6 +37,8 @@ import java.util.Collections; import java.util.List; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; + /** {@link FileStoreScan} for {@link KeyValueFileStore}. */ public class KeyValueFileStoreScan extends AbstractFileStoreScan { @@ -45,6 +48,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { private Predicate keyFilter; private Predicate valueFilter; private final boolean deletionVectorsEnabled; + private final MergeEngine mergeEngine; public KeyValueFileStoreScan( RowType partitionType, @@ -59,7 +63,8 @@ public KeyValueFileStoreScan( boolean checkNumOfBuckets, Integer scanManifestParallelism, String branchName, - boolean deletionVectorsEnabled) { + boolean deletionVectorsEnabled, + MergeEngine mergeEngine) { super( partitionType, bucketFilter, @@ -81,6 +86,7 @@ public KeyValueFileStoreScan( sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schema.id()); this.deletionVectorsEnabled = deletionVectorsEnabled; + this.mergeEngine = mergeEngine; } public KeyValueFileStoreScan withKeyFilter(Predicate predicate) { @@ -100,7 +106,9 @@ protected boolean filterByStats(ManifestEntry entry) { Predicate filter = null; FieldStatsArraySerializer serializer = null; BinaryTableStats stats = null; - if (deletionVectorsEnabled && entry.level() > 0 && valueFilter != null) { + if ((deletionVectorsEnabled || mergeEngine == FIRST_ROW) + && entry.level() > 0 + && valueFilter != null) { filter = valueFilter; serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId()); stats = entry.file().valueStats(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index ac0f798a4b4a..f7215dc574fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -157,7 +157,10 @@ public SnapshotReader newSnapshotReader(String branchName) { @Override public InnerTableScan newScan() { return new InnerTableScanImpl( - coreOptions(), newSnapshotReader(), DefaultValueAssigner.create(tableSchema)); + tableSchema.primaryKeys().size() > 0, + coreOptions(), + newSnapshotReader(), + DefaultValueAssigner.create(tableSchema)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index f35afc64d5c9..fea783259241 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -116,11 +116,13 @@ public KeyValueFileStore store() { @Override protected SplitGenerator splitGenerator() { + CoreOptions options = store().options(); return new MergeTreeSplitGenerator( store().newKeyComparator(), - store().options().splitTargetSize(), - store().options().splitOpenFileCost(), - store().options().deletionVectorsEnabled()); + options.splitTargetSize(), + options.splitOpenFileCost(), + options.deletionVectorsEnabled(), + options.mergeEngine()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java index 375ef7e0ac6c..b307279d0f31 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; + /** {@link TableScan} implementation for batch planning. */ public class InnerTableScanImpl extends AbstractInnerTableScan { @@ -39,13 +41,14 @@ public class InnerTableScanImpl extends AbstractInnerTableScan { private Integer pushDownLimit; public InnerTableScanImpl( + boolean pkTable, CoreOptions options, SnapshotReader snapshotReader, DefaultValueAssigner defaultValueAssigner) { super(options, snapshotReader); this.hasNext = true; this.defaultValueAssigner = defaultValueAssigner; - if (options.deletionVectorsEnabled()) { + if (pkTable && (options.deletionVectorsEnabled() || options.mergeEngine() == FIRST_ROW)) { snapshotReader.withLevelFilter(level -> level > 0); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java index 7cf1ed24d9c8..9a06a53f4ce6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.source; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.mergetree.SortedRun; @@ -31,6 +32,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; + /** Merge tree implementation of {@link SplitGenerator}. */ public class MergeTreeSplitGenerator implements SplitGenerator { @@ -42,20 +45,24 @@ public class MergeTreeSplitGenerator implements SplitGenerator { private final boolean deletionVectorsEnabled; + private final MergeEngine mergeEngine; + public MergeTreeSplitGenerator( Comparator keyComparator, long targetSplitSize, long openFileCost, - boolean deletionVectorsEnabled) { + boolean deletionVectorsEnabled, + MergeEngine mergeEngine) { this.keyComparator = keyComparator; this.targetSplitSize = targetSplitSize; this.openFileCost = openFileCost; this.deletionVectorsEnabled = deletionVectorsEnabled; + this.mergeEngine = mergeEngine; } @Override public List> splitForBatch(List files) { - if (deletionVectorsEnabled) { + if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) { Function weightFunc = file -> Math.max(file.fileSize(), openFileCost); return BinPacking.packForOrdered(files, weightFunc, targetSplitSize); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 06836d481aa5..aa28fa667467 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.Snapshot; import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.RecordComparator; @@ -63,6 +64,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; @@ -73,6 +75,7 @@ public class SnapshotReaderImpl implements SnapshotReader { private final FileStoreScan scan; private final TableSchema tableSchema; private final CoreOptions options; + private final MergeEngine mergeEngine; private final boolean deletionVectors; private final SnapshotManager snapshotManager; private final ConsumerManager consumerManager; @@ -100,6 +103,7 @@ public SnapshotReaderImpl( this.scan = scan; this.tableSchema = tableSchema; this.options = options; + this.mergeEngine = options.mergeEngine(); this.deletionVectors = options.deletionVectorsEnabled(); this.snapshotManager = snapshotManager; this.consumerManager = @@ -435,7 +439,7 @@ private List convertToRawFiles( String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); // append only or deletionVectors files can be returned - if (tableSchema.primaryKeys().isEmpty() || deletionVectors) { + if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) { return makeRawTableFiles(bucketPath, dataFiles); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 35ac209a91d0..3c6910fbec16 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -105,6 +105,7 @@ public SnapshotReader newSnapshotReader(String branchName) { @Override public InnerTableScan newScan() { return new InnerTableScanImpl( + dataTable.schema().primaryKeys().size() > 0, coreOptions(), newSnapshotReader(), DefaultValueAssigner.create(dataTable.schema())); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 6d97eda5f69a..1278339210de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.io.DataFileTestUtils.fromMinMax; import static org.assertj.core.api.Assertions.assertThat; @@ -108,14 +109,14 @@ public void testMergeTree() { Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); assertThat( toNames( - new MergeTreeSplitGenerator(comparator, 100, 2, false) + new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE) .splitForBatch(files))) .containsExactlyInAnyOrder( Arrays.asList("1", "2", "4", "3", "5"), Collections.singletonList("6")); assertThat( toNames( - new MergeTreeSplitGenerator(comparator, 100, 30, false) + new MergeTreeSplitGenerator(comparator, 100, 30, false, DEDUPLICATE) .splitForBatch(files))) .containsExactlyInAnyOrder( Arrays.asList("1", "2", "4", "3"),