Skip to content

Commit

Permalink
[core] Refactor AppendDeletionFileMaintainer to separate bucketed and…
Browse files Browse the repository at this point in the history
… unaware (#3950)
  • Loading branch information
JingsongLi authored Aug 13, 2024
1 parent 037dc54 commit c4762cc
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ public void notifyNewDeletion(String fileName, DeletionVector deletionVector) {
modified = true;
}

/**
 * Registers {@code deletionVector} as the deletion vector tracked for {@code fileName}. If a
 * deletion vector is already tracked for that file, it is first handed to {@code
 * deletionVector.merge(...)}, so the passed-in instance is the one that ends up being tracked.
 *
 * @param fileName The name of the file where the deletion occurred.
 * @param deletionVector The deletion vector to track; it is mutated when a previous one exists.
 */
public void mergeNewDeletion(String fileName, DeletionVector deletionVector) {
    final DeletionVector existing = deletionVectors.get(fileName);
    if (existing != null) {
        // Fold the previously tracked vector into the incoming one before it is replaced.
        deletionVector.merge(existing);
    }

    deletionVectors.put(fileName, deletionVector);
    // Flag the change, whether or not a previous vector existed.
    modified = true;
}

/**
* Removes the specified file's deletion vector. This method is typically used to remove the
* deletion vectors of the before files in compaction.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.deletionvectors.append;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.source.DeletionFile;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;

/**
 * Maintains the deletion files of an append table. The core methods are:
 *
 * <ul>
 *   <li>{@link #notifyDeletionFiles}: mark the deletion of data files, create new deletion
 *       vectors.
 *   <li>{@link #persist}: persist deletion files to commit.
 * </ul>
 *
 * <p>Use {@link #forBucketedAppend} or {@link #forUnawareAppend} to obtain an instance.
 */
public interface AppendDeletionFileMaintainer {

    /** Returns the partition this maintainer was created for. */
    BinaryRow getPartition();

    /** Returns the bucket this maintainer was created for. */
    int getBucket();

    /**
     * Notifies the maintainer of a deletion vector for a data file.
     *
     * @param dataFile the data file the deletion vector belongs to
     * @param deletionVector the deletion vector of that data file
     */
    void notifyDeletionFiles(String dataFile, DeletionVector deletionVector);

    /** Persists the deletion files and returns the index manifest entries to commit. */
    List<IndexManifestEntry> persist();

    /**
     * Creates a maintainer for one bucket of a bucketed append table.
     *
     * @param indexFileHandler handler used to build the underlying {@link
     *     DeletionVectorsMaintainer}
     * @param snapshotId snapshot passed to {@code createOrRestore}; may be {@code null}
     * @param partition the partition to maintain
     * @param bucket the bucket to maintain
     */
    static AppendDeletionFileMaintainer forBucketedAppend(
            IndexFileHandler indexFileHandler,
            @Nullable Long snapshotId,
            BinaryRow partition,
            int bucket) {
        // A bucket should have only one deletion file, so the old deletion vectors are read
        // here and the entire deletion file of the bucket is overwritten when writing deletes.
        DeletionVectorsMaintainer.Factory factory =
                new DeletionVectorsMaintainer.Factory(indexFileHandler);
        DeletionVectorsMaintainer restored =
                factory.createOrRestore(snapshotId, partition, bucket);
        return new BucketedAppendDeletionFileMaintainer(partition, bucket, restored);
    }

    /**
     * Creates a maintainer for a partition of an unaware-bucket append table, seeded with the
     * deletion files scanned for {@code UNAWARE_BUCKET}.
     *
     * @param indexFileHandler handler used to scan the existing deletion vector index
     * @param snapshotId snapshot passed to {@code scanDVIndex}; may be {@code null}
     * @param partition the partition to maintain
     */
    static AppendDeletionFileMaintainer forUnawareAppend(
            IndexFileHandler indexFileHandler, @Nullable Long snapshotId, BinaryRow partition) {
        return new UnawareAppendDeletionFileMaintainer(
                indexFileHandler,
                partition,
                indexFileHandler.scanDVIndex(snapshotId, partition, UNAWARE_BUCKET));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.deletionvectors.append;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;

import java.util.List;
import java.util.stream.Collectors;

/**
 * An {@link AppendDeletionFileMaintainer} for bucketed append tables. All deletion vector
 * bookkeeping is delegated to the wrapped {@link DeletionVectorsMaintainer}.
 */
public class BucketedAppendDeletionFileMaintainer implements AppendDeletionFileMaintainer {

    private final BinaryRow partition;
    private final int bucket;
    private final DeletionVectorsMaintainer maintainer;

    /**
     * @param partition the partition reported by {@link #getPartition()}
     * @param bucket the bucket reported by {@link #getBucket()}
     * @param maintainer the delegate that tracks and writes the deletion vectors
     */
    BucketedAppendDeletionFileMaintainer(
            BinaryRow partition, int bucket, DeletionVectorsMaintainer maintainer) {
        this.maintainer = maintainer;
        this.partition = partition;
        this.bucket = bucket;
    }

    @Override
    public BinaryRow getPartition() {
        return partition;
    }

    @Override
    public int getBucket() {
        return bucket;
    }

    /** Forwards the deletion vector to the delegate via {@code mergeNewDeletion}. */
    @Override
    public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) {
        maintainer.mergeNewDeletion(dataFile, deletionVector);
    }

    /**
     * Writes the deletion vectors index through the delegate and wraps every resulting file
     * meta into an {@code ADD} entry for this partition and bucket.
     */
    @Override
    public List<IndexManifestEntry> persist() {
        return maintainer.writeDeletionVectorsIndex().stream()
                .map(meta -> new IndexManifestEntry(FileKind.ADD, partition, bucket, meta))
                .collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
* limitations under the License.
*/

package org.apache.paimon.deletionvectors;
package org.apache.paimon.deletionvectors.append;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
Expand All @@ -35,13 +38,14 @@
import java.util.Set;
import java.util.stream.Collectors;

/** DeletionVectorIndexFileMaintainer. */
public class DeletionVectorIndexFileMaintainer {
import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;

/** A {@link AppendDeletionFileMaintainer} of unaware bucket append table. */
public class UnawareAppendDeletionFileMaintainer implements AppendDeletionFileMaintainer {

private final IndexFileHandler indexFileHandler;

private final BinaryRow partition;
private final int bucket;
private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>();

private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>();
Expand All @@ -51,26 +55,16 @@ public class DeletionVectorIndexFileMaintainer {

private final DeletionVectorsMaintainer maintainer;

// the key of dataFileToDeletionFiles is the path relative to the table's location.
public DeletionVectorIndexFileMaintainer(
UnawareAppendDeletionFileMaintainer(
IndexFileHandler indexFileHandler,
Long snapshotId,
BinaryRow partition,
int bucket,
boolean restore) {
Map<String, DeletionFile> deletionFiles) {
this.indexFileHandler = indexFileHandler;
this.partition = partition;
this.bucket = bucket;
if (restore) {
this.maintainer =
new DeletionVectorsMaintainer.Factory(indexFileHandler)
.createOrRestore(snapshotId, partition, bucket);
} else {
this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create();
}
Map<String, DeletionFile> dataFileToDeletionFiles =
indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
init(dataFileToDeletionFiles);
// the deletion of data files is independent
// just create an empty maintainer
this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create();
init(deletionFiles);
}

@VisibleForTesting
Expand Down Expand Up @@ -98,14 +92,17 @@ public void init(Map<String, DeletionFile> dataFileToDeletionFiles) {
}
}

/** Returns the partition this maintainer was constructed with. */
@Override
public BinaryRow getPartition() {
    return partition;
}

@Override
public int getBucket() {
return this.bucket;
return UNAWARE_BUCKET;
}

@Override
public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
DeletionFile previous = null;
Expand All @@ -122,31 +119,22 @@ public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector)
maintainer.notifyNewDeletion(dataFile, deletionVector);
}

/**
 * For every given data file, records the index file that holds its deletion file as touched
 * and, if that index file is tracked, removes the data file from its tracked deletion files.
 *
 * @param dataFileToDeletionFiles mapping from data file to its deletion file
 */
public void notifyDeletionFiles(Map<String, DeletionFile> dataFileToDeletionFiles) {
    for (Map.Entry<String, DeletionFile> entry : dataFileToDeletionFiles.entrySet()) {
        // The index file is identified by the last path segment of the deletion file path.
        String indexFileName = new Path(entry.getValue().path()).getName();
        touchedIndexFiles.add(indexFileName);
        if (indexFileToDeletionFiles.containsKey(indexFileName)) {
            indexFileToDeletionFiles.get(indexFileName).remove(entry.getKey());
        }
    }
}

@Override
public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = writeUnchangedDeletionVector();
List<IndexManifestEntry> newIndexFileEntries =
maintainer.writeDeletionVectorsIndex().stream()
.map(
fileMeta ->
new IndexManifestEntry(
FileKind.ADD, partition, bucket, fileMeta))
FileKind.ADD, partition, UNAWARE_BUCKET, fileMeta))
.collect(Collectors.toList());
result.addAll(newIndexFileEntries);
return result;
}

public List<IndexManifestEntry> writeUnchangedDeletionVector() {
@VisibleForTesting
List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
for (String indexFile : indexFileToDeletionFiles.keySet()) {
Expand All @@ -162,14 +150,13 @@ public List<IndexManifestEntry> writeUnchangedDeletionVector() {
deletionVectorsIndexFile.readDeletionVector(
dataFileToDeletionFiles));
newIndexFiles.forEach(
newIndexFile -> {
newIndexEntries.add(
new IndexManifestEntry(
FileKind.ADD,
oldEntry.partition(),
oldEntry.bucket(),
newIndexFile));
});
newIndexFile ->
newIndexEntries.add(
new IndexManifestEntry(
FileKind.ADD,
oldEntry.partition(),
oldEntry.bucket(),
newIndexFile)));
}

// mark the touched index file as removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
Expand Down Expand Up @@ -271,11 +270,6 @@ public void deleteManifest(String indexManifest) {
indexManifestFile.delete(indexManifest);
}

/**
 * Creates a {@link DeletionVectorIndexFileMaintainer} that uses this handler, forwarding all
 * arguments unchanged to its constructor.
 *
 * @param snapshotId the snapshot passed on to the maintainer
 * @param partition the partition passed on to the maintainer
 * @param bucket the bucket passed on to the maintainer
 * @param restore passed on to the maintainer as its {@code restore} flag
 * @return a new maintainer instance
 */
public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
Long snapshotId, BinaryRow partition, int bucket, boolean restore) {
return new DeletionVectorIndexFileMaintainer(this, snapshotId, partition, bucket, restore);
}

public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
for (IndexFileMeta indexFile : fileMetas) {
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,7 @@ public enum BucketMode {
* Ignores the bucket concept: although all data is written to bucket-0, the parallelism of reads
* and writes is unrestricted. This mode only works for append-only tables.
*/
BUCKET_UNAWARE
BUCKET_UNAWARE;

public static final int UNAWARE_BUCKET = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -37,6 +38,6 @@ public UnawareBucketRowKeyExtractor(TableSchema schema) {

@Override
public int bucket() {
return 0;
return BucketMode.UNAWARE_BUCKET;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.paimon;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -118,10 +119,11 @@ public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int bucket) {
return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket);
}

public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
BinaryRow partition, int bucket, Map<String, DeletionFile> dataFileToDeletionFiles) {
DeletionVectorIndexFileMaintainer maintainer =
new DeletionVectorIndexFileMaintainer(fileHandler, null, partition, bucket, false);
public UnawareAppendDeletionFileMaintainer createDVIFMaintainer(
BinaryRow partition, Map<String, DeletionFile> dataFileToDeletionFiles) {
UnawareAppendDeletionFileMaintainer maintainer =
(UnawareAppendDeletionFileMaintainer)
AppendDeletionFileMaintainer.forUnawareAppend(fileHandler, null, partition);
maintainer.init(dataFileToDeletionFiles);
return maintainer;
}
Expand Down
Loading

0 comments on commit c4762cc

Please sign in to comment.