diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index e06ed9ea26b6..6f74e67208de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -80,6 +80,7 @@ import java.util.function.Supplier; import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION; +import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; @@ -255,8 +256,6 @@ private MergeTreeCompactRewriter createRewriter( @Nullable FieldsComparator userDefinedSeqComparator, Levels levels, @Nullable DeletionVectorsMaintainer dvMaintainer) { - KeyValueFileReaderFactory.Builder readerFactoryBuilder = - this.readerFactoryBuilder.copyWithoutProjection(); DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer); KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket, dvFactory); @@ -286,10 +285,11 @@ private MergeTreeCompactRewriter createRewriter( if (mergeEngine == FIRST_ROW) { if (options.deletionVectorsEnabled()) { throw new UnsupportedOperationException( - "Deletion vectors mode is not supported for first row merge engine now."); + "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine."); } lookupReaderFactory = readerFactoryBuilder + .copyWithoutProjection() .withValueProjection(new int[0][]) .build(partition, bucket, dvFactory); processor = new ContainsValueProcessor(); @@ -298,7 +298,9 @@ private MergeTreeCompactRewriter createRewriter( processor = lookupStrategy.deletionVector ? 
new PositionedKeyValueProcessor( - valueType, lookupStrategy.produceChangelog) + valueType, + lookupStrategy.produceChangelog + || mergeEngine != DEDUPLICATE) : new KeyValueProcessor(valueType); wrapperFactory = new LookupMergeFunctionWrapperFactory<>( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index cbdcb64e5a7a..cb52570cd2da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -463,10 +463,9 @@ private static void validateForDeletionVectors(TableSchema schema, CoreOptions o || options.changelogProducer() == ChangelogProducer.LOOKUP, "Deletion vectors mode is only supported for none or lookup changelog producer now."); - // todo: implement it checkArgument( !options.mergeEngine().equals(MergeEngine.FIRST_ROW), - "Deletion vectors mode is not supported for first row merge engine now."); + "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine."); } private static void validateSequenceField(TableSchema schema, CoreOptions options) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java index 162e29eece60..1fe424b331e5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java @@ -48,7 +48,7 @@ public void testStreamingReadDVTable(String changelogProducer) throws Exception // test read from APPEND snapshot try (BlockingIterator iter = streamSqlBlockIter( - "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */"); ) { + "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) { assertThat(iter.collect(12)) .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, 1, "111111111"), @@ -68,7 +68,7 @@ public void testStreamingReadDVTable(String changelogProducer) throws Exception // test read from COMPACT snapshot try (BlockingIterator iter = streamSqlBlockIter( - "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */"); ) { + "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) { assertThat(iter.collect(8)) .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, 1, "111111111"), @@ -113,4 +113,89 @@ public void testBatchReadDVTable(String changelogProducer) { .containsExactlyInAnyOrder( Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4")); } + + @ParameterizedTest + @ValueSource(strings = {"none", "lookup"}) + public void testDVTableWithAggregationMergeEngine(String changelogProducer) throws Exception { + sql( + String.format( + "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) " + + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', " + + "'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", + changelogProducer)); + + sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)"); + + sql("INSERT INTO T VALUES (2, 1), (3, 1)"); + + sql("INSERT INTO T VALUES (2, 1), (4, 1)"); + + // test batch read + assertThat(batchSql("SELECT * FROM T")) + .containsExactlyInAnyOrder( + Row.of(1, 111111111), Row.of(2, 4), Row.of(3, 4), Row.of(4, 5)); + + // test streaming read + if (changelogProducer.equals("lookup")) { + try (BlockingIterator iter = + streamSqlBlockIter( + "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) { + assertThat(iter.collect(8)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1, 111111111), + Row.ofKind(RowKind.INSERT, 2, 3), + Row.ofKind(RowKind.INSERT, 3, 4), 
+ Row.ofKind(RowKind.INSERT, 4, 4), + Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3), + Row.ofKind(RowKind.UPDATE_AFTER, 2, 4), /* id 2: the third insert adds 1 under the sum aggregate (3+1=4) */ + Row.ofKind(RowKind.UPDATE_BEFORE, 4, 4), + Row.ofKind(RowKind.UPDATE_AFTER, 4, 5)); /* id 4: the third insert adds 1 (4+1=5) */ + } + } + } + /** Verifies that a deletion-vectors table using the partial-update merge engine returns the merged rows on batch read for both parameterized changelog producers, and that with the lookup producer a streaming read started from snapshot 4 emits the expected changelog rows. */ + @ParameterizedTest + @ValueSource(strings = {"none", "lookup"}) + public void testDVTableWithPartialUpdateMergeEngine(String changelogProducer) throws Exception { + sql( + String.format( + "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) " + + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', " + + "'merge-engine'='partial-update')", + changelogProducer)); + + sql( + "INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')"); /* first insert: initial rows; id 2 has NULL v2 and id 4 has NULL v1 */ + + sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')"); /* second insert: supplies v2 for id 2 and new values for both columns of id 3 */ + + sql( + "INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))"); /* third insert: supplies v1 for id 2 and id 4; the assertions below expect the NULL v2 values to leave the earlier v2 in place */ + + // test batch read + assertThat(batchSql("SELECT * FROM T")) + .containsExactlyInAnyOrder( + Row.of(1, "111111111", "1"), + Row.of(2, "2_1", "2"), + Row.of(3, "3_1", "3_1"), + Row.of(4, "4", "4")); + + // test streaming read + if (changelogProducer.equals("lookup")) { + try (BlockingIterator iter = + streamSqlBlockIter( + "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) { /* NOTE(review): the four INSERT rows expected below equal the table state after the first two inserts, so snapshot 4 presumably precedes the changes of the third insert; confirm the snapshot numbering */ + assertThat(iter.collect(8)) /* 8 rows: 4 INSERT rows and 2 UPDATE_BEFORE/UPDATE_AFTER pairs */ + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1, "111111111", "1"), + Row.ofKind(RowKind.INSERT, 2, "2", "2"), + Row.ofKind(RowKind.INSERT, 3, "3_1", "3_1"), + Row.ofKind(RowKind.INSERT, 4, null, "4"), + Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2", "2"), + Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1", "2"), + Row.ofKind(RowKind.UPDATE_BEFORE, 4, null, "4"), + Row.ofKind(RowKind.UPDATE_AFTER, 4, "4", "4")); + } + } + } }