From c4762ccddd198da0b355a69cd7c083ab18b116fa Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Tue, 13 Aug 2024 18:08:09 +0800 Subject: [PATCH] [core] Refactor AppendDeletionFileMaintainer to separate bucketed and unaware (#3950) --- .../DeletionVectorsMaintainer.java | 16 +++++ .../append/AppendDeletionFileMaintainer.java | 72 +++++++++++++++++++ .../BucketedAppendDeletionFileMaintainer.java | 65 +++++++++++++++++ .../UnawareAppendDeletionFileMaintainer.java} | 71 ++++++++---------- .../apache/paimon/index/IndexFileHandler.java | 6 -- .../org/apache/paimon/table/BucketMode.java | 4 +- .../sink/UnawareBucketRowKeyExtractor.java | 3 +- .../apache/paimon/TestAppendFileStore.java | 12 ++-- .../AppendDeletionFileMaintainerTest.java} | 17 +++-- .../spark/commands/PaimonSparkWriter.scala | 19 +++-- 10 files changed, 216 insertions(+), 69 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java rename paimon-core/src/main/java/org/apache/paimon/deletionvectors/{DeletionVectorIndexFileMaintainer.java => append/UnawareAppendDeletionFileMaintainer.java} (74%) rename paimon-core/src/test/java/org/apache/paimon/deletionvectors/{DeletionVectorIndexFileMaintainerTest.java => append/AppendDeletionFileMaintainerTest.java} (91%) diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java index 8079d977c3cd..8dd6af1de91f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java @@ -75,6 +75,22 @@ public void notifyNewDeletion(String fileName, DeletionVector deletionVector) { modified = true; } + /** + * 
Merges a new deletion vector for the given file name. If the file already has a deletion + * vector, the old one is merged into the new one before the new one is stored. + * + * @param fileName The name of the file where the deletion occurred. + * @param deletionVector The new deletion vector; the old one, if any, is merged into it. + */ + public void mergeNewDeletion(String fileName, DeletionVector deletionVector) { + DeletionVector old = deletionVectors.get(fileName); + if (old != null) { + deletionVector.merge(old); + } + deletionVectors.put(fileName, deletionVector); + modified = true; + } + /** * Removes the specified file's deletion vector, this method is typically used for remove before * files' deletion vector in compaction. diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java new file mode 100644 index 000000000000..4922e45a6cee --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.deletionvectors.append; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.source.DeletionFile; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; + +/** + * A maintainer of deletion files for append tables. The core methods are: + * + * <ul> + * <li>{@link #notifyDeletionFiles}: records the deletion vector of a data file. + * <li>{@link #persist}: writes the deletion vectors index and returns its index manifest entries. + * </ul> + */ +public interface AppendDeletionFileMaintainer { + + BinaryRow getPartition(); + + int getBucket(); + + void notifyDeletionFiles(String dataFile, DeletionVector deletionVector); + + List persist(); + + static AppendDeletionFileMaintainer forBucketedAppend( + IndexFileHandler indexFileHandler, + @Nullable Long snapshotId, + BinaryRow partition, + int bucket) { + // bucket should have only one deletion file, so here we should read old deletion vectors, + // overwrite the entire deletion file of the bucket when writing deletes. 
+ DeletionVectorsMaintainer maintainer = + new DeletionVectorsMaintainer.Factory(indexFileHandler) + .createOrRestore(snapshotId, partition, bucket); + return new BucketedAppendDeletionFileMaintainer(partition, bucket, maintainer); + } + + static AppendDeletionFileMaintainer forUnawareAppend( + IndexFileHandler indexFileHandler, @Nullable Long snapshotId, BinaryRow partition) { + Map deletionFiles = + indexFileHandler.scanDVIndex(snapshotId, partition, UNAWARE_BUCKET); + return new UnawareAppendDeletionFileMaintainer(indexFileHandler, partition, deletionFiles); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java new file mode 100644 index 000000000000..1b839575f54d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.deletionvectors.append; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; + +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link AppendDeletionFileMaintainer} of bucketed append table. */ +public class BucketedAppendDeletionFileMaintainer implements AppendDeletionFileMaintainer { + + private final BinaryRow partition; + private final int bucket; + private final DeletionVectorsMaintainer maintainer; + + BucketedAppendDeletionFileMaintainer( + BinaryRow partition, int bucket, DeletionVectorsMaintainer maintainer) { + this.partition = partition; + this.bucket = bucket; + this.maintainer = maintainer; + } + + @Override + public BinaryRow getPartition() { + return this.partition; + } + + @Override + public int getBucket() { + return this.bucket; + } + + @Override + public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) { + maintainer.mergeNewDeletion(dataFile, deletionVector); + } + + @Override + public List persist() { + return maintainer.writeDeletionVectorsIndex().stream() + .map(fileMeta -> new IndexManifestEntry(FileKind.ADD, partition, bucket, fileMeta)) + .collect(Collectors.toList()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java similarity index 74% rename from paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java rename to paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java index 39a0c75921ff..de6baac9faa8 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.paimon.deletionvectors; +package org.apache.paimon.deletionvectors.append; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; @@ -35,13 +38,14 @@ import java.util.Set; import java.util.stream.Collectors; -/** DeletionVectorIndexFileMaintainer. */ -public class DeletionVectorIndexFileMaintainer { +import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; + +/** A {@link AppendDeletionFileMaintainer} of unaware bucket append table. */ +public class UnawareAppendDeletionFileMaintainer implements AppendDeletionFileMaintainer { private final IndexFileHandler indexFileHandler; private final BinaryRow partition; - private final int bucket; private final Map indexNameToEntry = new HashMap<>(); private final Map> indexFileToDeletionFiles = new HashMap<>(); @@ -51,26 +55,16 @@ public class DeletionVectorIndexFileMaintainer { private final DeletionVectorsMaintainer maintainer; - // the key of dataFileToDeletionFiles is the relative path again table's location. 
- public DeletionVectorIndexFileMaintainer( + UnawareAppendDeletionFileMaintainer( IndexFileHandler indexFileHandler, - Long snapshotId, BinaryRow partition, - int bucket, - boolean restore) { + Map deletionFiles) { this.indexFileHandler = indexFileHandler; this.partition = partition; - this.bucket = bucket; - if (restore) { - this.maintainer = - new DeletionVectorsMaintainer.Factory(indexFileHandler) - .createOrRestore(snapshotId, partition, bucket); - } else { - this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create(); - } - Map dataFileToDeletionFiles = - indexFileHandler.scanDVIndex(snapshotId, partition, bucket); - init(dataFileToDeletionFiles); + // the deletion of data files is independent + // just create an empty maintainer + this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create(); + init(deletionFiles); } @VisibleForTesting @@ -98,14 +92,17 @@ public void init(Map dataFileToDeletionFiles) { } } + @Override public BinaryRow getPartition() { return this.partition; } + @Override public int getBucket() { - return this.bucket; + return UNAWARE_BUCKET; } + @Override public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) { DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); DeletionFile previous = null; @@ -122,17 +119,7 @@ public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) maintainer.notifyNewDeletion(dataFile, deletionVector); } - public void notifyDeletionFiles(Map dataFileToDeletionFiles) { - for (String dataFile : dataFileToDeletionFiles.keySet()) { - DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile); - String indexFileName = new Path(deletionFile.path()).getName(); - touchedIndexFiles.add(indexFileName); - if (indexFileToDeletionFiles.containsKey(indexFileName)) { - indexFileToDeletionFiles.get(indexFileName).remove(dataFile); - } - } - } - + @Override public List persist() { List result = 
writeUnchangedDeletionVector(); List newIndexFileEntries = @@ -140,13 +127,14 @@ public List persist() { .map( fileMeta -> new IndexManifestEntry( - FileKind.ADD, partition, bucket, fileMeta)) + FileKind.ADD, partition, UNAWARE_BUCKET, fileMeta)) .collect(Collectors.toList()); result.addAll(newIndexFileEntries); return result; } - public List writeUnchangedDeletionVector() { + @VisibleForTesting + List writeUnchangedDeletionVector() { DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); List newIndexEntries = new ArrayList<>(); for (String indexFile : indexFileToDeletionFiles.keySet()) { @@ -162,14 +150,13 @@ public List writeUnchangedDeletionVector() { deletionVectorsIndexFile.readDeletionVector( dataFileToDeletionFiles)); newIndexFiles.forEach( - newIndexFile -> { - newIndexEntries.add( - new IndexManifestEntry( - FileKind.ADD, - oldEntry.partition(), - oldEntry.bucket(), - newIndexFile)); - }); + newIndexFile -> + newIndexEntries.add( + new IndexManifestEntry( + FileKind.ADD, + oldEntry.partition(), + oldEntry.bucket(), + newIndexFile))); } // mark the touched index file as removed. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index ee6cbe769c8d..bdf47b16abb5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -21,7 +21,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; @@ -271,11 +270,6 @@ public void deleteManifest(String indexManifest) { indexManifestFile.delete(indexManifest); } - public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer( - Long snapshotId, BinaryRow partition, int bucket, boolean restore) { - return new DeletionVectorIndexFileMaintainer(this, snapshotId, partition, bucket, restore); - } - public Map readAllDeletionVectors(List fileMetas) { for (IndexFileMeta indexFile : fileMetas) { checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java index 2fcc8822a70d..c3b7ca1abd1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java @@ -57,5 +57,7 @@ public enum BucketMode { * Ignoring bucket concept, although all data is written to bucket-0, the parallelism of reads * and writes is unrestricted. This mode only works for append-only table. 
*/ - BUCKET_UNAWARE + BUCKET_UNAWARE; + + public static final int UNAWARE_BUCKET = 0; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java index e99133899675..ad8a3cd83006 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.BucketMode; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -37,6 +38,6 @@ public UnawareBucketRowKeyExtractor(TableSchema schema) { @Override public int bucket() { - return 0; + return BucketMode.UNAWARE_BUCKET; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index c86b1cb404ce..a68779226dd1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -19,8 +19,9 @@ package org.apache.paimon; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer; +import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -118,10 +119,11 @@ public List scanDVIndexFiles(BinaryRow partition, int bucket) { return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket); } - public DeletionVectorIndexFileMaintainer createDVIFMaintainer( - BinaryRow partition, int 
bucket, Map dataFileToDeletionFiles) { - DeletionVectorIndexFileMaintainer maintainer = - new DeletionVectorIndexFileMaintainer(fileHandler, null, partition, bucket, false); + public UnawareAppendDeletionFileMaintainer createDVIFMaintainer( + BinaryRow partition, Map dataFileToDeletionFiles) { + UnawareAppendDeletionFileMaintainer maintainer = + (UnawareAppendDeletionFileMaintainer) + AppendDeletionFileMaintainer.forUnawareAppend(fileHandler, null, partition); maintainer.init(dataFileToDeletionFiles); return maintainer; } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java similarity index 91% rename from paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java rename to paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java index f78e39dfbb67..2ebc30cf9273 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.paimon.deletionvectors; +package org.apache.paimon.deletionvectors.append; import org.apache.paimon.TestAppendFileStore; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; @@ -39,8 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Test for DeletionVectorIndexFileMaintainer. 
*/ -public class DeletionVectorIndexFileMaintainerTest { +class AppendDeletionFileMaintainerTest { @TempDir java.nio.file.Path tempDir; @@ -68,8 +70,8 @@ public void test() throws Exception { createDeletionFileMapFromIndexFileMetas( indexPathFactory, commitMessage2.indexIncrement().newIndexFiles())); - DeletionVectorIndexFileMaintainer dvIFMaintainer = - store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, dataFileToDeletionFiles); + UnawareAppendDeletionFileMaintainer dvIFMaintainer = + store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, dataFileToDeletionFiles); // no dv should be rewritten, because nothing is changed. List res = dvIFMaintainer.writeUnchangedDeletionVector(); @@ -77,8 +79,9 @@ public void test() throws Exception { // the dv of f3 is updated, and the index file that contains the dv of f3 should be marked // as REMOVE. + FileIO fileIO = LocalFileIO.create(); dvIFMaintainer.notifyDeletionFiles( - Collections.singletonMap("f3", dataFileToDeletionFiles.get("f3"))); + "f3", DeletionVector.read(fileIO, dataFileToDeletionFiles.get("f3"))); res = dvIFMaintainer.writeUnchangedDeletionVector(); assertThat(res.size()).isEqualTo(1); assertThat(res.get(0).kind()).isEqualTo(FileKind.DELETE); @@ -86,7 +89,7 @@ public void test() throws Exception { // the dv of f1 and f2 are in one index file, and the dv of f1 is updated. // the dv of f2 need to be rewritten, and this index file should be marked as REMOVE. 
dvIFMaintainer.notifyDeletionFiles( - Collections.singletonMap("f1", dataFileToDeletionFiles.get("f1"))); + "f1", DeletionVector.read(fileIO, dataFileToDeletionFiles.get("f1"))); res = dvIFMaintainer.writeUnchangedDeletionVector(); assertThat(res.size()).isEqualTo(3); IndexManifestEntry entry = diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index d12909b8a173..7bdd0ce60da2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -19,7 +19,8 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.WRITE_ONLY -import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorIndexFileMaintainer} +import org.apache.paimon.deletionvectors.DeletionVector +import org.apache.paimon.deletionvectors.append.{AppendDeletionFileMaintainer, UnawareAppendDeletionFileMaintainer} import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner} import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement} import org.apache.paimon.manifest.{FileKind, IndexManifestEntry} @@ -27,6 +28,7 @@ import org.apache.paimon.spark.{SparkRow, SparkTableWrite} import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL} import org.apache.paimon.spark.util.SparkRowUtils import org.apache.paimon.table.{BucketMode, FileStoreTable} +import org.apache.paimon.table.BucketMode.BUCKET_UNAWARE import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor} import org.apache.paimon.utils.SerializationUtils @@ -179,7 +181,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { numAssigners, 
encoderGroupWithBucketCol)) } - case BucketMode.BUCKET_UNAWARE => + case BUCKET_UNAWARE => // Topology: input -> writeWithoutBucket() case BucketMode.HASH_FIXED => @@ -211,17 +213,20 @@ case class PaimonSparkWriter(table: FileStoreTable) { .mapGroups { case (_, iter: Iterator[SparkDeletionVectors]) => val indexHandler = table.store().newIndexFileHandler() - var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null + var dvIndexFileMaintainer: AppendDeletionFileMaintainer = null while (iter.hasNext) { val sdv: SparkDeletionVectors = iter.next() if (dvIndexFileMaintainer == null) { val partition = SerializationUtils.deserializeBinaryRow(sdv.partition) - dvIndexFileMaintainer = indexHandler - .createDVIndexFileMaintainer( + dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) { + AppendDeletionFileMaintainer.forUnawareAppend(indexHandler, snapshotId, partition) + } else { + AppendDeletionFileMaintainer.forBucketedAppend( + indexHandler, snapshotId, partition, - sdv.bucket, - bucketMode != BucketMode.BUCKET_UNAWARE) + sdv.bucket) + } } if (dvIndexFileMaintainer == null) { throw new RuntimeException("can't create the dv maintainer.")