Skip to content

Commit

Permalink
[core] Introduce DeletionVectorIndexFileMaintainer (#3387)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored May 24, 2024
1 parent 9cb2909 commit c0a5f22
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.deletionvectors;

import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.source.DeletionFile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** DeletionVectorIndexFileMaintainer. */
public class DeletionVectorIndexFileMaintainer {

private final IndexFileHandler indexFileHandler;

private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>();

private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>();

private final Set<String> touchedIndexFiles = new HashSet<>();

public DeletionVectorIndexFileMaintainer(
IndexFileHandler indexFileHandler, Map<String, DeletionFile> dataFileToDeletionFiles) {
this.indexFileHandler = indexFileHandler;
List<String> touchedIndexFileNames =
dataFileToDeletionFiles.values().stream()
.map(deletionFile -> new Path(deletionFile.path()).getName())
.distinct()
.collect(Collectors.toList());
indexFileHandler.scan().stream()
.filter(
indexManifestEntry ->
touchedIndexFileNames.contains(
indexManifestEntry.indexFile().fileName()))
.forEach(entry -> indexNameToEntry.put(entry.indexFile().fileName(), entry));

for (String dataFile : dataFileToDeletionFiles.keySet()) {
DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
String indexFileName = new Path(deletionFile.path()).getName();
if (!indexFileToDeletionFiles.containsKey(indexFileName)) {
indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
}
indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile);
}
}

public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFiles) {
for (String dataFile : dataFileToDeletionFiles.keySet()) {
DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
String indexFileName = new Path(deletionFile.path()).getName();
touchedIndexFiles.add(indexFileName);
if (indexFileToDeletionFiles.containsKey(indexFileName)) {
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
if (indexFileToDeletionFiles.get(indexFileName).isEmpty()) {
indexFileToDeletionFiles.remove(indexFileName);
indexNameToEntry.remove(indexFileName);
}
}
}
}

public List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
for (String indexFile : indexFileToDeletionFiles.keySet()) {
if (touchedIndexFiles.contains(indexFile)) {
IndexManifestEntry oldEntry = indexNameToEntry.get(indexFile);

// write unchanged deletion vector.
Map<String, DeletionFile> dataFileToDeletionFiles =
indexFileToDeletionFiles.get(indexFile);
if (!dataFileToDeletionFiles.isEmpty()) {
IndexFileMeta newIndexFile =
indexFileHandler.writeDeletionVectorsIndex(
deletionVectorsIndexFile.readDeletionVector(
dataFileToDeletionFiles));
newIndexEntries.add(
new IndexManifestEntry(
FileKind.ADD,
oldEntry.partition(),
oldEntry.bucket(),
newIndexFile));
}

// mark the touched index file as removed.
newIndexEntries.add(oldEntry.toDeleteEntry());
}
}
return newIndexEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;

Expand All @@ -36,6 +37,7 @@
import java.util.Map;
import java.util.zip.CRC32;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** DeletionVectors index file. */
Expand Down Expand Up @@ -83,6 +85,31 @@ public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta
return deletionVectors;
}

/** Reads deletion vectors from a list of DeletionFile which belong to a same index file. */
public Map<String, DeletionVector> readDeletionVector(
Map<String, DeletionFile> dataFileToDeletionFiles) {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
if (dataFileToDeletionFiles.isEmpty()) {
return deletionVectors;
}

String indexFile = dataFileToDeletionFiles.values().stream().findAny().get().path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) {
checkVersion(inputStream);
for (String dataFile : dataFileToDeletionFiles.keySet()) {
DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new DataInputStream(inputStream);
deletionVectors.put(
dataFile, readDeletionVector(dataInputStream, (int) deletionFile.length()));
}
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e);
}
return deletionVectors;
}

/**
* Reads a single deletion vector from the specified file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ public IndexFileHandler(
this.deletionVectorsIndex = deletionVectorsIndex;
}

public DeletionVectorsIndexFile deletionVectorsIndex() {
return this.deletionVectorsIndex;
}

public List<IndexManifestEntry> scan() {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null || snapshot.indexManifest() == null) {
return Collections.emptyList();
}
return indexManifestFile.read(snapshot.indexManifest());
}

public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, bucket);
if (result.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
Expand All @@ -39,6 +40,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;

Expand Down Expand Up @@ -117,14 +119,19 @@ public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int bucket) {
return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket);
}

public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
Map<String, DeletionFile> dataFileToDeletionFiles) {
return new DeletionVectorIndexFileMaintainer(fileHandler, dataFileToDeletionFiles);
}

public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) {
Long lastSnapshotId = snapshotManager().latestSnapshotId();
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
return factory.createOrRestore(lastSnapshotId, partition, bucket);
}

public CommitMessage writeDVIndexFiles(
public CommitMessageImpl writeDVIndexFiles(
BinaryRow partition, int bucket, Map<String, List<Integer>> dataFileToPositions) {
DeletionVectorsMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket);
for (Map.Entry<String, List<Integer>> entry : dataFileToPositions.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.deletionvectors;

import org.apache.paimon.TestAppendFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for DeletionVectorIndexFileMaintainer. */
public class DeletionVectorIndexFileMaintainerTest {

@TempDir java.nio.file.Path tempDir;

@Test
public void test() throws Exception {
TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());

Map<String, List<Integer>> dvs = new HashMap<>();
dvs.put("f1", Arrays.asList(1, 3, 5));
dvs.put("f2", Arrays.asList(2, 4, 6));
CommitMessageImpl commitMessage1 = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs);
CommitMessageImpl commitMessage2 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW,
1,
Collections.singletonMap("f3", Arrays.asList(1, 2, 3)));
store.commit(commitMessage1, commitMessage2);

PathFactory indexPathFactory = store.pathFactory().indexFileFactory();
Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
dataFileToDeletionFiles.putAll(
createDeletionFileMapFromIndexFileMetas(
indexPathFactory, commitMessage1.indexIncrement().newIndexFiles()));
dataFileToDeletionFiles.putAll(
createDeletionFileMapFromIndexFileMetas(
indexPathFactory, commitMessage2.indexIncrement().newIndexFiles()));

DeletionVectorIndexFileMaintainer dvIFMaintainer =
store.createDVIFMaintainer(dataFileToDeletionFiles);

// no dv should be rewritten, because nothing is changed.
List<IndexManifestEntry> res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(0);

// no dv should be rewritten, because all the deletion vectors have been updated in the
// index file that contains the dv of f3.
dvIFMaintainer.notifyDeletionFiles(
Collections.singletonMap("f3", dataFileToDeletionFiles.get("f3")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(0);

// the dv of f1 and f2 are in one index file, and the dv of f1 is updated.
// the dv of f2 need to be rewritten, and this index file should be marked as REMOVE.
dvIFMaintainer.notifyDeletionFiles(
Collections.singletonMap("f1", dataFileToDeletionFiles.get("f1")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(2);
IndexManifestEntry entry =
res.stream().filter(file -> file.kind() == FileKind.ADD).findAny().get();
assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue();
entry = res.stream().filter(file -> file.kind() == FileKind.DELETE).findAny().get();
assertThat(entry.indexFile())
.isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0));
}

private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas(
PathFactory indexPathFactory, List<IndexFileMeta> fileMetas) {
Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
for (IndexFileMeta indexFileMeta : fileMetas) {
for (Map.Entry<String, Pair<Integer, Integer>> range :
indexFileMeta.deletionVectorsRanges().entrySet()) {
dataFileToDeletionFiles.put(
range.getKey(),
new DeletionFile(
indexPathFactory.toPath(indexFileMeta.fileName()).toString(),
range.getValue().getLeft(),
range.getValue().getRight()));
}
}
return dataFileToDeletionFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,12 +854,12 @@ public void testDVIndexFiles() throws Exception {
TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());

// commit 1
CommitMessage commitMessage1 =
CommitMessageImpl commitMessage1 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW,
0,
Collections.singletonMap("f1", Arrays.asList(1, 3)));
CommitMessage commitMessage2 =
CommitMessageImpl commitMessage2 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW,
0,
Expand All @@ -880,18 +880,18 @@ public void testDVIndexFiles() throws Exception {
CommitMessage commitMessage3 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3)));
CommitMessage commitMessage4 =
store.removeIndexFiles(
BinaryRow.EMPTY_ROW,
0,
((CommitMessageImpl) commitMessage1).indexIncrement().newIndexFiles());
List<IndexFileMeta> deleted =
new ArrayList<>(commitMessage1.indexIncrement().newIndexFiles());
deleted.addAll(commitMessage2.indexIncrement().newIndexFiles());
CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
store.commit(commitMessage3, commitMessage4);

// assert 2
assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2);
assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1);
maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
dvs = maintainer.deletionVectors();
assertThat(dvs.size()).isEqualTo(2);
assertThat(dvs.get("f1").isDeleted(3)).isTrue();
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}

Expand Down

0 comments on commit c0a5f22

Please sign in to comment.