diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java new file mode 100644 index 000000000000..3daca576b5d9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.deletionvectors; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.source.DeletionFile; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** DeletionVectorIndexFileMaintainer. */ +public class DeletionVectorIndexFileMaintainer { + + private final IndexFileHandler indexFileHandler; + + private final Map indexNameToEntry = new HashMap<>(); + + private final Map> indexFileToDeletionFiles = new HashMap<>(); + + private final Set touchedIndexFiles = new HashSet<>(); + + public DeletionVectorIndexFileMaintainer( + IndexFileHandler indexFileHandler, Map dataFileToDeletionFiles) { + this.indexFileHandler = indexFileHandler; + List touchedIndexFileNames = + dataFileToDeletionFiles.values().stream() + .map(deletionFile -> new Path(deletionFile.path()).getName()) + .distinct() + .collect(Collectors.toList()); + indexFileHandler.scan().stream() + .filter( + indexManifestEntry -> + touchedIndexFileNames.contains( + indexManifestEntry.indexFile().fileName())) + .forEach(entry -> indexNameToEntry.put(entry.indexFile().fileName(), entry)); + + for (String dataFile : dataFileToDeletionFiles.keySet()) { + DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile); + String indexFileName = new Path(deletionFile.path()).getName(); + if (!indexFileToDeletionFiles.containsKey(indexFileName)) { + indexFileToDeletionFiles.put(indexFileName, new HashMap<>()); + } + indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile); + } + } + + public void notifyDeletionFiles(Map dataFileToDeletionFiles) { + for (String dataFile : dataFileToDeletionFiles.keySet()) { + DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile); + String indexFileName = new Path(deletionFile.path()).getName(); + touchedIndexFiles.add(indexFileName); + if (indexFileToDeletionFiles.containsKey(indexFileName)) { + indexFileToDeletionFiles.get(indexFileName).remove(dataFile); + if (indexFileToDeletionFiles.get(indexFileName).isEmpty()) { + indexFileToDeletionFiles.remove(indexFileName); + indexNameToEntry.remove(indexFileName); + } + } + } + } + + public List writeUnchangedDeletionVector() { + DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); + List newIndexEntries = new ArrayList<>(); + for (String indexFile : indexFileToDeletionFiles.keySet()) { + if (touchedIndexFiles.contains(indexFile)) { + IndexManifestEntry oldEntry = indexNameToEntry.get(indexFile); + + // write unchanged deletion vector. + Map dataFileToDeletionFiles = + indexFileToDeletionFiles.get(indexFile); + if (!dataFileToDeletionFiles.isEmpty()) { + IndexFileMeta newIndexFile = + indexFileHandler.writeDeletionVectorsIndex( + deletionVectorsIndexFile.readDeletionVector( + dataFileToDeletionFiles)); + newIndexEntries.add( + new IndexManifestEntry( + FileKind.ADD, + oldEntry.partition(), + oldEntry.bucket(), + newIndexFile)); + } + + // mark the touched index file as removed. + newIndexEntries.add(oldEntry.toDeleteEntry()); + } + } + return newIndexEntries; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index 3d186cee2e2a..0207250d1e6a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.index.IndexFile; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; @@ -36,6 +37,7 @@ import java.util.Map; import java.util.zip.CRC32; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** DeletionVectors index file. */ @@ -83,6 +85,31 @@ public Map readAllDeletionVectors(IndexFileMeta fileMeta return deletionVectors; } + /** Reads deletion vectors from a list of DeletionFile which belong to a same index file. */ + public Map readDeletionVector( + Map dataFileToDeletionFiles) { + Map deletionVectors = new HashMap<>(); + if (dataFileToDeletionFiles.isEmpty()) { + return deletionVectors; + } + + String indexFile = dataFileToDeletionFiles.values().stream().findAny().get().path(); + try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) { + checkVersion(inputStream); + for (String dataFile : dataFileToDeletionFiles.keySet()) { + DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile); + checkArgument(deletionFile.path().equals(indexFile)); + inputStream.seek(deletionFile.offset()); + DataInputStream dataInputStream = new DataInputStream(inputStream); + deletionVectors.put( + dataFile, readDeletionVector(dataInputStream, (int) deletionFile.length())); + } + } catch (Exception e) { + throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e); + } + return deletionVectors; + } + /** * Reads a single deletion vector from the specified file. * diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index f3138cb02ba5..2d3dee20e01e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -65,6 +65,18 @@ public IndexFileHandler( this.deletionVectorsIndex = deletionVectorsIndex; } + public DeletionVectorsIndexFile deletionVectorsIndex() { + return this.deletionVectorsIndex; + } + + public List scan() { + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null || snapshot.indexManifest() == null) { + return Collections.emptyList(); + } + return indexManifestFile.read(snapshot.indexManifest()); + } + public Optional scanHashIndex(long snapshotId, BinaryRow partition, int bucket) { List result = scan(snapshotId, HASH_INDEX, partition, bucket); if (result.size() > 1) { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 11612ac1bb48..fd9f784734f2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -19,6 +19,7 @@ package org.apache.paimon; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; @@ -39,6 +40,7 @@ import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.TraceableFileIO; @@ -117,6 +119,11 @@ public List scanDVIndexFiles(BinaryRow partition, int bucket) { return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket); } + public DeletionVectorIndexFileMaintainer createDVIFMaintainer( + Map dataFileToDeletionFiles) { + return new DeletionVectorIndexFileMaintainer(fileHandler, dataFileToDeletionFiles); + } + public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { Long lastSnapshotId = snapshotManager().latestSnapshotId(); DeletionVectorsMaintainer.Factory factory = @@ -124,7 +131,7 @@ public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition return factory.createOrRestore(lastSnapshotId, partition, bucket); } - public CommitMessage writeDVIndexFiles( + public CommitMessageImpl writeDVIndexFiles( BinaryRow partition, int bucket, Map> dataFileToPositions) { DeletionVectorsMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); for (Map.Entry> entry : dataFileToPositions.entrySet()) { diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java new file mode 100644 index 000000000000..5da65c7821cd --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.deletionvectors; + +import org.apache.paimon.TestAppendFileStore; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.PathFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for DeletionVectorIndexFileMaintainer. */ +public class DeletionVectorIndexFileMaintainerTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void test() throws Exception { + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + + Map> dvs = new HashMap<>(); + dvs.put("f1", Arrays.asList(1, 3, 5)); + dvs.put("f2", Arrays.asList(2, 4, 6)); + CommitMessageImpl commitMessage1 = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs); + CommitMessageImpl commitMessage2 = + store.writeDVIndexFiles( + BinaryRow.EMPTY_ROW, + 1, + Collections.singletonMap("f3", Arrays.asList(1, 2, 3))); + store.commit(commitMessage1, commitMessage2); + + PathFactory indexPathFactory = store.pathFactory().indexFileFactory(); + Map dataFileToDeletionFiles = new HashMap<>(); + dataFileToDeletionFiles.putAll( + createDeletionFileMapFromIndexFileMetas( + indexPathFactory, commitMessage1.indexIncrement().newIndexFiles())); + dataFileToDeletionFiles.putAll( + createDeletionFileMapFromIndexFileMetas( + indexPathFactory, commitMessage2.indexIncrement().newIndexFiles())); + + DeletionVectorIndexFileMaintainer dvIFMaintainer = + store.createDVIFMaintainer(dataFileToDeletionFiles); + + // no dv should be rewritten, because nothing is changed. + List res = dvIFMaintainer.writeUnchangedDeletionVector(); + assertThat(res.size()).isEqualTo(0); + + // no dv should be rewritten, because all the deletion vectors have been updated in the + // index file that contains the dv of f3. + dvIFMaintainer.notifyDeletionFiles( + Collections.singletonMap("f3", dataFileToDeletionFiles.get("f3"))); + res = dvIFMaintainer.writeUnchangedDeletionVector(); + assertThat(res.size()).isEqualTo(0); + + // the dv of f1 and f2 are in one index file, and the dv of f1 is updated. + // the dv of f2 need to be rewritten, and this index file should be marked as REMOVE. + dvIFMaintainer.notifyDeletionFiles( + Collections.singletonMap("f1", dataFileToDeletionFiles.get("f1"))); + res = dvIFMaintainer.writeUnchangedDeletionVector(); + assertThat(res.size()).isEqualTo(2); + IndexManifestEntry entry = + res.stream().filter(file -> file.kind() == FileKind.ADD).findAny().get(); + assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue(); + entry = res.stream().filter(file -> file.kind() == FileKind.DELETE).findAny().get(); + assertThat(entry.indexFile()) + .isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0)); + } + + private Map createDeletionFileMapFromIndexFileMetas( + PathFactory indexPathFactory, List fileMetas) { + Map dataFileToDeletionFiles = new HashMap<>(); + for (IndexFileMeta indexFileMeta : fileMetas) { + for (Map.Entry> range : + indexFileMeta.deletionVectorsRanges().entrySet()) { + dataFileToDeletionFiles.put( + range.getKey(), + new DeletionFile( + indexPathFactory.toPath(indexFileMeta.fileName()).toString(), + range.getValue().getLeft(), + range.getValue().getRight())); + } + } + return dataFileToDeletionFiles; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 03630e71fd2f..7953bd6c6103 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -854,12 +854,12 @@ public void testDVIndexFiles() throws Exception { TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); // commit 1 - CommitMessage commitMessage1 = + CommitMessageImpl commitMessage1 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f1", Arrays.asList(1, 3))); - CommitMessage commitMessage2 = + CommitMessageImpl commitMessage2 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, 0, @@ -880,18 +880,18 @@ public void testDVIndexFiles() throws Exception { CommitMessage commitMessage3 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3))); - CommitMessage commitMessage4 = - store.removeIndexFiles( - BinaryRow.EMPTY_ROW, - 0, - ((CommitMessageImpl) commitMessage1).indexIncrement().newIndexFiles()); + List deleted = + new ArrayList<>(commitMessage1.indexIncrement().newIndexFiles()); + deleted.addAll(commitMessage2.indexIncrement().newIndexFiles()); + CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted); store.commit(commitMessage3, commitMessage4); // assert 2 - assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); + assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1); maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); dvs = maintainer.deletionVectors(); assertThat(dvs.size()).isEqualTo(2); + assertThat(dvs.get("f1").isDeleted(3)).isTrue(); assertThat(dvs.get("f2").isDeleted(3)).isTrue(); }