[spark] refactor delete with deletion vector to parallel write deletion file (apache#3744)
YannByron authored Jul 29, 2024
1 parent 95831a3 commit be2996e
Showing 11 changed files with 277 additions and 143 deletions.
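At a high level, the refactor turns DeletionVectorIndexFileMaintainer into a per-(partition, bucket) object that loads the deletion vectors of a given snapshot itself, accepts new deletions, and persists everything in one call. A minimal Java sketch of that lifecycle, pieced together from the signatures in this diff (the data file name and row positions are made up, and BitmapDeletionVector is assumed to be the concrete DeletionVector in use):

    import java.util.List;

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.deletionvectors.BitmapDeletionVector;
    import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
    import org.apache.paimon.index.IndexFileHandler;
    import org.apache.paimon.manifest.IndexManifestEntry;

    public class DvMaintainerSketch {
        static List<IndexManifestEntry> deleteRows(
                IndexFileHandler handler, long snapshotId, BinaryRow partition, int bucket) {
            // restore = true: start from the deletion vectors already committed
            // for this partition/bucket in the given snapshot
            DeletionVectorIndexFileMaintainer maintainer =
                    handler.createDVIndexFileMaintainer(snapshotId, partition, bucket, true);

            // mark rows 3 and 7 of one data file as deleted; the maintainer merges
            // this vector with the one the snapshot already had for that file
            BitmapDeletionVector dv = new BitmapDeletionVector();
            dv.delete(3L);
            dv.delete(7L);
            maintainer.notifyDeletionFiles("data-0.orc", dv);

            // rewrite the untouched vectors that shared an index file with the
            // touched ones, write the new index file, and return the index
            // manifest entries for the commit
            return maintainer.persist();
        }
    }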
DeletionVectorIndexFileMaintainer.java
@@ -18,6 +18,8 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -38,15 +40,41 @@ public class DeletionVectorIndexFileMaintainer {

private final IndexFileHandler indexFileHandler;

private final BinaryRow partition;
private final int bucket;
private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>();

private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>();
private final Map<String, String> dataFileToIndexFile = new HashMap<>();

private final Set<String> touchedIndexFiles = new HashSet<>();

private final DeletionVectorsMaintainer maintainer;

// The key of dataFileToDeletionFiles is the data file's path relative to the table's location.
public DeletionVectorIndexFileMaintainer(
IndexFileHandler indexFileHandler, Map<String, DeletionFile> dataFileToDeletionFiles) {
IndexFileHandler indexFileHandler,
Long snapshotId,
BinaryRow partition,
int bucket,
boolean restore) {
this.indexFileHandler = indexFileHandler;
this.partition = partition;
this.bucket = bucket;
if (restore) {
this.maintainer =
new DeletionVectorsMaintainer.Factory(indexFileHandler)
.createOrRestore(snapshotId, partition, bucket);
} else {
this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create();
}
Map<String, DeletionFile> dataFileToDeletionFiles =
indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
init(dataFileToDeletionFiles);
}

@VisibleForTesting
public void init(Map<String, DeletionFile> dataFileToDeletionFiles) {
List<String> touchedIndexFileNames =
dataFileToDeletionFiles.values().stream()
.map(deletionFile -> new Path(deletionFile.path()).getName())
@@ -66,7 +94,32 @@ public DeletionVectorIndexFileMaintainer(
indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
}
indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile);
dataFileToIndexFile.put(dataFile, indexFileName);
}
}

public BinaryRow getPartition() {
return this.partition;
}

public int getBucket() {
return this.bucket;
}

public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
DeletionFile previous = null;
if (dataFileToIndexFile.containsKey(dataFile)) {
String indexFileName = dataFileToIndexFile.get(dataFile);
touchedIndexFiles.add(indexFileName);
if (indexFileToDeletionFiles.containsKey(indexFileName)) {
previous = indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
}
}
if (previous != null) {
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, previous));
}
maintainer.notifyNewDeletion(dataFile, deletionVector);
}

public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFiles) {
@@ -80,6 +133,19 @@ public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFile
}
}

public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = writeUnchangedDeletionVector();
List<IndexManifestEntry> newIndexFileEntries =
maintainer.writeDeletionVectorsIndex().stream()
.map(
fileMeta ->
new IndexManifestEntry(
FileKind.ADD, partition, bucket, fileMeta))
.collect(Collectors.toList());
result.addAll(newIndexFileEntries);
return result;
}

public List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
DeletionVectorsIndexFile.java
@@ -121,6 +121,19 @@ public Map<String, DeletionVector> readDeletionVector(
return deletionVectors;
}

public DeletionVector readDeletionVector(String dataFile, DeletionFile deletionFile) {
String indexFile = deletionFile.path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) {
checkVersion(inputStream);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new DataInputStream(inputStream);
return readDeletionVector(dataInputStream, (int) deletionFile.length());
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e);
}
}

/**
* Write deletion vectors to a new file, the format of this file can be referenced at: <a
* href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
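The new single-vector reader complements the existing bulk reader: a DeletionFile pins one data file's vector inside a shared index file by (path, offset, length), and the method seeks to that slice and decodes it. A small usage sketch, assuming a DeletionVectorsIndexFile obtained via IndexFileHandler#deletionVectorsIndex (the path, offset, and length below are made-up values):

    import org.apache.paimon.deletionvectors.DeletionVector;
    import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
    import org.apache.paimon.table.source.DeletionFile;

    public class ReadSingleDvSketch {
        static boolean isRowDeleted(DeletionVectorsIndexFile indexFile, long row) {
            // hypothetical coordinates: the vector for data-0.orc occupies
            // 42 bytes starting at offset 17 of the shared index file
            DeletionFile slice = new DeletionFile("index/index-xyz-0", 17L, 42L);
            DeletionVector dv = indexFile.readDeletionVector("data-0.orc", slice);
            return dv.isDeleted(row);
        }
    }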
IndexFileHandler.java
@@ -32,11 +32,14 @@
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -45,6 +48,7 @@
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Handle index files. */
public class IndexFileHandler {
@@ -81,6 +85,56 @@ public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partitio
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
}

public Map<String, DeletionFile> scanDVIndex(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
if (snapshotId == null) {
return Collections.emptyMap();
}
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyMap();
}
Map<String, DeletionFile> result = new HashMap<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
IndexFileMeta meta = file.indexFile();
if (meta.indexType().equals(DELETION_VECTORS_INDEX)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
meta.deletionVectorsRanges();
checkNotNull(dvRanges);
for (String dataFile : dvRanges.keySet()) {
Pair<Integer, Integer> pair = dvRanges.get(dataFile);
DeletionFile deletionFile =
new DeletionFile(
filePath(meta).toString(), pair.getLeft(), pair.getRight());
result.put(dataFile, deletionFile);
}
}
}
return result;
}

public List<IndexManifestEntry> scan(String indexType) {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}

List<IndexManifestEntry> result = new ArrayList<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)) {
result.add(file);
}
}
return result;
}

public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = new ArrayList<>();
@@ -143,6 +197,26 @@ public List<IndexManifestEntry> scanEntries(
return result;
}

public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partition, int bucket) {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}
List<IndexManifestEntry> result = new ArrayList<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
result.add(file);
}
}
return result;
}

public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}
@@ -218,8 +292,8 @@ public void deleteManifest(String indexManifest) {
}

public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
Map<String, DeletionFile> dataFileToDeletionFiles) {
return new DeletionVectorIndexFileMaintainer(this, dataFileToDeletionFiles);
Long snapshotId, BinaryRow partition, int bucket, boolean restore) {
return new DeletionVectorIndexFileMaintainer(this, snapshotId, partition, bucket, restore);
}

public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
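scanDVIndex is the read side of the same bookkeeping: it walks the snapshot's index manifest and, for each deletion-vector index entry matching the partition and bucket, expands the per-data-file ranges into DeletionFile handles. A usage sketch (the partition and bucket values are arbitrary):

    import java.util.Map;

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.index.IndexFileHandler;
    import org.apache.paimon.table.source.DeletionFile;

    public class ScanDvIndexSketch {
        static void printDeletionFiles(IndexFileHandler handler, long snapshotId) {
            // every data file in bucket 0 of the "empty" partition that already
            // has a deletion vector in the given snapshot
            Map<String, DeletionFile> dvIndex =
                    handler.scanDVIndex(snapshotId, BinaryRow.EMPTY_ROW, 0);
            dvIndex.forEach(
                    (dataFile, deletionFile) ->
                            System.out.println(
                                    dataFile + " -> " + deletionFile.path()
                                            + " [offset=" + deletionFile.offset()
                                            + ", length=" + deletionFile.length() + "]"));
        }
    }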
(test store helpers)
@@ -119,8 +119,11 @@ public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int bucket) {
}

public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
Map<String, DeletionFile> dataFileToDeletionFiles) {
return new DeletionVectorIndexFileMaintainer(fileHandler, dataFileToDeletionFiles);
BinaryRow partition, int bucket, Map<String, DeletionFile> dataFileToDeletionFiles) {
DeletionVectorIndexFileMaintainer maintainer =
new DeletionVectorIndexFileMaintainer(fileHandler, null, partition, bucket, false);
maintainer.init(dataFileToDeletionFiles);
return maintainer;
}

public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) {
(DeletionVectorIndexFileMaintainer test)
@@ -69,7 +69,7 @@ public void test() throws Exception {
indexPathFactory, commitMessage2.indexIncrement().newIndexFiles()));

DeletionVectorIndexFileMaintainer dvIFMaintainer =
store.createDVIFMaintainer(dataFileToDeletionFiles);
store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, dataFileToDeletionFiles);

// no dv should be rewritten, because nothing is changed.
List<IndexManifestEntry> res = dvIFMaintainer.writeUnchangedDeletionVector();
DeleteFromPaimonTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.spark.util.SQLHelper
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
@@ -49,7 +50,8 @@ case class DeleteFromPaimonTableCommand(
extends PaimonLeafRunnableCommand
with PaimonCommand
with ExpressionHelper
with SupportsSubquery {
with SupportsSubquery
with SQLHelper {

private lazy val writer = PaimonSparkWriter(table)

@@ -124,21 +126,18 @@
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)

if (deletionVectorsEnabled) {
// Step2: collect all the deletion vectors that mark the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

deletionVectors.cache()
try {
updateDeletionVector(deletionVectors, dataFilePathToMeta, writer)
} finally {
deletionVectors.unpersist()
}
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
// Step2: collect all the deletion vectors that mark the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

// Step3: update the touched deletion vectors and index files
writer.persistDeletionVectors(deletionVectors)
}
} else {
// Step2: extract the files which have at least one record to be updated.
val touchedFilePaths =
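The rewritten delete path runs inside withSQLConf("spark.sql.adaptive.enabled" -> "false"), provided by the newly mixed-in SQLHelper. The diff does not state the motivation, but a plausible one is that adaptive query execution can coalesce shuffle partitions and so break the stable task-per-bucket layout that the parallel deletion-file write relies on. For reference, a Java sketch of the same save-set-restore guard (the names are illustrative, not Paimon API):

    import org.apache.spark.sql.SparkSession;

    public class SqlConfGuardSketch {
        static void withAdaptiveDisabled(SparkSession spark, Runnable action) {
            String key = "spark.sql.adaptive.enabled";
            // remember the caller's setting, pin it to "false", and restore
            // it afterwards even if the action throws
            String previous = spark.conf().get(key, null);
            spark.conf().set(key, "false");
            try {
                action.run();
            } finally {
                if (previous == null) {
                    spark.conf().unset(key);
                } else {
                    spark.conf().set(key, previous);
                }
            }
        }
    }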