From be2996efe24b3757099261d2287a923771f08555 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Mon, 29 Jul 2024 10:57:17 +0800
Subject: [PATCH] [spark] refactor delete with deletion vector to parallel write deletion file (#3744)

---
 .../DeletionVectorIndexFileMaintainer.java    | 68 ++++++++++++++-
 .../DeletionVectorsIndexFile.java             | 13 +++
 .../apache/paimon/index/IndexFileHandler.java | 78 ++++++++++++++++-
 .../apache/paimon/TestAppendFileStore.java    |  7 +-
 ...DeletionVectorIndexFileMaintainerTest.java |  2 +-
 .../DeleteFromPaimonTableCommand.scala        | 29 ++++---
 .../paimon/spark/commands/PaimonCommand.scala | 77 ++---------------
 .../spark/commands/PaimonSparkWriter.scala    | 83 +++++++++----------
 .../spark/commands/SparkDataFileMeta.scala    |  6 --
 .../commands/UpdatePaimonTableCommand.scala   |  2 +-
 .../apache/paimon/spark/util/SQLHelper.scala  | 55 ++++++++++++
 11 files changed, 277 insertions(+), 143 deletions(-)
 create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala

diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index 50e921f0ddd5..39a0c75921ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -18,6 +18,8 @@
 package org.apache.paimon.deletionvectors;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -38,15 +40,41 @@ public class DeletionVectorIndexFileMaintainer {
 
     private final IndexFileHandler indexFileHandler;
 
+    private final BinaryRow partition;
+    private final int bucket;
+
     private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>();
     private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>();
+    private final Map<String, String> dataFileToIndexFile = new HashMap<>();
     private final Set<String> touchedIndexFiles = new HashSet<>();
 
+    private final DeletionVectorsMaintainer maintainer;
+
+    // the key of dataFileToDeletionFiles is the relative path against the table's location.
public DeletionVectorIndexFileMaintainer( - IndexFileHandler indexFileHandler, Map dataFileToDeletionFiles) { + IndexFileHandler indexFileHandler, + Long snapshotId, + BinaryRow partition, + int bucket, + boolean restore) { this.indexFileHandler = indexFileHandler; + this.partition = partition; + this.bucket = bucket; + if (restore) { + this.maintainer = + new DeletionVectorsMaintainer.Factory(indexFileHandler) + .createOrRestore(snapshotId, partition, bucket); + } else { + this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create(); + } + Map dataFileToDeletionFiles = + indexFileHandler.scanDVIndex(snapshotId, partition, bucket); + init(dataFileToDeletionFiles); + } + + @VisibleForTesting + public void init(Map dataFileToDeletionFiles) { List touchedIndexFileNames = dataFileToDeletionFiles.values().stream() .map(deletionFile -> new Path(deletionFile.path()).getName()) @@ -66,7 +94,32 @@ public DeletionVectorIndexFileMaintainer( indexFileToDeletionFiles.put(indexFileName, new HashMap<>()); } indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile); + dataFileToIndexFile.put(dataFile, indexFileName); + } + } + + public BinaryRow getPartition() { + return this.partition; + } + + public int getBucket() { + return this.bucket; + } + + public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) { + DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); + DeletionFile previous = null; + if (dataFileToIndexFile.containsKey(dataFile)) { + String indexFileName = dataFileToIndexFile.get(dataFile); + touchedIndexFiles.add(indexFileName); + if (indexFileToDeletionFiles.containsKey(indexFileName)) { + previous = indexFileToDeletionFiles.get(indexFileName).remove(dataFile); + } } + if (previous != null) { + deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, previous)); + } + maintainer.notifyNewDeletion(dataFile, deletionVector); } public void notifyDeletionFiles(Map dataFileToDeletionFiles) { @@ -80,6 +133,19 @@ public void notifyDeletionFiles(Map dataFileToDeletionFile } } + public List persist() { + List result = writeUnchangedDeletionVector(); + List newIndexFileEntries = + maintainer.writeDeletionVectorsIndex().stream() + .map( + fileMeta -> + new IndexManifestEntry( + FileKind.ADD, partition, bucket, fileMeta)) + .collect(Collectors.toList()); + result.addAll(newIndexFileEntries); + return result; + } + public List writeUnchangedDeletionVector() { DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); List newIndexEntries = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index f21aeb5a774d..ffb024734ef5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -121,6 +121,19 @@ public Map readDeletionVector( return deletionVectors; } + public DeletionVector readDeletionVector(String dataFile, DeletionFile deletionFile) { + String indexFile = deletionFile.path(); + try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) { + checkVersion(inputStream); + checkArgument(deletionFile.path().equals(indexFile)); + inputStream.seek(deletionFile.offset()); + DataInputStream dataInputStream = new 
DataInputStream(inputStream); + return readDeletionVector(dataInputStream, (int) deletionFile.length()); + } catch (Exception e) { + throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e); + } + } + /** * Write deletion vectors to a new file, the format of this file can be referenced at: PIP-16. diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index afafb42ecc33..eed18af17b67 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -32,11 +32,14 @@ import org.apache.paimon.utils.PathFactory; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,6 +48,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Handle index files. */ public class IndexFileHandler { @@ -81,6 +85,56 @@ public Optional scanHashIndex(long snapshotId, BinaryRow partitio return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0)); } + public Map scanDVIndex( + @Nullable Long snapshotId, BinaryRow partition, int bucket) { + if (snapshotId == null) { + return Collections.emptyMap(); + } + Snapshot snapshot = snapshotManager.snapshot(snapshotId); + String indexManifest = snapshot.indexManifest(); + if (indexManifest == null) { + return Collections.emptyMap(); + } + Map result = new HashMap<>(); + for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) { + IndexFileMeta meta = file.indexFile(); + if (meta.indexType().equals(DELETION_VECTORS_INDEX) + && file.partition().equals(partition) + && file.bucket() == bucket) { + LinkedHashMap> dvRanges = + meta.deletionVectorsRanges(); + checkNotNull(dvRanges); + for (String dataFile : dvRanges.keySet()) { + Pair pair = dvRanges.get(dataFile); + DeletionFile deletionFile = + new DeletionFile( + filePath(meta).toString(), pair.getLeft(), pair.getRight()); + result.put(dataFile, deletionFile); + } + } + } + return result; + } + + public List scan(String indexType) { + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + return Collections.emptyList(); + } + String indexManifest = snapshot.indexManifest(); + if (indexManifest == null) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(); + for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) { + if (file.indexFile().indexType().equals(indexType)) { + result.add(file); + } + } + return result; + } + public List scan( long snapshotId, String indexType, BinaryRow partition, int bucket) { List result = new ArrayList<>(); @@ -143,6 +197,26 @@ public List scanEntries( return result; } + public List scanEntries(String indexType, BinaryRow partition, int bucket) { + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + return Collections.emptyList(); + } + String indexManifest = snapshot.indexManifest(); + if (indexManifest == null) { + return 
Collections.emptyList(); + } + List result = new ArrayList<>(); + for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) { + if (file.indexFile().indexType().equals(indexType) + && file.partition().equals(partition) + && file.bucket() == bucket) { + result.add(file); + } + } + return result; + } + public Path filePath(IndexFileMeta file) { return pathFactory.toPath(file.fileName()); } @@ -218,8 +292,8 @@ public void deleteManifest(String indexManifest) { } public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer( - Map dataFileToDeletionFiles) { - return new DeletionVectorIndexFileMaintainer(this, dataFileToDeletionFiles); + Long snapshotId, BinaryRow partition, int bucket, boolean restore) { + return new DeletionVectorIndexFileMaintainer(this, snapshotId, partition, bucket, restore); } public Map readAllDeletionVectors(List fileMetas) { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index d01210e47e34..c86b1cb404ce 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -119,8 +119,11 @@ public List scanDVIndexFiles(BinaryRow partition, int bucket) { } public DeletionVectorIndexFileMaintainer createDVIFMaintainer( - Map dataFileToDeletionFiles) { - return new DeletionVectorIndexFileMaintainer(fileHandler, dataFileToDeletionFiles); + BinaryRow partition, int bucket, Map dataFileToDeletionFiles) { + DeletionVectorIndexFileMaintainer maintainer = + new DeletionVectorIndexFileMaintainer(fileHandler, null, partition, bucket, false); + maintainer.init(dataFileToDeletionFiles); + return maintainer; } public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java index 20cb47557e21..f78e39dfbb67 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java @@ -69,7 +69,7 @@ public void test() throws Exception { indexPathFactory, commitMessage2.indexIncrement().newIndexFiles())); DeletionVectorIndexFileMaintainer dvIFMaintainer = - store.createDVIFMaintainer(dataFileToDeletionFiles); + store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, dataFileToDeletionFiles); // no dv should be rewritten, because nothing is changed. 
        List<IndexManifestEntry> res = dvIFMaintainer.writeUnchangedDeletionVector();
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 4bcd9dce6def..2aef8e576410 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.spark.util.SQLHelper
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
@@ -49,7 +50,8 @@ case class DeleteFromPaimonTableCommand(
   extends PaimonLeafRunnableCommand
   with PaimonCommand
   with ExpressionHelper
-  with SupportsSubquery {
+  with SupportsSubquery
+  with SQLHelper {
 
   private lazy val writer = PaimonSparkWriter(table)
 
@@ -124,21 +126,18 @@ case class DeleteFromPaimonTableCommand(
       val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
 
       if (deletionVectorsEnabled) {
-        // Step2: collect all the deletion vectors that marks the deleted rows.
-        val deletionVectors = collectDeletionVectors(
-          candidateDataSplits,
-          dataFilePathToMeta,
-          condition,
-          relation,
-          sparkSession)
-
-        deletionVectors.cache()
-        try {
-          updateDeletionVector(deletionVectors, dataFilePathToMeta, writer)
-        } finally {
-          deletionVectors.unpersist()
-        }
+        withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+          // Step2: collect all the deletion vectors that mark the deleted rows.
+          val deletionVectors = collectDeletionVectors(
+            candidateDataSplits,
+            dataFilePathToMeta,
+            condition,
+            relation,
+            sparkSession)
+          // Step3: update the touched deletion vectors and index files
+          writer.persistDeletionVectors(deletionVectors)
+        }
       } else {
         // Step2: extract out the exactly files, which must have at least one record to be updated.
val touchedFilePaths = diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index abd89b0b6239..d5dd1e19ecb3 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -22,14 +22,12 @@ import org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector} import org.apache.paimon.fs.Path import org.apache.paimon.index.IndexFileMeta import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement} -import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter} import org.apache.paimon.spark.catalyst.Compatibility import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.spark.schema.PaimonMetadataColumn._ -import org.apache.paimon.table.BucketMode import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} import org.apache.paimon.table.source.DataSplit import org.apache.paimon.types.RowType @@ -148,62 +146,6 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper { .toMap } - protected def getDeletedIndexFiles( - dataFilePathToMeta: Map[String, SparkDataFileMeta], - newDeletionVectors: Dataset[SparkDeletionVectors] - ): Seq[IndexManifestEntry] = { - val deletionFiles = dataFilePathToMeta.flatMap { - case (relativePath, sdf) => - sdf.deletionFile match { - case Some(deletionFile) => - Some((relativePath, deletionFile)) - case None => None - } - } - val dvIndexFileMaintainer = fileStore - .newIndexFileHandler() - .createDVIndexFileMaintainer(deletionFiles.asJava) - - val pathFactory = fileStore.pathFactory() - val touchedDataFileAndDeletionFiles = newDeletionVectors - .collect() - .flatMap { - sdv => - val relativePaths = sdv.relativePaths(pathFactory) - relativePaths.flatMap { - relativePath => - dataFilePathToMeta(relativePath).deletionFile match { - case Some(deletionFile) => Some(relativePath, deletionFile) - case _ => None - } - } - } - .toMap - - dvIndexFileMaintainer.notifyDeletionFiles(touchedDataFileAndDeletionFiles.asJava) - - dvIndexFileMaintainer.writeUnchangedDeletionVector().asScala - } - - protected def updateDeletionVector( - deletionVectors: Dataset[SparkDeletionVectors], - dataFilePathToMeta: Map[String, SparkDataFileMeta], - writer: PaimonSparkWriter): Seq[CommitMessage] = { - // Step1: write the new deletion vectors - val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors) - - // Step2: write the unchanged deletion vectors where store in touched dv index files, and mark these touched index files as DELETE if needed. 
- val rewriteIndexCommitMsg = fileStore.bucketMode() match { - case BucketMode.BUCKET_UNAWARE => - val indexEntries = getDeletedIndexFiles(dataFilePathToMeta, deletionVectors) - writer.buildCommitMessageFromIndexManifestEntry(indexEntries) - case _ => - Seq.empty[CommitMessage] - } - - newIndexCommitMsg ++ rewriteIndexCommitMsg - } - protected def collectDeletionVectors( candidateDataSplits: Seq[DataSplit], dataFilePathToMeta: Map[String, SparkDataFileMeta], @@ -212,12 +154,12 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper { sparkSession: SparkSession): Dataset[SparkDeletionVectors] = { import sparkSession.implicits._ - val dataFileAndDeletionFile = dataFilePathToMeta.mapValues(_.toSparkDeletionFile).toArray + val dataFileToPartitionAndBucket = + dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray val metadataCols = Seq(FILE_PATH, ROW_INDEX) val filteredRelation = createNewScanPlan(candidateDataSplits, condition, relation, metadataCols) val store = table.store() - val fileIO = table.fileIO() val location = table.location createDataset(sparkSession, filteredRelation) .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN) @@ -225,29 +167,22 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper { .groupByKey(_._1) .mapGroups { case (filePath, iter) => - val fileNameToDeletionFile = dataFileAndDeletionFile.toMap val dv = new BitmapDeletionVector() while (iter.hasNext) { dv.delete(iter.next()._2) } val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString - val sparkDeletionFile = fileNameToDeletionFile(relativeFilePath) - sparkDeletionFile.deletionFile match { - case Some(deletionFile) => - dv.merge(DeletionVector.read(fileIO, deletionFile)) - case None => - } - + val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath) val pathFactory = store.pathFactory() val partitionAndBucket = pathFactory - .relativePartitionAndBucketPath(sparkDeletionFile.partition, sparkDeletionFile.bucket) + .relativePartitionAndBucketPath(partition, bucket) .toString SparkDeletionVectors( partitionAndBucket, - SerializationUtils.serializeBinaryRow(sparkDeletionFile.partition), - sparkDeletionFile.bucket, + SerializationUtils.serializeBinaryRow(partition), + bucket, Seq((new Path(filePath).getName, dv.serializeToBytes())) ) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 41f2db1a3b1c..62fb4689d3c9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.WRITE_ONLY import org.apache.paimon.data.BinaryRow -import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorsMaintainer} +import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorIndexFileMaintainer} import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner} import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement} import org.apache.paimon.manifest.{FileKind, IndexManifestEntry} @@ -200,60 +200,55 @@ case class PaimonSparkWriter(table: FileStoreTable) { /** * Write all the deletion vectors to the index files. 
If it's in unaware mode, one index file maps
-   * one deletion vector; else, one index file will contains all deletion vector with the same
+   * deletion vectors; else, one index file will contain all deletion vectors with the same
    * partition and bucket.
    */
   def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]): Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
-
+    val snapshotId = table.snapshotManager().latestSnapshotId()
     val fileStore = table.store()
-    val lastSnapshotId = table.snapshotManager().latestSnapshotId()
-
-    def createOrRestoreDVMaintainer(
-        partition: BinaryRow,
-        bucket: Int): DeletionVectorsMaintainer = {
-      val deletionVectorsMaintainerFactory =
-        new DeletionVectorsMaintainer.Factory(fileStore.newIndexFileHandler())
-      fileStore.bucketMode() match {
-        case BucketMode.BUCKET_UNAWARE =>
-          deletionVectorsMaintainerFactory.create()
-        case _ =>
-          deletionVectorsMaintainerFactory.createOrRestore(lastSnapshotId, partition, bucket)
-      }
-    }
-
-    def commitDeletionVector(
-        sdv: SparkDeletionVectors,
-        serializer: CommitMessageSerializer): Array[Byte] = {
-      val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
-      val maintainer = createOrRestoreDVMaintainer(partition, sdv.bucket)
-      sdv.dataFileAndDeletionVector.foreach {
-        case (dataFileName, dv) =>
-          maintainer.notifyNewDeletion(dataFileName, DeletionVector.deserializeFromBytes(dv))
-      }
-
-      val commitMessage = new CommitMessageImpl(
-        partition,
-        sdv.bucket,
-        DataIncrement.emptyIncrement(),
-        CompactIncrement.emptyIncrement(),
-        new IndexIncrement(maintainer.writeDeletionVectorsIndex()))
-
-      serializer.serialize(commitMessage)
-    }
-
     val serializedCommits = deletionVectors
       .groupByKey(_.partitionAndBucket)
       .mapGroups {
         case (_, iter: Iterator[SparkDeletionVectors]) =>
-          val serializer = new CommitMessageSerializer
-          val grouped = iter.reduce {
-            (sdv1, sdv2) =>
-              sdv1.copy(dataFileAndDeletionVector =
-                sdv1.dataFileAndDeletionVector ++ sdv2.dataFileAndDeletionVector)
+          val indexHandler = fileStore.newIndexFileHandler()
+          var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
+          while (iter.hasNext) {
+            val sdv: SparkDeletionVectors = iter.next()
+            if (dvIndexFileMaintainer == null) {
+              val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
+              dvIndexFileMaintainer = indexHandler
+                .createDVIndexFileMaintainer(
+                  snapshotId,
+                  partition,
+                  sdv.bucket,
+                  bucketMode != BucketMode.BUCKET_UNAWARE)
+            }
+            if (dvIndexFileMaintainer == null) {
+              throw new RuntimeException("can't create the dv maintainer.")
+            }
+
+            sdv.dataFileAndDeletionVector.foreach {
+              case (dataFileName, dv) =>
+                dvIndexFileMaintainer.notifyDeletionFiles(
+                  dataFileName,
+                  DeletionVector.deserializeFromBytes(dv))
+            }
           }
-          commitDeletionVector(grouped, serializer)
+          val indexEntries = dvIndexFileMaintainer.persist()
+
+          val (added, deleted) = indexEntries.asScala.partition(_.kind() == FileKind.ADD)
+
+          val commitMessage = new CommitMessageImpl(
+            dvIndexFileMaintainer.getPartition,
+            dvIndexFileMaintainer.getBucket,
+            DataIncrement.emptyIncrement(),
+            CompactIncrement.emptyIncrement(),
+            new IndexIncrement(added.map(_.indexFile).asJava, deleted.map(_.indexFile).asJava)
+          )
+          val serializer = new CommitMessageSerializer
+          serializer.serialize(commitMessage)
       }
     serializedCommits
       .collect()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala index 1e0f1d6d3f07..b380d36c3f81 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala @@ -39,14 +39,8 @@ case class SparkDataFileMeta( .toUri .toString + "/" + dataFileMeta.fileName() } - - def toSparkDeletionFile: SparkDeletionFile = { - SparkDeletionFile(partition, bucket, deletionFile) - } } -case class SparkDeletionFile(partition: BinaryRow, bucket: Int, deletionFile: Option[DeletionFile]) - object SparkDataFileMeta { def convertToSparkDataFileMeta( dataSplit: DataSplit, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index ca12d6a1a366..6c7d07bf5e26 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -104,7 +104,7 @@ case class UpdatePaimonTableCommand( val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits) // Step4: write these deletion vectors. - val indexCommitMsg = updateDeletionVector(deletionVectors, dataFilePathToMeta, writer) + val indexCommitMsg = writer.persistDeletionVectors(deletionVectors) addCommitMessage ++ indexCommitMsg } finally { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala new file mode 100644 index 000000000000..88243b36d0de --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.util + +import org.apache.spark.sql.internal.SQLConf + +trait SQLHelper { + + /** + * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL + * configurations. 
+ */ + protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { + val conf = SQLConf.get + val (keys, values) = pairs.unzip + val currentValues = keys.map { + key => + if (conf.contains(key)) { + Some(conf.getConfString(key)) + } else { + None + } + } + (keys, values).zipped.foreach { + (k, v) => + if (SQLConf.isStaticConfigKey(k)) { + throw new RuntimeException(s"Cannot modify the value of a static config: $k") + } + conf.setConfString(k, v) + } + try f + finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => conf.setConfString(key, value) + case (key, None) => conf.unsetConf(key) + } + } + } +}
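
Below is a minimal usage sketch of the new SQLHelper trait (illustration only, not part of the patch; the ExampleCommand object and its run method are hypothetical). It shows how a caller scopes a Spark SQL setting, here disabling adaptive query execution, to a single block, the same way DeleteFromPaimonTableCommand wraps the deletion-vector collection step above:

import org.apache.paimon.spark.util.SQLHelper
import org.apache.spark.sql.SparkSession

// Hypothetical command object, used only to illustrate the trait.
object ExampleCommand extends SQLHelper {

  def run(spark: SparkSession): Long = {
    // "spark.sql.adaptive.enabled" is set to false only inside this block;
    // withSQLConf restores the previous value (or unsets the key) afterwards.
    withSQLConf("spark.sql.adaptive.enabled" -> "false") {
      spark.range(0, 100).groupBy().count().head().getLong(0)
    }
  }
}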
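
And a sketch of the per-(partition, bucket) maintainer lifecycle that PaimonSparkWriter.persistDeletionVectors now runs inside each mapGroups task, using only the APIs added in this patch (the DvWriteSketch object, the writeGroup helper, and its parameters are hypothetical; it assumes a loaded FileStoreTable and the data-file name to serialized-bitmap pairs collected for one group):

import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.manifest.FileKind
import org.apache.paimon.table.FileStoreTable

import scala.collection.JavaConverters._

object DvWriteSketch {

  // Restore (or create) the DV index maintainer for one (partition, bucket),
  // merge the freshly computed bitmaps into it, then persist the index files.
  def writeGroup(
      table: FileStoreTable,
      partition: BinaryRow,
      bucket: Int,
      dataFileToSerializedDv: Map[String, Array[Byte]]): (Int, Int) = {
    val handler = table.store().newIndexFileHandler()
    val maintainer = handler.createDVIndexFileMaintainer(
      table.snapshotManager().latestSnapshotId(),
      partition,
      bucket,
      true) // restore = true: merge into the existing DV index of this bucket

    dataFileToSerializedDv.foreach {
      case (dataFile, bytes) =>
        maintainer.notifyDeletionFiles(dataFile, DeletionVector.deserializeFromBytes(bytes))
    }

    // persist() returns the rewritten unchanged entries plus the newly written ones;
    // the caller would wrap them into a CommitMessageImpl with an IndexIncrement,
    // exactly as persistDeletionVectors does above.
    val (added, deleted) = maintainer.persist().asScala.partition(_.kind() == FileKind.ADD)
    (added.size, deleted.size)
  }
}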