From ff533832844d93bb4361c8d7e30d277eec749110 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Wed, 13 Mar 2024 17:14:25 +0800
Subject: [PATCH 1/2] first-version

---
 .../paimon/operation/FileStoreCommitImpl.java | 63 +++++++++-----
 .../source/InnerStreamTableScanImpl.java      | 33 ++++++--
 .../paimon/flink/DeletionVectorITCase.java    | 84 +++++++++++++++++++
 3 files changed, 153 insertions(+), 27 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a64b8239b25c..46cfceb145c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -72,6 +72,8 @@
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /**
@@ -218,20 +220,22 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendDvIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
                 compactChangelog,
-                appendIndexFiles);
+                appendHashIndexFiles,
+                appendDvIndexFiles);
         try {
             List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                     || !appendTableFiles.isEmpty()
                     || !appendChangelog.isEmpty()
-                    || !appendIndexFiles.isEmpty()) {
+                    || !appendHashIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 1:
                 // Read manifest entries from changed partitions here and check for conflicts.
@@ -255,7 +259,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
                         tryCommit(
                                 appendTableFiles,
                                 appendChangelog,
-                                appendIndexFiles,
+                                appendHashIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -266,7 +270,9 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
                 generatedSnapshot += 1;
             }
 
-            if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
+            if (!compactTableFiles.isEmpty()
+                    || !compactChangelog.isEmpty()
+                    || !appendDvIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 2:
                 // Add appendChanges to the manifest entries read above and check for conflicts.
@@ -288,7 +294,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
                         tryCommit(
                                 compactTableFiles,
                                 compactChangelog,
-                                Collections.emptyList(),
+                                appendDvIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -353,14 +359,16 @@ public void overwrite(
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendDvIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
                 compactChangelog,
-                appendIndexFiles);
+                appendHashIndexFiles,
+                appendDvIndexFiles);
 
         if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
             StringBuilder warnMessage =
@@ -422,19 +430,19 @@ public void overwrite(
                     tryOverwrite(
                             partitionFilter,
                             appendTableFiles,
-                            appendIndexFiles,
+                            appendHashIndexFiles,
                             committable.identifier(),
                             committable.watermark(),
                             committable.logOffsets());
             generatedSnapshot += 1;
         }
 
-        if (!compactTableFiles.isEmpty()) {
+        if (!compactTableFiles.isEmpty() || !appendDvIndexFiles.isEmpty()) {
             attempts +=
                     tryCommit(
                             compactTableFiles,
                             Collections.emptyList(),
-                            Collections.emptyList(),
+                            appendDvIndexFiles,
                             committable.identifier(),
                             committable.watermark(),
                             committable.logOffsets(),
@@ -556,7 +564,8 @@ private void collectChanges(
             List<ManifestEntry> appendChangelog,
             List<ManifestEntry> compactTableFiles,
             List<ManifestEntry> compactChangelog,
-            List<IndexManifestEntry> appendIndexFiles) {
+            List<IndexManifestEntry> appendHashIndexFiles,
+            List<IndexManifestEntry> appendDvIndexFiles) {
         for (CommitMessage message : commitMessages) {
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             commitMessage
@@ -586,13 +595,29 @@ private void collectChanges(
                     .indexIncrement()
                     .newIndexFiles()
                     .forEach(
-                            f ->
-                                    appendIndexFiles.add(
-                                            new IndexManifestEntry(
-                                                    FileKind.ADD,
-                                                    commitMessage.partition(),
-                                                    commitMessage.bucket(),
-                                                    f)));
+                            f -> {
+                                switch (f.indexType()) {
+                                    case HASH_INDEX:
+                                        appendHashIndexFiles.add(
+                                                new IndexManifestEntry(
+                                                        FileKind.ADD,
+                                                        commitMessage.partition(),
+                                                        commitMessage.bucket(),
+                                                        f));
+                                        break;
+                                    case DELETION_VECTORS_INDEX:
+                                        appendDvIndexFiles.add(
+                                                new IndexManifestEntry(
+                                                        FileKind.ADD,
+                                                        commitMessage.partition(),
+                                                        commitMessage.bucket(),
+                                                        f));
+                                        break;
+                                    default:
+                                        throw new RuntimeException(
+                                                "Unknown index type: " + f.indexType());
+                                }
+                            });
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 8fa5b98dbeec..5fc8866fd6c0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
@@ -35,6 +36,7 @@
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
 import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -42,6 +44,8 @@
 
 import javax.annotation.Nullable;
 
+import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+
 /** {@link StreamTableScan} implementation for streaming planning. */
 public class InnerStreamTableScanImpl extends AbstractInnerTableScan
         implements InnerStreamTableScan {
@@ -116,12 +120,31 @@ private void initScanner() {
     }
 
     private Plan tryFirstPlan() {
-        StartingScanner.Result result = startingScanner.scan(snapshotReader);
+        StartingScanner.Result result;
+        if (options.needLookup()) {
+            result = startingScanner.scan(snapshotReader.withLevelFilter(level -> level > 0));
+            snapshotReader.withLevelFilter(Filter.alwaysTrue());
+        } else if (options.changelogProducer().equals(FULL_COMPACTION)) {
+            result =
+                    startingScanner.scan(
+                            snapshotReader.withLevelFilter(
+                                    level -> level == options.numLevels() - 1));
+            snapshotReader.withLevelFilter(Filter.alwaysTrue());
+        } else {
+            result = startingScanner.scan(snapshotReader);
+        }
+
         if (result instanceof ScannedResult) {
             ScannedResult scannedResult = (ScannedResult) result;
             currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
-            nextSnapshotId = currentSnapshotId + 1;
+            if (options.lookupStrategy().equals(LookupStrategy.DELETION_VECTOR_ONLY)) {
+                // For DELETION_VECTOR_ONLY mode, we need to return the remaining data from level 0
+                // in the subsequent plan.
+                nextSnapshotId = currentSnapshotId;
+            } else {
+                nextSnapshotId = currentSnapshotId + 1;
+            }
             isFullPhaseEnd =
                     boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
             return scannedResult.plan();
@@ -206,13 +229,7 @@ private FollowUpScanner createFollowUpScanner() {
                 followUpScanner = new InputChangelogFollowUpScanner();
                 break;
             case FULL_COMPACTION:
-                // this change in data split reader will affect both starting scanner and follow-up
-                snapshotReader.withLevelFilter(level -> level == options.numLevels() - 1);
-                followUpScanner = new CompactionChangelogFollowUpScanner();
-                break;
             case LOOKUP:
-                // this change in data split reader will affect both starting scanner and follow-up
-                snapshotReader.withLevelFilter(level -> level > 0);
                 followUpScanner = new CompactionChangelogFollowUpScanner();
                 break;
             default:
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
new file mode 100644
index 000000000000..cc052cc6607f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** ITCase for deletion vector table. */
+public class DeletionVectorITCase extends CatalogITCaseBase {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "lookup"})
+    public void testStreamingReadDVTable(String changelogProducer) throws Exception {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
+                        changelogProducer));
+
+        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");
+
+        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+        // test read from APPEND snapshot
+        try (BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */"); ) {
+            assertThat(iter.collect(12))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                            Row.ofKind(RowKind.INSERT, 2, "2"),
+                            Row.ofKind(RowKind.INSERT, 3, "3"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+        }
+
+        // test read from COMPACT snapshot
+        try (BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */"); ) {
+            assertThat(iter.collect(8))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                            Row.ofKind(RowKind.INSERT, 2, "2_1"),
+                            Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+        }
+    }
+}

From 8d43320bca34552620083857b11497df647ebd6c Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Wed, 13 Mar 2024 18:01:27 +0800
Subject: [PATCH 2/2] 1

---
 .../table/source/InnerTableScanImpl.java   |  3 ++
 .../paimon/flink/DeletionVectorITCase.java | 32 +++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index b08da568ebbe..82ae515e16ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -47,6 +47,9 @@ public InnerTableScanImpl(
         super(options, snapshotReader);
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
+        if (options.deletionVectorsEnabled()) {
+            snapshotReader.withLevelFilter(level -> level > 0);
+        }
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index cc052cc6607f..162e29eece60 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -81,4 +81,36 @@ public void testStreamingReadDVTable(String changelogProducer) throws Exception
                             Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "lookup"})
+    public void testBatchReadDVTable(String changelogProducer) {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
+                        changelogProducer));
+
+        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");
+
+        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "111111111"),
+                        Row.of(2, "2_2"),
+                        Row.of(3, "3_1"),
+                        Row.of(4, "4_1"));
+
+        // batch read dv table will filter level 0 and there will be data delay
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "111111111"), Row.of(2, "2"), Row.of(3, "3"), Row.of(4, "4"));
+
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='4') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4"));
+    }
 }
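
Both patches apply the same rule on the read path: with deletion vectors enabled, files still at level 0 are treated as not yet covered by deletion vectors, so batch scans drop them (the "data delay" noted in testBatchReadDVTable) and the streaming scan plans the same snapshot again later to pick them up. The sketch below illustrates only that level-filter rule in isolation; DataFile, plan(), and numLevels are hypothetical stand-ins for this note, not Paimon's SnapshotReader API.

    import java.util.List;
    import java.util.function.IntPredicate;

    public class LevelFilterSketch {

        // Hypothetical stand-in for a data file carrying its LSM tree level.
        record DataFile(String name, int level) {}

        // Keep only files whose level passes the filter, mirroring the intent of
        // snapshotReader.withLevelFilter(level -> level > 0) in the patch.
        static List<DataFile> plan(List<DataFile> files, IntPredicate levelFilter) {
            return files.stream().filter(f -> levelFilter.test(f.level())).toList();
        }

        public static void main(String[] args) {
            List<DataFile> files =
                    List.of(new DataFile("a", 0), new DataFile("b", 3), new DataFile("c", 5));

            // Deletion-vector table: skip level 0, so newly written rows appear
            // only once compaction has moved them to a higher level.
            System.out.println(plan(files, level -> level > 0)); // b, c

            // Full-compaction changelog: read only the maximum level.
            int numLevels = 6;
            System.out.println(plan(files, level -> level == numLevels - 1)); // c
        }
    }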