Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] refactor delete with deletion vector #3744

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
Expand All @@ -38,15 +40,41 @@ public class DeletionVectorIndexFileMaintainer {

private final IndexFileHandler indexFileHandler;

private final BinaryRow partition;
private final int bucket;
private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>();

private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>();
private final Map<String, String> dataFileToIndexFile = new HashMap<>();

private final Set<String> touchedIndexFiles = new HashSet<>();

private final DeletionVectorsMaintainer maintainer;

// the key of dataFileToDeletionFiles is the relative path again table's location.
public DeletionVectorIndexFileMaintainer(
IndexFileHandler indexFileHandler, Map<String, DeletionFile> dataFileToDeletionFiles) {
IndexFileHandler indexFileHandler,
Long snapshotId,
BinaryRow partition,
int bucket,
boolean restore) {
this.indexFileHandler = indexFileHandler;
this.partition = partition;
this.bucket = bucket;
if (restore) {
this.maintainer =
new DeletionVectorsMaintainer.Factory(indexFileHandler)
.createOrRestore(snapshotId, partition, bucket);
} else {
this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create();
}
Map<String, DeletionFile> dataFileToDeletionFiles =
indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
init(dataFileToDeletionFiles);
}

@VisibleForTesting
public void init(Map<String, DeletionFile> dataFileToDeletionFiles) {
List<String> touchedIndexFileNames =
dataFileToDeletionFiles.values().stream()
.map(deletionFile -> new Path(deletionFile.path()).getName())
Expand All @@ -66,7 +94,32 @@ public DeletionVectorIndexFileMaintainer(
indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
}
indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile);
dataFileToIndexFile.put(dataFile, indexFileName);
}
}

public BinaryRow getPartition() {
return this.partition;
}

public int getBucket() {
return this.bucket;
}

public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
DeletionFile previous = null;
if (dataFileToIndexFile.containsKey(dataFile)) {
String indexFileName = dataFileToIndexFile.get(dataFile);
touchedIndexFiles.add(indexFileName);
if (indexFileToDeletionFiles.containsKey(indexFileName)) {
previous = indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
}
}
if (previous != null) {
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, previous));
}
maintainer.notifyNewDeletion(dataFile, deletionVector);
}

public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFiles) {
Expand All @@ -80,6 +133,19 @@ public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFile
}
}

public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = writeUnchangedDeletionVector();
List<IndexManifestEntry> newIndexFileEntries =
maintainer.writeDeletionVectorsIndex().stream()
.map(
fileMeta ->
new IndexManifestEntry(
FileKind.ADD, partition, bucket, fileMeta))
.collect(Collectors.toList());
result.addAll(newIndexFileEntries);
return result;
}

public List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ public Map<String, DeletionVector> readDeletionVector(
return deletionVectors;
}

public DeletionVector readDeletionVector(String dataFile, DeletionFile deletionFile) {
String indexFile = deletionFile.path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) {
checkVersion(inputStream);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new DataInputStream(inputStream);
return readDeletionVector(dataInputStream, (int) deletionFile.length());
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e);
}
}

/**
* Write deletion vectors to a new file, the format of this file can be referenced at: <a
* href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,6 +48,7 @@
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Handle index files. */
public class IndexFileHandler {
Expand Down Expand Up @@ -81,6 +85,56 @@ public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partitio
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
}

public Map<String, DeletionFile> scanDVIndex(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
if (snapshotId == null) {
return Collections.emptyMap();
}
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyMap();
}
Map<String, DeletionFile> result = new HashMap<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
IndexFileMeta meta = file.indexFile();
if (meta.indexType().equals(DELETION_VECTORS_INDEX)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
meta.deletionVectorsRanges();
checkNotNull(dvRanges);
for (String dataFile : dvRanges.keySet()) {
Pair<Integer, Integer> pair = dvRanges.get(dataFile);
DeletionFile deletionFile =
new DeletionFile(
filePath(meta).toString(), pair.getLeft(), pair.getRight());
result.put(dataFile, deletionFile);
}
}
}
return result;
}

public List<IndexManifestEntry> scan(String indexType) {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}

List<IndexManifestEntry> result = new ArrayList<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)) {
result.add(file);
}
}
return result;
}

public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = new ArrayList<>();
Expand Down Expand Up @@ -143,6 +197,26 @@ public List<IndexManifestEntry> scanEntries(
return result;
}

public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partition, int bucket) {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}
List<IndexManifestEntry> result = new ArrayList<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
result.add(file);
}
}
return result;
}

public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}
Expand Down Expand Up @@ -218,8 +292,8 @@ public void deleteManifest(String indexManifest) {
}

public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
Map<String, DeletionFile> dataFileToDeletionFiles) {
return new DeletionVectorIndexFileMaintainer(this, dataFileToDeletionFiles);
Long snapshotId, BinaryRow partition, int bucket, boolean restore) {
return new DeletionVectorIndexFileMaintainer(this, snapshotId, partition, bucket, restore);
}

public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int bucket) {
}

public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
Map<String, DeletionFile> dataFileToDeletionFiles) {
return new DeletionVectorIndexFileMaintainer(fileHandler, dataFileToDeletionFiles);
BinaryRow partition, int bucket, Map<String, DeletionFile> dataFileToDeletionFiles) {
DeletionVectorIndexFileMaintainer maintainer =
new DeletionVectorIndexFileMaintainer(fileHandler, null, partition, bucket, false);
maintainer.init(dataFileToDeletionFiles);
return maintainer;
}

public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void test() throws Exception {
indexPathFactory, commitMessage2.indexIncrement().newIndexFiles()));

DeletionVectorIndexFileMaintainer dvIFMaintainer =
store.createDVIFMaintainer(dataFileToDeletionFiles);
store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, dataFileToDeletionFiles);

// no dv should be rewritten, because nothing is changed.
List<IndexManifestEntry> res = dvIFMaintainer.writeUnchangedDeletionVector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.spark.util.SQLHelper
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
Expand All @@ -49,7 +50,8 @@ case class DeleteFromPaimonTableCommand(
extends PaimonLeafRunnableCommand
with PaimonCommand
with ExpressionHelper
with SupportsSubquery {
with SupportsSubquery
with SQLHelper {

private lazy val writer = PaimonSparkWriter(table)

Expand Down Expand Up @@ -124,21 +126,18 @@ case class DeleteFromPaimonTableCommand(
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)

if (deletionVectorsEnabled) {
// Step2: collect all the deletion vectors that marks the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

deletionVectors.cache()
try {
updateDeletionVector(deletionVectors, dataFilePathToMeta, writer)
} finally {
deletionVectors.unpersist()
}
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
// Step2: collect all the deletion vectors that marks the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

// Step3: update the touched deletion vectors and index files
writer.persistDeletionVectors(deletionVectors)
}
} else {
// Step2: extract out the exactly files, which must have at least one record to be updated.
val touchedFilePaths =
Expand Down
Loading
Loading