Skip to content

Commit

Permalink
[core] Integrate deletion vector to reader and writer (#2958)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Mar 9, 2024
1 parent e7d3da5 commit dfb5278
Show file tree
Hide file tree
Showing 33 changed files with 937 additions and 171 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@
<td>Boolean</td>
<td>Whether to ignore delete records in deduplicate mode.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
24 changes: 24 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -1075,6 +1076,15 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to force create snapshot on commit.");

/**
 * Boolean table option {@code deletion-vectors.enabled}; defaults to {@code false}.
 *
 * <p>Read through {@link #deletionVectorsEnabled()}.
 */
public static final ConfigOption<Boolean> DELETION_VECTORS_ENABLED =
key("deletion-vectors.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable deletion vectors mode. In this mode, index files containing deletion"
+ " vectors are generated when data is written, which marks the data for deletion."
+ " During read operations, by applying these index files, merging can be avoided.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1377,6 +1387,16 @@ public ChangelogProducer changelogProducer() {
return options.get(CHANGELOG_PRODUCER);
}

/**
 * Tells whether the currently configured {@link LookupStrategy} requires a lookup.
 *
 * @return the {@code needLookup} flag of {@link #lookupStrategy()}
 */
public boolean needLookup() {
    LookupStrategy strategy = lookupStrategy();
    return strategy.needLookup;
}

/**
 * Derives the {@link LookupStrategy} from the table options.
 *
 * <p>Two inputs are combined: whether the configured changelog producer is {@code LOOKUP}, and
 * whether deletion vectors are enabled.
 *
 * @return the strategy matching both flags
 */
public LookupStrategy lookupStrategy() {
    boolean produceChangelog =
            options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP);
    boolean deletionVector = deletionVectorsEnabled();
    return LookupStrategy.from(produceChangelog, deletionVector);
}

/**
 * Reads the {@code CHANGELOG_PRODUCER_ROW_DEDUPLICATE} option.
 *
 * @return the configured value of that option
 */
public boolean changelogRowDeduplicate() {
    boolean rowDeduplicate = options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
    return rowDeduplicate;
}
Expand Down Expand Up @@ -1634,6 +1654,10 @@ public int varTypeSize() {
return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
}

/**
 * Reads the {@link #DELETION_VECTORS_ENABLED} option.
 *
 * @return {@code true} if deletion vectors mode is switched on for this table
 */
public boolean deletionVectorsEnabled() {
    boolean enabled = options.get(DELETION_VECTORS_ENABLED);
    return enabled;
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lookup;

/**
 * Strategy for lookup.
 *
 * <p>Each constant is one combination of two independent flags, {@link #produceChangelog} and
 * {@link #deletionVector}. A lookup is needed as soon as at least one of them is set.
 */
public enum LookupStrategy {
    NO_LOOKUP(false, false),
    CHANGELOG_ONLY(true, false),
    DELETION_VECTOR_ONLY(false, true),
    CHANGELOG_AND_DELETION_VECTOR(true, true);

    /** {@code true} when either {@link #produceChangelog} or {@link #deletionVector} is set. */
    public final boolean needLookup;

    /** First flag of the combination, as passed to the constructor. */
    public final boolean produceChangelog;

    /** Second flag of the combination, as passed to the constructor. */
    public final boolean deletionVector;

    LookupStrategy(boolean produceChangelog, boolean deletionVector) {
        this.needLookup = produceChangelog || deletionVector;
        this.produceChangelog = produceChangelog;
        this.deletionVector = deletionVector;
    }

    /**
     * Finds the constant whose two flags equal the given ones.
     *
     * @param produceChangelog required value of {@link #produceChangelog}
     * @param deletionVector required value of {@link #deletionVector}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries this combination
     */
    public static LookupStrategy from(boolean produceChangelog, boolean deletionVector) {
        LookupStrategy[] candidates = values();
        int index = 0;
        while (index < candidates.length) {
            LookupStrategy candidate = candidates[index];
            boolean sameChangelog = candidate.produceChangelog == produceChangelog;
            boolean sameDeletionVector = candidate.deletionVector == deletionVector;
            if (sameChangelog && sameDeletionVector) {
                return candidate;
            }
            index++;
        }
        throw new IllegalArgumentException(
                "Invalid combination of produceChangelog : "
                        + produceChangelog
                        + " and deletionVector : "
                        + deletionVector);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.HashIndexMaintainer;
Expand Down Expand Up @@ -129,7 +130,9 @@ public KeyValueFileStoreRead newRead() {
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder());
newReaderFactoryBuilder(),
options,
newIndexFileHandler());
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down Expand Up @@ -161,6 +164,11 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
}
DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
if (options.deletionVectorsEnabled()) {
deletionVectorsMaintainerFactory =
new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
}
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
Expand All @@ -177,6 +185,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
keyValueFieldsExtractor,
tableName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.deletionvectors;

import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordWithPositionIterator;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * A {@link RecordReader} that wraps another reader and filters its batches with a {@link
 * DeletionVector}.
 *
 * <p>Every batch produced by the wrapped reader must be a {@link RecordWithPositionIterator};
 * the position it reports is what gets checked against the deletion vector.
 */
public class ApplyDeletionVectorReader<T> implements RecordReader<T> {

    private final RecordReader<T> reader;
    private final DeletionVector deletionVector;

    /**
     * @param reader the underlying reader whose batches are filtered
     * @param deletionVector the deletion vector consulted for each record position
     */
    public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletionVector) {
        this.reader = reader;
        this.deletionVector = deletionVector;
    }

    /**
     * Reads the next batch from the wrapped reader and attaches the deletion filter to it.
     *
     * @return the filtered batch, or {@code null} when the wrapped reader returns {@code null}
     * @throws IOException if the wrapped reader fails
     * @throws IllegalArgumentException presumably, via {@code checkArgument}, when the batch is
     *     not a {@link RecordWithPositionIterator} (confirm against Preconditions)
     */
    @Nullable
    @Override
    public RecordIterator<T> readBatch() throws IOException {
        RecordIterator<T> next = reader.readBatch();
        if (next != null) {
            checkArgument(
                    next instanceof RecordWithPositionIterator,
                    "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");
            RecordWithPositionIterator<T> positioned = (RecordWithPositionIterator<T>) next;
            // The record itself is irrelevant to the decision; only the position reported by
            // the iterator is tested against the deletion vector.
            return positioned.filter(
                    ignored -> !deletionVector.isDeleted(positioned.returnedPosition()));
        }
        return null;
    }

    /** Closes the wrapped reader. */
    @Override
    public void close() throws IOException {
        reader.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ private void checkVersion(InputStream in) throws IOException {
int version = in.read();
if (version != VERSION_ID_V1) {
throw new RuntimeException(
"Version not match, actual size: "
"Version not match, actual version: "
+ version
+ ", expert size: "
+ ", expert version: "
+ VERSION_ID_V1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
Expand Down Expand Up @@ -56,7 +57,7 @@ private DeletionVectorsMaintainer(
this.deletionVectors =
indexFile == null
? new HashMap<>()
: indexFileHandler.readAllDeletionVectors(indexFile);
: new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile));
this.modified = false;
}

Expand Down Expand Up @@ -115,12 +116,17 @@ public Optional<DeletionVector> deletionVectorOf(String fileName) {
return Optional.ofNullable(deletionVectors.get(fileName));
}

/**
 * Exposes the maintainer's file-name-to-deletion-vector map for tests.
 *
 * <p>The returned map is the internal instance, not a copy.
 */
@VisibleForTesting
public Map<String, DeletionVector> deletionVectors() {
    return this.deletionVectors;
}

/** Factory to restore {@link DeletionVectorsMaintainer}. */
public static class DeletionVectorsMaintainerFactory {
public static class Factory {

private final IndexFileHandler handler;

public DeletionVectorsMaintainerFactory(IndexFileHandler handler) {
public Factory(IndexFileHandler handler) {
this.handler = handler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.fs.FileIO;
Expand All @@ -42,6 +44,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
Expand All @@ -60,6 +64,9 @@ public class KeyValueFileReaderFactory {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;

// FileName to its corresponding deletion vector
private final @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private KeyValueFileReaderFactory(
FileIO fileIO,
SchemaManager schemaManager,
Expand All @@ -69,7 +76,8 @@ private KeyValueFileReaderFactory(
BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition) {
BinaryRow partition,
@Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -80,6 +88,7 @@ private KeyValueFileReaderFactory(
this.asyncThreshold = asyncThreshold;
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.deletionVectorSupplier = deletionVectorSupplier;
}

public RecordReader<KeyValue> createRecordReader(
Expand Down Expand Up @@ -113,17 +122,27 @@ private RecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
return new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
RecordReader<KeyValue> recordReader =
new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
if (deletionVectorSupplier != null) {
Optional<DeletionVector> optionalDeletionVector =
deletionVectorSupplier.apply(fileName);
if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) {
recordReader =
new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get());
}
}
return recordReader;
}

public static Builder builder(
Expand Down Expand Up @@ -166,6 +185,7 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private Builder(
FileIO fileIO,
Expand Down Expand Up @@ -218,6 +238,12 @@ public Builder withValueProjection(int[][] projection) {
return this;
}

/**
 * Sets the function that maps a data file name to its optional {@link DeletionVector}.
 *
 * <p>The supplier is handed to the {@link KeyValueFileReaderFactory} created by this builder.
 *
 * @param deletionVectorSupplier lookup from file name to the file's deletion vector
 * @return this builder, for chaining
 */
public Builder withDeletionVectorSupplier(
Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.deletionVectorSupplier = deletionVectorSupplier;
return this;
}

/**
 * Returns the key row type held by this builder.
 *
 * @return the builder's {@code keyType} field
 */
public RowType keyType() {
    return this.keyType;
}
Expand Down Expand Up @@ -248,7 +274,8 @@ public KeyValueFileReaderFactory build(
formatDiscover, extractor, keyProjection, valueProjection, filters),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition);
partition,
deletionVectorSupplier);
}

private void applyProjection() {
Expand Down
Loading

0 comments on commit dfb5278

Please sign in to comment.