[core] Fix tag read for deletion vectors table (apache#4121)
JingsongLi authored Sep 4, 2024
1 parent d77df4f commit 169bea9
Showing 6 changed files with 54 additions and 31 deletions.
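A hedged summary, inferred from the diff below (the commit itself ships no prose description): with deletion vectors enabled, reading a tag could fail because FileStoreScan.Plan exposed only a snapshot id, and SnapshotReaderImpl resolved that id back through SnapshotManager to find deletion-vector index files; the new test suggests the tagged snapshot may already have expired from the snapshot directory, so that round trip breaks. The change makes the plan carry the Snapshot object itself (a nullable Snapshot snapshot() replaces Long snapshotId(), and the ScanMode scanMode() accessor is dropped) and adds Snapshot-based overloads to IndexFileHandler.scan and scanEntries. A minimal sketch of the reshaped accessor and of the fallback the reader applies, using simplified stand-in types rather than Paimon's real classes:

// Hedged sketch of the API reshaping. The types below are simplified stand-ins, not
// Paimon's real classes; only the shape of the change is taken from the diff.
import javax.annotation.Nullable;

public class PlanSnapshotSketch {

    /** Stand-in for org.apache.paimon.Snapshot; only the id matters here. */
    record Snapshot(long id) {
        static final long FIRST_SNAPSHOT_ID = 1;
    }

    /** Before this commit the plan exposed only a Long id; now it hands back the Snapshot. */
    interface Plan {
        @Nullable
        Snapshot snapshot();
    }

    /** Mirrors how SnapshotReaderImpl.generateSplits now derives the split's snapshot id. */
    static long splitSnapshotId(Plan plan) {
        Snapshot snapshot = plan.snapshot();
        // Empty table (or an explicit manifest list): fall back to FIRST_SNAPSHOT_ID - 1,
        // exactly as the changed generateSplits() does in the diff below.
        return snapshot == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshot.id();
    }
}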
==== Changed file 1 of 6 ====
@@ -146,9 +146,14 @@ public List<IndexFileMeta> scan(
     }
 
     public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
-            long snapshotId, String indexType, Set<BinaryRow> partitions) {
+            long snapshot, String indexType, Set<BinaryRow> partitions) {
+        return scan(snapshotManager.snapshot(snapshot), indexType, partitions);
+    }
+
+    public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
+            Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new HashMap<>();
-        for (IndexManifestEntry file : scanEntries(snapshotId, indexType, partitions)) {
+        for (IndexManifestEntry file : scanEntries(snapshot, indexType, partitions)) {
             result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k -> new ArrayList<>())
                     .add(file.indexFile());
         }
@@ -179,8 +184,12 @@ public List<IndexManifestEntry> scanEntries(
     }
 
     public List<IndexManifestEntry> scanEntries(
-            long snapshotId, String indexType, Set<BinaryRow> partitions) {
-        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+            long snapshot, String indexType, Set<BinaryRow> partitions) {
+        return scanEntries(snapshotManager.snapshot(snapshot), indexType, partitions);
+    }
+
+    public List<IndexManifestEntry> scanEntries(
+            Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
         String indexManifest = snapshot.indexManifest();
         if (indexManifest == null) {
             return Collections.emptyList();
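The delegation above means a caller that already holds a Snapshot (for instance one resolved from a tag rather than from the snapshot directory) can scan index files without the snapshotManager.snapshot(id) round trip. A hedged usage sketch; identifiers come from the diff, while import paths for types the diff does not show (IndexFileHandler, IndexFileMeta, BinaryRow) are assumptions:

// Hedged usage sketch, not part of the commit: group deletion-vector index files by
// (partition, bucket) for a Snapshot the caller already holds, e.g. a tag's snapshot.
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

class DeletionVectorIndexLookup {

    static Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> forSnapshot(
            IndexFileHandler handler, Snapshot snapshot, Set<BinaryRow> partitions) {
        // The Snapshot-based overload added in this commit is called directly; no
        // SnapshotManager#snapshot(long) lookup is needed, which is what keeps tag reads
        // working after the tagged snapshot has expired.
        return handler.scan(snapshot, DELETION_VECTORS_INDEX, partitions);
    }
}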
==== Changed file 2 of 6 ====
@@ -236,13 +236,8 @@ public Long watermark() {
 
         @Nullable
         @Override
-        public Long snapshotId() {
-            return readSnapshot == null ? null : readSnapshot.id();
-        }
-
-        @Override
-        public ScanMode scanMode() {
-            return scanMode;
+        public Snapshot snapshot() {
+            return readSnapshot;
         }
 
         @Override
==== Changed file 3 of 6 ====
@@ -111,13 +111,11 @@ interface Plan {
         Long watermark();
 
         /**
-         * Snapshot id of this plan, return null if the table is empty or the manifest list is
+         * Snapshot of this plan, return null if the table is empty or the manifest list is
          * specified.
          */
         @Nullable
-        Long snapshotId();
-
-        ScanMode scanMode();
+        Snapshot snapshot();
 
         /** Result {@link ManifestEntry} files. */
         List<ManifestEntry> files();
==== Changed file 4 of 6 ====
@@ -47,6 +47,8 @@
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -58,6 +60,7 @@
 import java.util.Set;
 import java.util.function.BiConsumer;
 
+import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -254,7 +257,7 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt
     @Override
     public Plan read() {
         FileStoreScan.Plan plan = scan.plan();
-        Long snapshotId = plan.snapshotId();
+        @Nullable Snapshot snapshot = plan.snapshot();
 
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
                 groupByPartFiles(plan.files(FileKind.ADD));
@@ -266,25 +269,22 @@ public Plan read() {
             files = newFiles;
         }
         List<DataSplit> splits =
-                generateSplits(
-                        snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId,
-                        scanMode != ScanMode.ALL,
-                        splitGenerator,
-                        files);
-        return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits);
+                generateSplits(snapshot, scanMode != ScanMode.ALL, splitGenerator, files);
+        return new PlanImpl(
+                plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits);
     }
 
     private List<DataSplit> generateSplits(
-            long snapshotId,
+            @Nullable Snapshot snapshot,
             boolean isStreaming,
             SplitGenerator splitGenerator,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
         List<DataSplit> splits = new ArrayList<>();
         // Read deletion indexes at once to reduce file IO
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
-                deletionVectors
+                deletionVectors && snapshot != null
                         ? indexFileHandler.scan(
-                                snapshotId, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
+                                snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
                         : Collections.emptyMap();
         for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
                 groupedDataFiles.entrySet()) {
@@ -295,7 +295,8 @@ private List<DataSplit> generateSplits(
                 List<DataFileMeta> bucketFiles = bucketEntry.getValue();
                 DataSplit.Builder builder =
                         DataSplit.builder()
-                                .withSnapshot(snapshotId)
+                                .withSnapshot(
+                                        snapshot == null ? FIRST_SNAPSHOT_ID - 1 : snapshot.id())
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .isStreaming(isStreaming);
@@ -344,7 +345,7 @@ public Plan readChanges() {
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
 
-        return toChangesPlan(true, plan, plan.snapshotId() - 1, beforeFiles, dataFiles);
+        return toChangesPlan(true, plan, plan.snapshot().id() - 1, beforeFiles, dataFiles);
     }
 
     private Plan toChangesPlan(
@@ -353,6 +354,7 @@ private Plan toChangesPlan(
             long beforeSnapshotId,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+        Snapshot snapshot = plan.snapshot();
         List<DataSplit> splits = new ArrayList<>();
         Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
         beforeFiles.forEach(
@@ -372,7 +374,7 @@ private Plan toChangesPlan(
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
                 deletionVectors
                         ? indexFileHandler.scan(
-                                plan.snapshotId(), DELETION_VECTORS_INDEX, dataFiles.keySet())
+                                snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
                         : Collections.emptyMap();
 
         for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
@@ -392,7 +394,7 @@
 
                 DataSplit.Builder builder =
                         DataSplit.builder()
-                                .withSnapshot(plan.snapshotId())
+                                .withSnapshot(snapshot.id())
                                 .withPartition(part)
                                 .withBucket(bucket)
                                 .withBeforeFiles(before)
@@ -415,7 +417,8 @@
             }
         }
 
-        return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits);
+        return new PlanImpl(
+                plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits);
     }
 
     @Override
==== Changed file 5 of 6 ====
@@ -294,7 +294,8 @@ private void runTestContainsAll(
     private Map<BinaryRow, BinaryRow> getActualKvMap(FileStoreScan scan, Long expectedSnapshotId)
             throws Exception {
         FileStoreScan.Plan plan = scan.plan();
-        assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+        Snapshot snapshot = plan.snapshot();
+        assertThat(snapshot == null ? null : snapshot.id()).isEqualTo(expectedSnapshotId);
 
         List<KeyValue> actualKvs = store.readKvsFromManifestEntries(plan.files(), false);
         gen.sort(actualKvs);
==== Changed file 6 of 6 ====
@@ -22,6 +22,7 @@
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -262,4 +263,20 @@ public void testBatchReadDVTableWithSequenceField(String changelogProducer) {
         assertThat(batchSql("SELECT * FROM T"))
                 .containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2, "2_1"));
     }
+
+    @Test
+    public void testReadTagWithDv() {
+        sql(
+                "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ("
+                        + "'deletion-vectors.enabled' = 'true', "
+                        + "'snapshot.num-retained.min' = '1', "
+                        + "'snapshot.num-retained.max' = '1')");
+
+        sql("INSERT INTO T VALUES (1, '1'), (2, '2')");
+        sql("CALL sys.create_tag('default.T', 'my_tag')");
+        sql("INSERT INTO T VALUES (3, '3'), (4, '4')");
+
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='my_tag') */"))
+                .containsExactlyInAnyOrder(Row.of(1, "1"), Row.of(2, "2"));
+    }
 }
