diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index bdf47b16abb5..7e5efccdd813 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -146,9 +146,14 @@ public List scan( } public Map, List> scan( - long snapshotId, String indexType, Set partitions) { + long snapshot, String indexType, Set partitions) { + return scan(snapshotManager.snapshot(snapshot), indexType, partitions); + } + + public Map, List> scan( + Snapshot snapshot, String indexType, Set partitions) { Map, List> result = new HashMap<>(); - for (IndexManifestEntry file : scanEntries(snapshotId, indexType, partitions)) { + for (IndexManifestEntry file : scanEntries(snapshot, indexType, partitions)) { result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k -> new ArrayList<>()) .add(file.indexFile()); } @@ -179,8 +184,12 @@ public List scanEntries( } public List scanEntries( - long snapshotId, String indexType, Set partitions) { - Snapshot snapshot = snapshotManager.snapshot(snapshotId); + long snapshot, String indexType, Set partitions) { + return scanEntries(snapshotManager.snapshot(snapshot), indexType, partitions); + } + + public List scanEntries( + Snapshot snapshot, String indexType, Set partitions) { String indexManifest = snapshot.indexManifest(); if (indexManifest == null) { return Collections.emptyList(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index ec3d4a2392c5..f1a95aec90c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -236,13 +236,8 @@ public Long watermark() { @Nullable @Override - public Long snapshotId() { - return readSnapshot == null ? null : readSnapshot.id(); - } - - @Override - public ScanMode scanMode() { - return scanMode; + public Snapshot snapshot() { + return readSnapshot; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index c7b0e8cdf73f..427e958e8009 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -111,13 +111,11 @@ interface Plan { Long watermark(); /** - * Snapshot id of this plan, return null if the table is empty or the manifest list is + * Snapshot of this plan, return null if the table is empty or the manifest list is * specified. */ @Nullable - Long snapshotId(); - - ScanMode scanMode(); + Snapshot snapshot(); /** Result {@link ManifestEntry} files. */ List files(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index ef3523dfdeaa..47e6971306e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -47,6 +47,8 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -58,6 +60,7 @@ import java.util.Set; import java.util.function.BiConsumer; +import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; @@ -254,7 +257,7 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt @Override public Plan read() { FileStoreScan.Plan plan = scan.plan(); - Long snapshotId = plan.snapshotId(); + @Nullable Snapshot snapshot = plan.snapshot(); Map>> files = groupByPartFiles(plan.files(FileKind.ADD)); @@ -266,25 +269,22 @@ public Plan read() { files = newFiles; } List splits = - generateSplits( - snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId, - scanMode != ScanMode.ALL, - splitGenerator, - files); - return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); + generateSplits(snapshot, scanMode != ScanMode.ALL, splitGenerator, files); + return new PlanImpl( + plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits); } private List generateSplits( - long snapshotId, + @Nullable Snapshot snapshot, boolean isStreaming, SplitGenerator splitGenerator, Map>> groupedDataFiles) { List splits = new ArrayList<>(); // Read deletion indexes at once to reduce file IO Map, List> deletionIndexFilesMap = - deletionVectors + deletionVectors && snapshot != null ? indexFileHandler.scan( - snapshotId, DELETION_VECTORS_INDEX, groupedDataFiles.keySet()) + snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet()) : Collections.emptyMap(); for (Map.Entry>> entry : groupedDataFiles.entrySet()) { @@ -295,7 +295,8 @@ private List generateSplits( List bucketFiles = bucketEntry.getValue(); DataSplit.Builder builder = DataSplit.builder() - .withSnapshot(snapshotId) + .withSnapshot( + snapshot == null ? FIRST_SNAPSHOT_ID - 1 : snapshot.id()) .withPartition(partition) .withBucket(bucket) .isStreaming(isStreaming); @@ -344,7 +345,7 @@ public Plan readChanges() { Map>> dataFiles = groupByPartFiles(plan.files(FileKind.ADD)); - return toChangesPlan(true, plan, plan.snapshotId() - 1, beforeFiles, dataFiles); + return toChangesPlan(true, plan, plan.snapshot().id() - 1, beforeFiles, dataFiles); } private Plan toChangesPlan( @@ -353,6 +354,7 @@ private Plan toChangesPlan( long beforeSnapshotId, Map>> beforeFiles, Map>> dataFiles) { + Snapshot snapshot = plan.snapshot(); List splits = new ArrayList<>(); Map> buckets = new HashMap<>(); beforeFiles.forEach( @@ -372,7 +374,7 @@ private Plan toChangesPlan( Map, List> deletionIndexFilesMap = deletionVectors ? indexFileHandler.scan( - plan.snapshotId(), DELETION_VECTORS_INDEX, dataFiles.keySet()) + snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet()) : Collections.emptyMap(); for (Map.Entry> entry : buckets.entrySet()) { @@ -392,7 +394,7 @@ private Plan toChangesPlan( DataSplit.Builder builder = DataSplit.builder() - .withSnapshot(plan.snapshotId()) + .withSnapshot(snapshot.id()) .withPartition(part) .withBucket(bucket) .withBeforeFiles(before) @@ -415,7 +417,8 @@ private Plan toChangesPlan( } } - return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); + return new PlanImpl( + plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index e4b74c13c656..ce17450538b1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -294,7 +294,8 @@ private void runTestContainsAll( private Map getActualKvMap(FileStoreScan scan, Long expectedSnapshotId) throws Exception { FileStoreScan.Plan plan = scan.plan(); - assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId); + Snapshot snapshot = plan.snapshot(); + assertThat(snapshot == null ? null : snapshot.id()).isEqualTo(expectedSnapshotId); List actualKvs = store.readKvsFromManifestEntries(plan.files(), false); gen.sort(actualKvs); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java index 832cdf28647c..ad29709f5acb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -262,4 +263,20 @@ public void testBatchReadDVTableWithSequenceField(String changelogProducer) { assertThat(batchSql("SELECT * FROM T")) .containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2, "2_1")); } + + @Test + public void testReadTagWithDv() { + sql( + "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH (" + + "'deletion-vectors.enabled' = 'true', " + + "'snapshot.num-retained.min' = '1', " + + "'snapshot.num-retained.max' = '1')"); + + sql("INSERT INTO T VALUES (1, '1'), (2, '2')"); + sql("CALL sys.create_tag('default.T', 'my_tag')"); + sql("INSERT INTO T VALUES (3, '3'), (4, '4')"); + + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='my_tag') */")) + .containsExactlyInAnyOrder(Row.of(1, "1"), Row.of(2, "2")); + } }