From dfb5278de0f08ea3cf4669a4c676fc124a8527bc Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Sat, 9 Mar 2024 15:56:34 +0800 Subject: [PATCH] [core] Integrate deletion vector to reader and writer (#2958) --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 24 +++ .../apache/paimon/lookup/LookupStrategy.java | 56 ++++++ .../org/apache/paimon/KeyValueFileStore.java | 11 +- .../ApplyDeletionVectorReader.java | 65 ++++++ .../DeletionVectorsIndexFile.java | 4 +- .../DeletionVectorsMaintainer.java | 12 +- .../paimon/io/KeyValueFileReaderFactory.java | 53 +++-- .../apache/paimon/mergetree/LookupLevels.java | 139 ++++++++++++- .../compact/ChangelogMergeTreeRewriter.java | 37 +++- ...FullChangelogMergeTreeCompactRewriter.java | 5 +- .../LookupChangelogMergeFunctionWrapper.java | 41 +++- .../LookupMergeTreeCompactRewriter.java | 51 +++-- .../compact/MergeTreeCompactManager.java | 8 +- .../compact/MergeTreeCompactRewriter.java | 14 +- .../operation/AbstractFileStoreWrite.java | 42 +++- .../operation/AppendOnlyFileStoreWrite.java | 6 +- .../paimon/operation/FileStoreWrite.java | 7 +- .../operation/KeyValueFileStoreRead.java | 62 ++++-- .../operation/KeyValueFileStoreWrite.java | 168 ++++++++++------ .../operation/MemoryFileStoreWrite.java | 3 + .../paimon/schema/SchemaValidation.java | 27 +++ .../table/PrimaryKeyFileStoreTable.java | 3 +- .../paimon/table/query/LocalTableQuery.java | 2 +- .../DeletionVectorsMaintainerTest.java | 4 +- .../paimon/mergetree/MergeTreeTestBase.java | 6 +- ...okupChangelogMergeFunctionWrapperTest.java | 13 +- .../compact/MergeTreeCompactManagerTest.java | 3 +- .../apache/paimon/flink/sink/FlinkSink.java | 7 +- .../sink/MultiTablesStoreCompactOperator.java | 8 +- .../flink/PrimaryKeyFileStoreTableITCase.java | 27 ++- .../source/TestChangelogDataReadWrite.java | 5 +- .../paimon/spark/sql/DeletionVectorTest.scala | 189 ++++++++++++++++++ 33 files changed, 937 insertions(+), 171 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index a1bf28c62c77..52645cf8cc78 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -164,6 +164,12 @@ Boolean Whether to ignore delete records in deduplicate mode. + +
deletion-vectors.enabled
+ false + Boolean + Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided. +
dynamic-bucket.assigner-parallelism
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index f8eec84fbab1..faea5399723c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -1075,6 +1076,15 @@ public class CoreOptions implements Serializable { .defaultValue(false) .withDescription("Whether to force create snapshot on commit."); + public static final ConfigOption DELETION_VECTORS_ENABLED = + key("deletion-vectors.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable deletion vectors mode. In this mode, index files containing deletion" + + " vectors are generated when data is written, which marks the data for deletion." + + " During read operations, by applying these index files, merging can be avoided."); + private final Options options; public CoreOptions(Map options) { @@ -1377,6 +1387,16 @@ public ChangelogProducer changelogProducer() { return options.get(CHANGELOG_PRODUCER); } + public boolean needLookup() { + return lookupStrategy().needLookup; + } + + public LookupStrategy lookupStrategy() { + return LookupStrategy.from( + options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP), + deletionVectorsEnabled()); + } + public boolean changelogRowDeduplicate() { return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE); } @@ -1634,6 +1654,10 @@ public int varTypeSize() { return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION); } + public boolean deletionVectorsEnabled() { + return options.get(DELETION_VECTORS_ENABLED); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java new file mode 100644 index 000000000000..6c709bcaef8e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +/** Strategy for lookup. 
*/ +public enum LookupStrategy { + NO_LOOKUP(false, false), + + CHANGELOG_ONLY(true, false), + + DELETION_VECTOR_ONLY(false, true), + + CHANGELOG_AND_DELETION_VECTOR(true, true); + + public final boolean needLookup; + + public final boolean produceChangelog; + + public final boolean deletionVector; + + LookupStrategy(boolean produceChangelog, boolean deletionVector) { + this.produceChangelog = produceChangelog; + this.deletionVector = deletionVector; + this.needLookup = produceChangelog || deletionVector; + } + + public static LookupStrategy from(boolean produceChangelog, boolean deletionVector) { + for (LookupStrategy strategy : values()) { + if (strategy.produceChangelog == produceChangelog + && strategy.deletionVector == deletionVector) { + return strategy; + } + } + throw new IllegalArgumentException( + "Invalid combination of produceChangelog : " + + produceChangelog + + " and deletionVector : " + + deletionVector); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 727bf8f82b7a..21e6ac60e69c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -20,6 +20,7 @@ import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.HashIndexMaintainer; @@ -129,7 +130,9 @@ public KeyValueFileStoreRead newRead() { newKeyComparator(), userDefinedSeqComparator(), mfFactory, - newReaderFactoryBuilder()); + newReaderFactoryBuilder(), + options, + newIndexFileHandler()); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { @@ -161,6 +164,11 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma if (bucketMode() == BucketMode.DYNAMIC) { indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler()); } + DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null; + if (options.deletionVectorsEnabled()) { + deletionVectorsMaintainerFactory = + new DeletionVectorsMaintainer.Factory(newIndexFileHandler()); + } return new KeyValueFileStoreWrite( fileIO, schemaManager, @@ -177,6 +185,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma snapshotManager(), newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), indexFactory, + deletionVectorsMaintainerFactory, options, keyValueFieldsExtractor, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java new file mode 100644 index 000000000000..32d6da8617f5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link RecordReader} which applies a {@link DeletionVector} to filter records. */
+public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
+
+    private final RecordReader<T> reader;
+
+    private final DeletionVector deletionVector;
+
+    public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletionVector) {
+        this.reader = reader;
+        this.deletionVector = deletionVector;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        RecordIterator<T> batch = reader.readBatch();
+
+        if (batch == null) {
+            return null;
+        }
+
+        checkArgument(
+                batch instanceof RecordWithPositionIterator,
+                "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");
+
+        RecordWithPositionIterator<T> batchWithPosition = (RecordWithPositionIterator<T>) batch;
+
+        return batchWithPosition.filter(
+                a -> !deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index a82cc9be8aae..313435928505 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -140,9 +140,9 @@ private void checkVersion(InputStream in) throws IOException {
         int version = in.read();
         if (version != VERSION_ID_V1) {
             throw new RuntimeException(
-                    "Version not match, actual size: "
+                    "Version mismatch, actual version: "
                             + version
-                            + ", expert size: "
+                            + ", expected version: "
                             + VERSION_ID_V1);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 7c88edc6c0b2..878c76841d6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.deletionvectors;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -56,7 +57,7 @@ private DeletionVectorsMaintainer(
         this.deletionVectors =
                 indexFile == null
                         ?
new HashMap<>() - : indexFileHandler.readAllDeletionVectors(indexFile); + : new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile)); this.modified = false; } @@ -115,12 +116,17 @@ public Optional deletionVectorOf(String fileName) { return Optional.ofNullable(deletionVectors.get(fileName)); } + @VisibleForTesting + public Map deletionVectors() { + return deletionVectors; + } + /** Factory to restore {@link DeletionVectorsMaintainer}. */ - public static class DeletionVectorsMaintainerFactory { + public static class Factory { private final IndexFileHandler handler; - public DeletionVectorsMaintainerFactory(IndexFileHandler handler) { + public Factory(IndexFileHandler handler) { this.handler = handler; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 780149498454..3fa19681e4c3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -21,6 +21,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.fs.FileIO; @@ -42,6 +44,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; import java.util.function.Supplier; /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */ @@ -60,6 +64,9 @@ public class KeyValueFileReaderFactory { private final Map bulkFormatMappings; private final BinaryRow partition; + // FileName to its corresponding deletion vector + private final @Nullable Function> deletionVectorSupplier; + private KeyValueFileReaderFactory( FileIO fileIO, SchemaManager schemaManager, @@ -69,7 +76,8 @@ private KeyValueFileReaderFactory( BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder, DataFilePathFactory pathFactory, long asyncThreshold, - BinaryRow partition) { + BinaryRow partition, + @Nullable Function> deletionVectorSupplier) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schemaId = schemaId; @@ -80,6 +88,7 @@ private KeyValueFileReaderFactory( this.asyncThreshold = asyncThreshold; this.partition = partition; this.bulkFormatMappings = new HashMap<>(); + this.deletionVectorSupplier = deletionVectorSupplier; } public RecordReader createRecordReader( @@ -113,17 +122,27 @@ private RecordReader createRecordReader( new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - return new KeyValueDataFileRecordReader( - fileIO, - bulkFormatMapping.getReaderFactory(), - pathFactory.toPath(fileName), - keyType, - valueType, - level, - poolSize, - bulkFormatMapping.getIndexMapping(), - bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + RecordReader recordReader = + new KeyValueDataFileRecordReader( + fileIO, + bulkFormatMapping.getReaderFactory(), + pathFactory.toPath(fileName), + keyType, + valueType, + level, + poolSize, + bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getCastMapping(), + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + if 
(deletionVectorSupplier != null) { + Optional optionalDeletionVector = + deletionVectorSupplier.apply(fileName); + if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) { + recordReader = + new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get()); + } + } + return recordReader; } public static Builder builder( @@ -166,6 +185,7 @@ public static class Builder { private int[][] valueProjection; private RowType projectedKeyType; private RowType projectedValueType; + private @Nullable Function> deletionVectorSupplier; private Builder( FileIO fileIO, @@ -218,6 +238,12 @@ public Builder withValueProjection(int[][] projection) { return this; } + public Builder withDeletionVectorSupplier( + Function> deletionVectorSupplier) { + this.deletionVectorSupplier = deletionVectorSupplier; + return this; + } + public RowType keyType() { return keyType; } @@ -248,7 +274,8 @@ public KeyValueFileReaderFactory build( formatDiscover, extractor, keyProjection, valueProjection, filters), pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), - partition); + partition, + deletionVectorSupplier); } private void applyProjection() { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index 4869055ad5aa..3e7e127027cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -29,6 +29,7 @@ import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordWithPositionIterator; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BloomFilter; @@ -142,7 +143,8 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException { return null; } - return valueProcessor.readFromDisk(key, lookupFile.remoteFile().level(), valueBytes); + return valueProcessor.readFromDisk( + key, lookupFile.remoteFile().level(), valueBytes, file.fileName()); } private int fileWeigh(String file, LookupFile lookupFile) { @@ -168,15 +170,29 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException { lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount())); LookupStoreFactory.Context context; try (RecordReader reader = fileReaderFactory.apply(file)) { - RecordReader.RecordIterator batch; KeyValue kv; - while ((batch = reader.readBatch()) != null) { - while ((kv = batch.next()) != null) { - byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); - byte[] valueBytes = valueProcessor.persistToDisk(kv); - kvWriter.put(keyBytes, valueBytes); + if (valueProcessor.withPosition()) { + RecordWithPositionIterator batch; + while ((batch = (RecordWithPositionIterator) reader.readBatch()) + != null) { + while ((kv = batch.next()) != null) { + byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); + byte[] valueBytes = + valueProcessor.persistToDisk(kv, batch.returnedPosition()); + kvWriter.put(keyBytes, valueBytes); + } + batch.releaseBatch(); + } + } else { + RecordReader.RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + while ((kv = batch.next()) != null) { + byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); + byte[] valueBytes = valueProcessor.persistToDisk(kv); + kvWriter.put(keyBytes, 
valueBytes); + } + batch.releaseBatch(); } - batch.releaseBatch(); } } catch (IOException e) { FileIOUtils.deleteFileOrDirectory(localFile); @@ -228,9 +244,15 @@ public void close() throws IOException { /** Processor to process value. */ public interface ValueProcessor { + boolean withPosition(); + byte[] persistToDisk(KeyValue kv); - T readFromDisk(InternalRow key, int level, byte[] valueBytes); + default byte[] persistToDisk(KeyValue kv, long rowPosition) { + throw new UnsupportedOperationException(); + } + + T readFromDisk(InternalRow key, int level, byte[] valueBytes, String fileName); } /** A {@link ValueProcessor} to return {@link KeyValue}. */ @@ -242,6 +264,11 @@ public KeyValueProcessor(RowType valueType) { this.valueSerializer = new RowCompactedSerializer(valueType); } + @Override + public boolean withPosition() { + return false; + } + @Override public byte[] persistToDisk(KeyValue kv) { byte[] vBytes = valueSerializer.serializeToBytes(kv.value()); @@ -254,7 +281,7 @@ public byte[] persistToDisk(KeyValue kv) { } @Override - public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes) { + public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) { InternalRow value = valueSerializer.deserialize(bytes); long sequenceNumber = MemorySegment.wrap(bytes).getLong(bytes.length - 9); RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]); @@ -267,14 +294,104 @@ public static class ContainsValueProcessor implements ValueProcessor { private static final byte[] EMPTY_BYTES = new byte[0]; + @Override + public boolean withPosition() { + return false; + } + @Override public byte[] persistToDisk(KeyValue kv) { return EMPTY_BYTES; } @Override - public Boolean readFromDisk(InternalRow key, int level, byte[] bytes) { + public Boolean readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) { return Boolean.TRUE; } } + + /** A {@link ValueProcessor} to return {@link PositionedKeyValue}. */ + public static class PositionedKeyValueProcessor implements ValueProcessor { + private final boolean persistValue; + private final RowCompactedSerializer valueSerializer; + + public PositionedKeyValueProcessor(RowType valueType, boolean persistValue) { + this.persistValue = persistValue; + this.valueSerializer = persistValue ? 
new RowCompactedSerializer(valueType) : null; + } + + @Override + public boolean withPosition() { + return true; + } + + @Override + public byte[] persistToDisk(KeyValue kv) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] persistToDisk(KeyValue kv, long rowPosition) { + if (persistValue) { + byte[] vBytes = valueSerializer.serializeToBytes(kv.value()); + byte[] bytes = new byte[vBytes.length + 8 + 8 + 1]; + MemorySegment segment = MemorySegment.wrap(bytes); + segment.put(0, vBytes); + segment.putLong(bytes.length - 17, rowPosition); + segment.putLong(bytes.length - 9, kv.sequenceNumber()); + segment.put(bytes.length - 1, kv.valueKind().toByteValue()); + return bytes; + } else { + byte[] bytes = new byte[8]; + MemorySegment segment = MemorySegment.wrap(bytes); + segment.putLong(0, rowPosition); + return bytes; + } + } + + @Override + public PositionedKeyValue readFromDisk( + InternalRow key, int level, byte[] bytes, String fileName) { + if (persistValue) { + InternalRow value = valueSerializer.deserialize(bytes); + MemorySegment segment = MemorySegment.wrap(bytes); + long rowPosition = segment.getLong(bytes.length - 17); + long sequenceNumber = segment.getLong(bytes.length - 9); + RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]); + return new PositionedKeyValue( + new KeyValue().replace(key, sequenceNumber, rowKind, value).setLevel(level), + fileName, + rowPosition); + } else { + MemorySegment segment = MemorySegment.wrap(bytes); + return new PositionedKeyValue(null, fileName, segment.getLong(0)); + } + } + } + + /** {@link KeyValue} with file name and row position for DeletionVector. */ + public static class PositionedKeyValue { + private final @Nullable KeyValue keyValue; + private final String fileName; + private final long rowPosition; + + public PositionedKeyValue(@Nullable KeyValue keyValue, String fileName, long rowPosition) { + this.keyValue = keyValue; + this.fileName = fileName; + this.rowPosition = rowPosition; + } + + public String fileName() { + return fileName; + } + + public long rowPosition() { + return rowPosition; + } + + @Nullable + public KeyValue keyValue() { + return keyValue; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index f338ee056268..c807fca23464 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -22,10 +22,12 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; @@ -47,6 +49,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite protected final int maxLevel; protected final MergeEngine mergeEngine; + protected final LookupStrategy lookupStrategy; public ChangelogMergeTreeRewriter( int maxLevel, @@ -56,16 +59,20 @@ public 
ChangelogMergeTreeRewriter( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, - MergeSorter mergeSorter) { + MergeSorter mergeSorter, + LookupStrategy lookupStrategy, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { super( readerFactory, writerFactory, keyComparator, userDefinedSeqComparator, mfFactory, - mergeSorter); + mergeSorter, + deletionVectorsMaintainer); this.maxLevel = maxLevel; this.mergeEngine = mergeEngine; + this.lookupStrategy = lookupStrategy; } protected abstract boolean rewriteChangelog( @@ -136,7 +143,9 @@ private CompactResult rewriteChangelogCompaction( if (rewriteCompactFile) { compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel); } - changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); + if (lookupStrategy.produceChangelog) { + changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); + } while (iterator.hasNext()) { ChangelogResult result = iterator.next(); @@ -144,8 +153,10 @@ private CompactResult rewriteChangelogCompaction( if (rewriteCompactFile && keyValue != null && (!dropDelete || keyValue.isAdd())) { compactFileWriter.write(keyValue); } - for (KeyValue kv : result.changelogs()) { - changelogFileWriter.write(kv); + if (lookupStrategy.produceChangelog) { + for (KeyValue kv : result.changelogs()) { + changelogFileWriter.write(kv); + } } } } finally { @@ -168,7 +179,18 @@ private CompactResult rewriteChangelogCompaction( .map(x -> x.upgrade(outputLevel)) .collect(Collectors.toList()); - return new CompactResult(before, after, changelogFileWriter.result()); + if (deletionVectorsMaintainer != null) { + for (DataFileMeta dataFileMeta : before) { + deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName()); + } + } + + return new CompactResult( + before, + after, + lookupStrategy.produceChangelog + ? 
changelogFileWriter.result() + : Collections.emptyList()); } @Override @@ -179,7 +201,8 @@ public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exceptio outputLevel, Collections.singletonList( Collections.singletonList(SortedRun.fromSingle(file))), - false, + // In deletion vector mode, we always drop deletion + lookupStrategy.deletionVector, strategy.rewrite); } else { return super.upgrade(outputLevel, file); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java index 3ed7a5d69822..1eaa76cf4ac4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java @@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.utils.FieldsComparator; @@ -64,7 +65,9 @@ public FullChangelogMergeTreeCompactRewriter( keyComparator, userDefinedSeqComparator, mfFactory, - mergeSorter); + mergeSorter, + LookupStrategy.CHANGELOG_ONLY, + null); this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index 319f5055a431..aa5555fd57d1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -21,8 +21,13 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.lookup.LookupStrategy; +import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue; import org.apache.paimon.types.RowKind; +import javax.annotation.Nullable; + import java.util.Iterator; import java.util.LinkedList; import java.util.function.Function; @@ -44,33 +49,45 @@ * level as BEFORE. 
* */ -public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper { +public class LookupChangelogMergeFunctionWrapper + implements MergeFunctionWrapper { private final LookupMergeFunction mergeFunction; private final MergeFunction mergeFunction2; - private final Function lookup; + private final Function lookup; private final ChangelogResult reusedResult = new ChangelogResult(); private final KeyValue reusedBefore = new KeyValue(); private final KeyValue reusedAfter = new KeyValue(); private final RecordEqualiser valueEqualiser; private final boolean changelogRowDeduplicate; + private final LookupStrategy lookupStrategy; + private final @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer; public LookupChangelogMergeFunctionWrapper( MergeFunctionFactory mergeFunctionFactory, - Function lookup, + Function lookup, RecordEqualiser valueEqualiser, - boolean changelogRowDeduplicate) { + boolean changelogRowDeduplicate, + LookupStrategy lookupStrategy, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { MergeFunction mergeFunction = mergeFunctionFactory.create(); checkArgument( mergeFunction instanceof LookupMergeFunction, "Merge function should be a LookupMergeFunction, but is %s, there is a bug.", mergeFunction.getClass().getName()); + if (lookupStrategy.deletionVector) { + checkArgument( + deletionVectorsMaintainer != null, + "deletionVectorsMaintainer should not be null, there is a bug."); + } this.mergeFunction = (LookupMergeFunction) mergeFunction; this.mergeFunction2 = mergeFunctionFactory.create(); this.lookup = lookup; this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; + this.lookupStrategy = lookupStrategy; + this.deletionVectorsMaintainer = deletionVectorsMaintainer; } @Override @@ -105,7 +122,17 @@ public ChangelogResult getResult() { // 2. Lookup if latest high level record is absent if (highLevel == null) { InternalRow lookupKey = candidates.get(0).key(); - highLevel = lookup.apply(lookupKey); + T lookupResult = lookup.apply(lookupKey); + if (lookupResult != null) { + if (lookupStrategy.deletionVector) { + PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult; + highLevel = positionedKeyValue.keyValue(); + deletionVectorsMaintainer.notifyNewDeletion( + positionedKeyValue.fileName(), positionedKeyValue.rowPosition()); + } else { + highLevel = (KeyValue) lookupResult; + } + } } // 3. Calculate result @@ -118,14 +145,14 @@ public ChangelogResult getResult() { // 4. 
Set changelog when there's level-0 records reusedResult.reset(); - if (containLevel0) { + if (containLevel0 && lookupStrategy.produceChangelog) { setChangelog(highLevel, result); } return reusedResult.setResult(result); } - private void setChangelog(KeyValue before, KeyValue after) { + private void setChangelog(@Nullable KeyValue before, KeyValue after) { if (before == null || !before.isAdd()) { if (after.isAdd()) { reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after)); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index 5335ba82ab66..95b7ab78c055 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -22,9 +22,11 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.LookupLevels; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; @@ -40,6 +42,7 @@ import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE; import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE; import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction @@ -60,7 +63,9 @@ public LookupMergeTreeCompactRewriter( @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter, - MergeFunctionWrapperFactory wrapperFactory) { + MergeFunctionWrapperFactory wrapperFactory, + LookupStrategy lookupStrategy, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { super( maxLevel, mergeEngine, @@ -69,7 +74,14 @@ public LookupMergeTreeCompactRewriter( keyComparator, userDefinedSeqComparator, mfFactory, - mergeSorter); + mergeSorter, + lookupStrategy, + deletionVectorsMaintainer); + if (lookupStrategy.deletionVector) { + checkArgument( + deletionVectorsMaintainer != null, + "deletionVectorsMaintainer should not be null, there is a bug."); + } this.lookupLevels = lookupLevels; this.wrapperFactory = wrapperFactory; } @@ -86,12 +98,17 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { return NO_CHANGELOG; } + // In deletionVector mode, since drop delete is required, rewrite is always required. + if (lookupStrategy.deletionVector) { + return CHANGELOG_WITH_REWRITE; + } + if (outputLevel == maxLevel) { return CHANGELOG_NO_REWRITE; } // DEDUPLICATE retains the latest records as the final result, so merging has no impact on - // it at all + // it at all. 
if (mergeEngine == MergeEngine.DEDUPLICATE) { return CHANGELOG_NO_REWRITE; } @@ -104,7 +121,8 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { @Override protected MergeFunctionWrapper createMergeWrapper(int outputLevel) { - return wrapperFactory.create(mfFactory, outputLevel, lookupLevels); + return wrapperFactory.create( + mfFactory, outputLevel, lookupLevels, deletionVectorsMaintainer); } @Override @@ -118,28 +136,34 @@ public interface MergeFunctionWrapperFactory { MergeFunctionWrapper create( MergeFunctionFactory mfFactory, int outputLevel, - LookupLevels lookupLevels); + LookupLevels lookupLevels, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer); } /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper. */ - public static class LookupMergeFunctionWrapperFactory - implements MergeFunctionWrapperFactory { + public static class LookupMergeFunctionWrapperFactory + implements MergeFunctionWrapperFactory { private final RecordEqualiser valueEqualiser; private final boolean changelogRowDeduplicate; + private final LookupStrategy lookupStrategy; public LookupMergeFunctionWrapperFactory( - RecordEqualiser valueEqualiser, boolean changelogRowDeduplicate) { + RecordEqualiser valueEqualiser, + boolean changelogRowDeduplicate, + LookupStrategy lookupStrategy) { this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; + this.lookupStrategy = lookupStrategy; } @Override public MergeFunctionWrapper create( MergeFunctionFactory mfFactory, int outputLevel, - LookupLevels lookupLevels) { - return new LookupChangelogMergeFunctionWrapper( + LookupLevels lookupLevels, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + return new LookupChangelogMergeFunctionWrapper<>( mfFactory, key -> { try { @@ -149,7 +173,9 @@ public MergeFunctionWrapper create( } }, valueEqualiser, - changelogRowDeduplicate); + changelogRowDeduplicate, + lookupStrategy, + deletionVectorsMaintainer); } } @@ -161,7 +187,8 @@ public static class FirstRowMergeFunctionWrapperFactory public MergeFunctionWrapper create( MergeFunctionFactory mfFactory, int outputLevel, - LookupLevels lookupLevels) { + LookupLevels lookupLevels, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { return new FistRowMergeFunctionWrapper( mfFactory, key -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index 6d94ad574565..7bdb44118f2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -58,6 +58,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { private final CompactRewriter rewriter; @Nullable private final CompactionMetrics.Reporter metricsReporter; + private final boolean deletionVectorsEnabled; public MergeTreeCompactManager( ExecutorService executor, @@ -67,7 +68,8 @@ public MergeTreeCompactManager( long compactionFileSize, int numSortedRunStopTrigger, CompactRewriter rewriter, - @Nullable CompactionMetrics.Reporter metricsReporter) { + @Nullable CompactionMetrics.Reporter metricsReporter, + boolean deletionVectorsEnabled) { this.executor = executor; this.levels = levels; this.strategy = strategy; @@ -76,6 +78,7 @@ public MergeTreeCompactManager( this.keyComparator = keyComparator; this.rewriter = 
rewriter; this.metricsReporter = metricsReporter; + this.deletionVectorsEnabled = deletionVectorsEnabled; MetricUtils.safeCall(this::reportLevel0FileCount, LOG); } @@ -145,7 +148,8 @@ public void triggerCompaction(boolean fullCompaction) { */ boolean dropDelete = unit.outputLevel() != 0 - && unit.outputLevel() >= levels.nonEmptyHighestLevel(); + && (unit.outputLevel() >= levels.nonEmptyHighestLevel() + || deletionVectorsEnabled); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index bd3338bf74b6..8e8f67207317 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -46,6 +47,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter { @Nullable protected final FieldsComparator userDefinedSeqComparator; protected final MergeFunctionFactory mfFactory; protected final MergeSorter mergeSorter; + @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; public MergeTreeCompactRewriter( KeyValueFileReaderFactory readerFactory, @@ -53,13 +55,15 @@ public MergeTreeCompactRewriter( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, - MergeSorter mergeSorter) { + MergeSorter mergeSorter, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.keyComparator = keyComparator; this.userDefinedSeqComparator = userDefinedSeqComparator; this.mfFactory = mfFactory; this.mergeSorter = mergeSorter; + this.deletionVectorsMaintainer = deletionVectorsMaintainer; } @Override @@ -83,6 +87,12 @@ protected CompactResult rewriteCompaction( mergeSorter); writer.write(new RecordReaderIterator<>(sectionsReader)); writer.close(); - return new CompactResult(extractFilesFromSections(sections), writer.result()); + List before = extractFilesFromSections(sections); + if (deletionVectorsMaintainer != null) { + for (DataFileMeta dataFileMeta : before) { + deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName()); + } + } + return new CompactResult(before, writer.result()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index af78e34632e8..30ee9c6c1d07 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexMaintainer; @@ -65,6 +66,7 @@ public 
abstract class AbstractFileStoreWrite implements FileStoreWrite { private final FileStoreScan scan; private final int writerNumberMax; @Nullable private final IndexMaintainer.Factory indexFactory; + @Nullable private final DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory; @Nullable protected IOManager ioManager; @@ -83,13 +85,14 @@ protected AbstractFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, String tableName, int writerNumberMax) { this.commitUser = commitUser; this.snapshotManager = snapshotManager; this.scan = scan; this.indexFactory = indexFactory; - + this.deletionVectorsMaintainerFactory = deletionVectorsMaintainerFactory; this.writers = new HashMap<>(); this.tableName = tableName; this.writerNumberMax = writerNumberMax; @@ -193,7 +196,10 @@ public List prepareCommit(boolean waitCompaction, long commitIden CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction); List newIndexFiles = new ArrayList<>(); if (writerContainer.indexMaintainer != null) { - newIndexFiles = writerContainer.indexMaintainer.prepareCommit(); + newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit()); + } + if (writerContainer.deletionVectorsMaintainer != null) { + newIndexFiles.addAll(writerContainer.deletionVectorsMaintainer.prepareCommit()); } CommitMessageImpl committable = new CommitMessageImpl( @@ -292,6 +298,7 @@ public List> checkpoint() { writerContainer.lastModifiedCommitIdentifier, dataFiles, writerContainer.indexMaintainer, + writerContainer.deletionVectorsMaintainer, increment)); } } @@ -311,10 +318,15 @@ public void restore(List> states) { state.bucket, state.dataFiles, state.commitIncrement, - compactExecutor()); + compactExecutor(), + state.deletionVectorsMaintainer); notifyNewWriter(writer); WriterContainer writerContainer = - new WriterContainer<>(writer, state.indexMaintainer, state.baseSnapshotId); + new WriterContainer<>( + writer, + state.indexMaintainer, + state.deletionVectorsMaintainer, + state.baseSnapshotId); writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier; writers.computeIfAbsent(state.partition, k -> new HashMap<>()) .put(state.bucket, writerContainer); @@ -360,10 +372,22 @@ public WriterContainer createWriterContainer( ? null : indexFactory.createOrRestore( ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); + DeletionVectorsMaintainer deletionVectorsMaintainer = + deletionVectorsMaintainerFactory == null + ? null + : deletionVectorsMaintainerFactory.createOrRestore( + ignorePreviousFiles ? 
null : latestSnapshotId, partition, bucket); RecordWriter writer = - createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor()); + createWriter( + partition.copy(), + bucket, + restoreFiles, + null, + compactExecutor(), + deletionVectorsMaintainer); notifyNewWriter(writer); - return new WriterContainer<>(writer, indexMaintainer, latestSnapshotId); + return new WriterContainer<>( + writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId); } @Override @@ -409,7 +433,8 @@ protected abstract RecordWriter createWriter( int bucket, List restoreFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor); + ExecutorService compactExecutor, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer); // force buffer spill to avoid out of memory in batch mode protected void forceBufferSpill() throws Exception {} @@ -422,15 +447,18 @@ protected void forceBufferSpill() throws Exception {} public static class WriterContainer { public final RecordWriter writer; @Nullable public final IndexMaintainer indexMaintainer; + @Nullable public final DeletionVectorsMaintainer deletionVectorsMaintainer; protected final long baseSnapshotId; protected long lastModifiedCommitIdentifier; protected WriterContainer( RecordWriter writer, @Nullable IndexMaintainer indexMaintainer, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, Long baseSnapshotId) { this.writer = writer; this.indexMaintainer = indexMaintainer; + this.deletionVectorsMaintainer = deletionVectorsMaintainer; this.baseSnapshotId = baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : baseSnapshotId; this.lastModifiedCommitIdentifier = Long.MIN_VALUE; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 283df7b07583..fbb51960a06d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -26,6 +26,7 @@ import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; @@ -85,7 +86,7 @@ public AppendOnlyFileStoreWrite( FileStoreScan scan, CoreOptions options, String tableName) { - super(commitUser, snapshotManager, scan, options, null, tableName); + super(commitUser, snapshotManager, scan, options, null, null, tableName); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; @@ -110,7 +111,8 @@ protected RecordWriter createWriter( int bucket, List restoredFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor) { + ExecutorService compactExecutor, + @Nullable DeletionVectorsMaintainer ignore) { // let writer and compact manager hold the same reference // and make restore files mutable to update long maxSequenceNumber = getMaxSequenceNumber(restoredFiles); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index e6c5ed50fe71..1391b69163c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -20,6 
+20,7 @@ import org.apache.paimon.FileStore; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.IndexMaintainer; import org.apache.paimon.io.DataFileMeta; @@ -146,6 +147,7 @@ class State { protected final long lastModifiedCommitIdentifier; protected final List dataFiles; @Nullable protected final IndexMaintainer indexMaintainer; + @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; protected final CommitIncrement commitIncrement; protected State( @@ -155,6 +157,7 @@ protected State( long lastModifiedCommitIdentifier, Collection dataFiles, @Nullable IndexMaintainer indexMaintainer, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, CommitIncrement commitIncrement) { this.partition = partition; this.bucket = bucket; @@ -162,19 +165,21 @@ protected State( this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier; this.dataFiles = new ArrayList<>(dataFiles); this.indexMaintainer = indexMaintainer; + this.deletionVectorsMaintainer = deletionVectorsMaintainer; this.commitIncrement = commitIncrement; } @Override public String toString() { return String.format( - "{%s, %d, %d, %d, %s, %s, %s}", + "{%s, %d, %d, %d, %s, %s, %s, %s}", partition, bucket, baseSnapshotId, lastModifiedCommitIdentifier, dataFiles, indexMaintainer, + deletionVectorsMaintainer, commitIncrement); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index 8852e38a3d1e..e7fbbea45cec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -23,7 +23,9 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.DropDeleteReader; @@ -79,6 +81,8 @@ public class KeyValueFileStoreRead implements FileStoreRead { @Nullable private int[][] pushdownProjection; @Nullable private int[][] outerProjection; + private final CoreOptions options; + private final IndexFileHandler indexFileHandler; private boolean forceKeepDelete = false; @@ -90,7 +94,9 @@ public KeyValueFileStoreRead( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, - KeyValueFileReaderFactory.Builder readerFactoryBuilder) { + KeyValueFileReaderFactory.Builder readerFactoryBuilder, + CoreOptions options, + IndexFileHandler indexFileHandler) { this.tableSchema = schemaManager.schema(schemaId); this.readerFactoryBuilder = readerFactoryBuilder; this.keyComparator = keyComparator; @@ -99,6 +105,8 @@ public KeyValueFileStoreRead( this.mergeSorter = new MergeSorter( CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null); + this.options = options; + this.indexFileHandler = indexFileHandler; } public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) { @@ -175,6 +183,20 @@ public RecordReader createReader(DataSplit split) throws IOException { private RecordReader createReaderWithoutOuterProjection(DataSplit split) 
throws IOException { + if (options.deletionVectorsEnabled()) { + indexFileHandler + .scan( + split.snapshotId(), + DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, + split.partition(), + split.bucket()) + .ifPresent( + fileMeta -> + readerFactoryBuilder.withDeletionVectorSupplier( + filename -> + indexFileHandler.readDeletionVector( + fileMeta, filename))); + } if (split.isStreaming()) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build( @@ -213,24 +235,30 @@ private RecordReader batchMergeRead( readerFactoryBuilder.build( partition, bucket, false, filtersForNonOverlappedSection); - List> sectionReaders = new ArrayList<>(); - MergeFunctionWrapper mergeFuncWrapper = - new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection)); - for (List section : new IntervalPartition(files, keyComparator).partition()) { - sectionReaders.add( - () -> - MergeTreeReaders.readerForSection( - section, - section.size() > 1 - ? overlappedSectionFactory - : nonOverlappedSectionFactory, - keyComparator, - userDefinedSeqComparator, - mergeFuncWrapper, - mergeSorter)); + RecordReader reader; + if (options.deletionVectorsEnabled()) { + reader = streamingConcat(files, nonOverlappedSectionFactory); + } else { + List> sectionReaders = new ArrayList<>(); + MergeFunctionWrapper mergeFuncWrapper = + new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection)); + for (List section : + new IntervalPartition(files, keyComparator).partition()) { + sectionReaders.add( + () -> + MergeTreeReaders.readerForSection( + section, + section.size() > 1 + ? overlappedSectionFactory + : nonOverlappedSectionFactory, + keyComparator, + userDefinedSeqComparator, + mergeFuncWrapper, + mergeSorter)); + } + reader = ConcatRecordReader.create(sectionReaders); } - RecordReader reader = ConcatRecordReader.create(sectionReaders); if (!keepDelete) { reader = new DropDeleteReader(reader); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 8fb99645bdc8..b37069fe485b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.annotation.VisibleForTesting; @@ -28,17 +29,20 @@ import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.IndexMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.lookup.hash.HashLookupStoreFactory; import org.apache.paimon.mergetree.Levels; import org.apache.paimon.mergetree.LookupLevels; import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor; import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor; +import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValueProcessor; import 
org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeWriter; import org.apache.paimon.mergetree.compact.CompactRewriter; @@ -73,6 +77,8 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; @@ -108,10 +114,18 @@ public KeyValueFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) { - super(commitUser, snapshotManager, scan, options, indexFactory, tableName); + super( + commitUser, + snapshotManager, + scan, + options, + indexFactory, + deletionVectorsMaintainerFactory, + tableName); this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; @@ -148,7 +162,8 @@ protected MergeTreeWriter createWriter( int bucket, List restoreFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor) { + ExecutorService compactExecutor, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { if (LOG.isDebugEnabled()) { LOG.debug( "Creating merge tree writer for partition {} bucket {} from restored files {}", @@ -168,11 +183,17 @@ protected MergeTreeWriter createWriter( options.numSortedRunCompactionTrigger(), options.optimizedCompactionInterval()); CompactStrategy compactStrategy = - options.changelogProducer() == ChangelogProducer.LOOKUP + options.needLookup() ? new ForceUpLevel0Compaction(universalCompaction) : universalCompaction; CompactManager compactManager = - createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels); + createCompactManager( + partition, + bucket, + compactStrategy, + compactExecutor, + levels, + deletionVectorsMaintainer); return new MergeTreeWriter( bufferSpillable(), @@ -200,7 +221,8 @@ private CompactManager createCompactManager( int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, - Levels levels) { + Levels levels, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); } else { @@ -208,7 +230,12 @@ private CompactManager createCompactManager( @Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get(); CompactRewriter rewriter = createRewriter( - partition, bucket, keyComparator, userDefinedSeqComparator, levels); + partition, + bucket, + keyComparator, + userDefinedSeqComparator, + levels, + deletionVectorsMaintainer); return new MergeTreeCompactManager( compactExecutor, levels, @@ -219,7 +246,8 @@ private CompactManager createCompactManager( rewriter, compactionMetrics == null ? 
null - : compactionMetrics.createReporter(partition, bucket)); + : compactionMetrics.createReporter(partition, bucket), + options.deletionVectorsEnabled()); } } @@ -228,68 +256,82 @@ private MergeTreeCompactRewriter createRewriter( int bucket, Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, - Levels levels) { + Levels levels, + @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + if (deletionVectorsMaintainer != null) { + readerFactoryBuilder.withDeletionVectorSupplier( + deletionVectorsMaintainer::deletionVectorOf); + } KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); int maxLevel = options.numLevels() - 1; - CoreOptions.MergeEngine mergeEngine = options.mergeEngine(); - switch (options.changelogProducer()) { - case FULL_COMPACTION: - return new FullChangelogMergeTreeCompactRewriter( - maxLevel, - mergeEngine, - readerFactory, - writerFactory, - keyComparator, - userDefinedSeqComparator, - mfFactory, - mergeSorter, - valueEqualiserSupplier.get(), - options.changelogRowDeduplicate()); - case LOOKUP: - if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) { - KeyValueFileReaderFactory keyOnlyReader = - readerFactoryBuilder - .copyWithoutProjection() - .withValueProjection(new int[0][]) - .build(partition, bucket); - return new LookupMergeTreeCompactRewriter<>( - maxLevel, - mergeEngine, - createLookupLevels(levels, new ContainsValueProcessor(), keyOnlyReader), - readerFactory, - writerFactory, - keyComparator, - userDefinedSeqComparator, - mfFactory, - mergeSorter, - new FirstRowMergeFunctionWrapperFactory()); - } else { - return new LookupMergeTreeCompactRewriter<>( - maxLevel, - mergeEngine, - createLookupLevels( - levels, new KeyValueProcessor(valueType), readerFactory), - readerFactory, - writerFactory, - keyComparator, - userDefinedSeqComparator, - mfFactory, - mergeSorter, - new LookupMergeFunctionWrapperFactory( - valueEqualiserSupplier.get(), - options.changelogRowDeduplicate())); + MergeEngine mergeEngine = options.mergeEngine(); + ChangelogProducer changelogProducer = options.changelogProducer(); + if (changelogProducer.equals(FULL_COMPACTION)) { + return new FullChangelogMergeTreeCompactRewriter( + maxLevel, + mergeEngine, + readerFactory, + writerFactory, + keyComparator, + userDefinedSeqComparator, + mfFactory, + mergeSorter, + valueEqualiserSupplier.get(), + options.changelogRowDeduplicate()); + } else if (options.needLookup()) { + LookupStrategy lookupStrategy = options.lookupStrategy(); + LookupLevels.ValueProcessor processor; + LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory wrapperFactory; + KeyValueFileReaderFactory lookupReaderFactory = readerFactory; + if (mergeEngine == FIRST_ROW) { + if (options.deletionVectorsEnabled()) { + throw new UnsupportedOperationException( + "Deletion vectors mode is not supported for first row merge engine now."); } - default: - return new MergeTreeCompactRewriter( - readerFactory, - writerFactory, - keyComparator, - userDefinedSeqComparator, - mfFactory, - mergeSorter); + lookupReaderFactory = + readerFactoryBuilder + .copyWithoutProjection() + .withValueProjection(new int[0][]) + .build(partition, bucket); + processor = new ContainsValueProcessor(); + wrapperFactory = new FirstRowMergeFunctionWrapperFactory(); + } else { + processor = + lookupStrategy.deletionVector + 
? new PositionedKeyValueProcessor( + valueType, lookupStrategy.produceChangelog) + : new KeyValueProcessor(valueType); + wrapperFactory = + new LookupMergeFunctionWrapperFactory<>( + valueEqualiserSupplier.get(), + options.changelogRowDeduplicate(), + lookupStrategy); + } + return new LookupMergeTreeCompactRewriter( + maxLevel, + mergeEngine, + createLookupLevels(levels, processor, lookupReaderFactory), + readerFactory, + writerFactory, + keyComparator, + userDefinedSeqComparator, + mfFactory, + mergeSorter, + wrapperFactory, + lookupStrategy, + deletionVectorsMaintainer); + } else { + return new MergeTreeCompactRewriter( + readerFactory, + writerFactory, + keyComparator, + userDefinedSeqComparator, + mfFactory, + mergeSorter, + deletionVectorsMaintainer); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 9159965d9f9c..0b2bd719ff96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.index.IndexMaintainer; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.HeapMemorySegmentPool; @@ -60,12 +61,14 @@ public MemoryFileStoreWrite( FileStoreScan scan, CoreOptions options, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, String tableName) { super( commitUser, snapshotManager, scan, indexFactory, + deletionVectorsMaintainerFactory, tableName, options.writeMaxWritersToSpill()); this.options = options; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 91ccf9c11b1c..68f009c415c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.BinaryString; @@ -49,6 +50,8 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; +import static org.apache.paimon.CoreOptions.FileFormatType.ORC; +import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; @@ -219,6 +222,10 @@ public static void validateTableSchema(TableSchema schema) { schema.primaryKeys(), schema.partitionKeys())); } } + + if (options.deletionVectorsEnabled()) { + validateForDeletionVectors(schema, options); + } } private static void validateOnlyContainPrimitiveType( @@ -471,4 +478,24 @@ private static void validateDefaultValues(TableSchema schema) { } } } + + private static void validateForDeletionVectors(TableSchema schema, 
CoreOptions options) {
+        checkArgument(
+                !schema.primaryKeys().isEmpty(),
+                "Deletion vectors mode is only supported for tables with primary keys.");
+
+        checkArgument(
+                options.formatType().equals(ORC) || options.formatType().equals(PARQUET),
+                "Deletion vectors mode is only supported for orc or parquet file format now.");
+
+        checkArgument(
+                options.changelogProducer() == ChangelogProducer.NONE
+                        || options.changelogProducer() == ChangelogProducer.LOOKUP,
+                "Deletion vectors mode is only supported for none or lookup changelog producer now.");
+
+        // TODO: support deletion vectors for the first-row merge engine
+        checkArgument(
+                !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
+                "Deletion vectors mode is not supported for first row merge engine now.");
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 11e557f17649..781e71e86865 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.InternalRow;
@@ -89,7 +88,7 @@ public KeyValueFileStore store() {
         MergeFunctionFactory<KeyValue> mfFactory =
                 PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
 
-        if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
+        if (options.needLookup()) {
             mfFactory =
                     LookupMergeFunction.wrap(
                             mfFactory, new RowType(extractor.keyFields(tableSchema)), rowType);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 3ed9a3ac726a..e10184fefc43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -86,7 +86,7 @@ public LocalTableQuery(FileStoreTable table) {
                         options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
                         options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION));
 
-        if (options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP) {
+        if (options.needLookup()) {
             startLevel = 1;
         } else {
             if (options.sequenceField().isPresent()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 6749c6a93741..9bfb4f81a86a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -42,8 +42,8 @@ public void beforeEach() throws Exception {
 
     @Test
     public void test0() {
-        DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory factory =
-                new DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory(fileHandler);
+        DeletionVectorsMaintainer.Factory factory =
+                new DeletionVectorsMaintainer.Factory(fileHandler);
         DeletionVectorsMaintainer dvMaintainer =
                 factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 6e718a0855c6..8e8f315c95d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -447,7 +447,8 @@ private MergeTreeCompactManager createCompactManager( options.compactionFileSize(), options.numSortedRunStopTrigger(), new TestRewriter(), - null); + null, + false); } static class MockFailResultCompactionManager extends MergeTreeCompactManager { @@ -467,7 +468,8 @@ public MockFailResultCompactionManager( minFileSize, numSortedRunStopTrigger, rewriter, - null); + null, + false); } protected CompactResult obtainCompactResult() diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index 7564ca07def2..4c377af17617 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalRow.FieldGetter; +import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg; @@ -69,7 +70,9 @@ public void testDeduplicate(boolean changelogRowDeduplicate) { RowType.of(DataTypes.INT())), highLevel::get, EQUALISER, - changelogRowDeduplicate); + changelogRowDeduplicate, + LookupStrategy.CHANGELOG_ONLY, + null); // Without level-0 function.reset(); @@ -225,7 +228,9 @@ public void testSum(boolean changelogRowDeduplicate) { RowType.of(DataTypes.INT())), key -> null, EQUALISER, - changelogRowDeduplicate); + changelogRowDeduplicate, + LookupStrategy.CHANGELOG_ONLY, + null); // Without level-0 function.reset(); @@ -384,7 +389,9 @@ public void testPartialUpdateIgnoreDelete() { RowType.of(DataTypes.INT())), key -> null, EQUALISER, - false); + false, + LookupStrategy.CHANGELOG_ONLY, + null); function.reset(); function.add(new KeyValue().replace(row(1), 1, DELETE, row(1)).setLevel(2)); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java index 338013d32909..d5aee3ccc727 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java @@ -205,7 +205,8 @@ private void innerTest( 2, Integer.MAX_VALUE, new TestRewriter(expectedDropDelete), - null); + null, + false); manager.triggerCompaction(false); manager.getCompactionResult(true); List outputs = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 5f0c4d66252a..40cd90e3b439 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -54,6 +54,7 @@
 import java.util.UUID;
 
 import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
@@ -101,9 +102,11 @@ private StoreSinkWrite.Provider createWriteProvider(
         } else {
             Options options = table.coreOptions().toConfiguration();
             ChangelogProducer changelogProducer = table.coreOptions().changelogProducer();
+            // TODO: make deletion vectors mode support 'changelog-producer.lookup-wait'
             waitCompaction =
-                    changelogProducer == ChangelogProducer.LOOKUP
-                            && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+                    (changelogProducer == ChangelogProducer.LOOKUP
+                                    && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+                            || options.get(DELETION_VECTORS_ENABLED);
 
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 46a7931b930e..dbb220bfdd4f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
@@ -240,10 +241,11 @@ private StoreSinkWrite.Provider createWriteProvider(
         Options options = fileStoreTable.coreOptions().toConfiguration();
         CoreOptions.ChangelogProducer changelogProducer =
                 fileStoreTable.coreOptions().changelogProducer();
+        // TODO: make deletion vectors mode support 'changelog-producer.lookup-wait'
         waitCompaction =
-                changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
-                        && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
-
+                (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+                                && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+                        || options.get(DELETION_VECTORS_ENABLED);
         int deltaCommits = -1;
         if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
             deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 713bcc34136e..8872cc333962 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -338,7 +338,22 @@ public void testStandAloneLookupJobRandom() throws Exception {
 
     private void testNoChangelogProducerRandom(
             TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
-        List<TableResult> results = testRandom(tEnv, numProducers, enableFailure, "'bucket' = '4'");
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean enableDeletionVectors = random.nextBoolean();
+        if (enableDeletionVectors) {
+            // Deletion vectors mode does not support concurrent writes
+            numProducers = 1;
+        }
+        List<TableResult> results =
+                testRandom(
+                        tEnv,
+                        numProducers,
+                        enableFailure,
+                        "'bucket' = '4',"
+                                + String.format(
+                                        "'deletion-vectors.enabled' = '%s'",
+                                        enableDeletionVectors));
+
         for (TableResult result : results) {
             result.await();
         }
@@ -370,7 +385,11 @@ private void testFullCompactionChangelogProducerRandom(
     private void testLookupChangelogProducerRandom(
             TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-
+        boolean enableDeletionVectors = random.nextBoolean();
+        if (enableDeletionVectors) {
+            // Deletion vectors mode does not support concurrent writes
+            numProducers = 1;
+        }
         testRandom(
                 tEnv,
                 numProducers,
@@ -381,7 +400,9 @@ private void testLookupChangelogProducerRandom(
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'lookup',"
                         + String.format(
-                                "'changelog-producer.lookup-wait' = '%s'", random.nextBoolean()));
+                                "'changelog-producer.lookup-wait' = '%s',", random.nextBoolean())
+                        + String.format(
+                                "'deletion-vectors.enabled' = '%s'", enableDeletionVectors));
 
         // sleep for a random amount of time to check
         // if we can first read complete records then read incremental records correctly
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 745e92b680fd..244449f9baa1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -138,7 +138,9 @@ private TableRead createRead(
                         ignore -> avro,
                         pathFactory,
                         EXTRACTOR,
-                        new CoreOptions(new HashMap<>())));
+                        new CoreOptions(new HashMap<>())),
+                new CoreOptions(new HashMap<>()),
+                null);
         return new KeyValueTableRead(read, null) {
 
             @Override
@@ -193,6 +195,7 @@ public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRow partition, int bucket) {
                         snapshotManager,
                         null, // not used, we only create an empty writer
                         null,
+                        null,
                         options,
                         EXTRACTOR,
                         tablePath.getName())
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
new file mode 100644
index 000000000000..887b6c8dfe61
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorsMaintainer}
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+import scala.util.Random
+
+class DeletionVectorTest extends PaimonSparkTestBase {
+
+  test("Paimon deletionVector: deletion vector write verification") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, name STRING)
+                   |TBLPROPERTIES (
+                   | 'bucket' = '1',
+                   | 'primary-key' = 'id',
+                   | 'file.format' = 'parquet',
+                   | 'deletion-vectors.enabled' = 'true'
+                   |)
+                   |""".stripMargin)
+      val table = loadTable("T")
+
+      // Insert1
+      // f1 (1, 2, 3), the rows at positions 0 and 2 in f1 are marked deleted
+      // f2 (1, 3)
+      spark.sql("INSERT INTO T VALUES (1, 'aaaaaaaaaaaaaaaaaaa'), (2, 'b'), (3, 'c')")
+      spark.sql("INSERT INTO T VALUES (1, 'a_new1'), (3, 'c_new1')")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil)
+
+      val dvMaintainerFactory =
+        new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+      def restoreDeletionVectors(): java.util.Map[String, DeletionVector] = {
+        dvMaintainerFactory
+          .createOrRestore(table.snapshotManager().latestSnapshotId(), BinaryRow.EMPTY_ROW, 0)
+          .deletionVectors()
+      }
+
+      val deletionVectors1 = restoreDeletionVectors()
+      // keys 1 and 3 are deleted; their row positions are 0 and 2
+      Assertions.assertEquals(1, deletionVectors1.size())
+      deletionVectors1
+        .entrySet()
+        .forEach(
+          e => {
+            Assertions.assertTrue(e.getValue.isDeleted(0))
+            Assertions.assertTrue(e.getValue.isDeleted(2))
+          })
+
+      // Compact
+      // f3 (1, 2, 3), no deletion
+      spark.sql("CALL sys.compact('T')")
+      val deletionVectors2 = restoreDeletionVectors()
+      // After compaction, deletionVectors should be empty
+      Assertions.assertTrue(deletionVectors2.isEmpty)
+
+      // Insert2
+      // f3 (1, 2, 3), the row at position 1 in f3 is marked deleted
+      // f4 (2)
+      spark.sql("INSERT INTO T VALUES (2, 'b_new2')")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
+
+      val deletionVectors3 = restoreDeletionVectors()
+      // key 2 is deleted; its row position is 1
+      Assertions.assertEquals(1, deletionVectors3.size())
+      deletionVectors3
+        .entrySet()
+        .forEach(
+          e => {
+            Assertions.assertTrue(e.getValue.isDeleted(1))
+          })
+    }
+  }
+
+  test("Paimon deletionVector: e2e random write") {
+    val bucket = Random.shuffle(Seq("-1", "1", "3")).head
+    val changelogProducer = Random.shuffle(Seq("none", "lookup")).head
+    val format = Random.shuffle(Seq("orc", "parquet")).head
+    val batchSize = Random.nextInt(1024) + 1
+
+    val dvTbl = "deletion_vector_tbl"
+    val resultTbl = "result_tbl"
+    spark.sql(s"drop table if exists $dvTbl")
+    spark.sql(s"""
+                 |CREATE TABLE $dvTbl (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id, pt',
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'bucket' = '$bucket',
+                 | 'changelog-producer' = '$changelogProducer',
+                 | 'file.format' = '$format',
+                 | 'read.batch-size' = '$batchSize'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql(s"drop table if exists $resultTbl")
+    spark.sql(s"""
+                 |CREATE TABLE $resultTbl (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id, pt',
+                 | 'deletion-vectors.enabled' = 'false'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    def insert(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000))
+      val names = (1 to count).map(_ => (Random.nextInt(26) + 'a'.toInt).toChar.toString)
+      val pts = (1 to count).map(_ => s"p${Random.nextInt(3)}")
+      val values = ids
+        .zip(names)
+        .zip(pts)
+        .map { case ((id, name), pt) => s"($id, '$name', '$pt')" }
+        .mkString(", ")
+      spark.sql(s"INSERT INTO $t1 VALUES $values")
+      spark.sql(s"INSERT INTO $t2 VALUES $values")
+    }
+
+    def delete(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000)).toList
+      val idsString = ids.mkString(", ")
+      spark.sql(s"DELETE FROM $t1 WHERE id IN ($idsString)")
+      spark.sql(s"DELETE FROM $t2 WHERE id IN ($idsString)")
+    }
+
+    def update(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000)).toList
+      val idsString = ids.mkString(", ")
+      val randomName = (Random.nextInt(26) + 'a'.toInt).toChar.toString
+      spark.sql(s"UPDATE $t1 SET name = '$randomName' WHERE id IN ($idsString)")
+      spark.sql(s"UPDATE $t2 SET name = '$randomName' WHERE id IN ($idsString)")
+    }
+
+    def checkResult(t1: String, t2: String): Unit = {
+      try {
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $t1 ORDER BY id, pt"),
+          spark.sql(s"SELECT * FROM $t2 ORDER BY id, pt"))
+      } catch {
+        case e: Throwable =>
+          println(s"test error, table params: ${loadTable(dvTbl).options()}")
+          throw new RuntimeException(e)
+      }
+    }
+
+    val operations = Seq(
+      () => insert(dvTbl, resultTbl, 1000),
+      () => update(dvTbl, resultTbl, 100),
+      () => delete(dvTbl, resultTbl, 100)
+    )
+
+    // Insert first
+    operations.head()
+    checkResult(dvTbl, resultTbl)
+
+    for (_ <- 1 to 20) {
+      // Randomly select an operation
+      operations(Random.nextInt(operations.size))()
+      checkResult(dvTbl, resultTbl)
+    }
+  }
+}
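
A note on the lookup strategy used throughout this patch: needLookup() is true as soon as either the lookup changelog producer or deletion vectors are enabled, and LookupStrategy.from(...) folds those two flags into one strategy object. The sketch below is a minimal standalone model of that decision table; the constant and field names are illustrative stand-ins for the real org.apache.paimon.lookup.LookupStrategy added by this patch.

// Sketch of the strategy table behind CoreOptions.lookupStrategy();
// illustrative, not the exact Paimon implementation.
public enum LookupStrategySketch {
    NO_LOOKUP(false, false),
    CHANGELOG_ONLY(true, false),
    DELETION_VECTOR_ONLY(false, true),
    CHANGELOG_AND_DELETION_VECTOR(true, true);

    // Whether compaction must produce changelog entries via lookup.
    public final boolean produceChangelog;
    // Whether compaction must maintain deletion vectors via lookup.
    public final boolean deletionVector;
    // Lookup is needed as soon as either feature is on.
    public final boolean needLookup;

    LookupStrategySketch(boolean produceChangelog, boolean deletionVector) {
        this.produceChangelog = produceChangelog;
        this.deletionVector = deletionVector;
        this.needLookup = produceChangelog || deletionVector;
    }

    public static LookupStrategySketch from(boolean changelogLookup, boolean dvEnabled) {
        if (changelogLookup) {
            return dvEnabled ? CHANGELOG_AND_DELETION_VECTOR : CHANGELOG_ONLY;
        }
        return dvEnabled ? DELETION_VECTOR_ONLY : NO_LOOKUP;
    }
}

This is why PrimaryKeyFileStoreTable and LocalTableQuery above switch from changelogProducer() == LOOKUP to needLookup(): deletion vectors alone now also require lookup-based compaction.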
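
The reader-side change in KeyValueFileStoreRead hinges on one idea: once deletion vectors are applied, every surviving row in every data file is already the latest copy of its key, so a batch read can simply concatenate files instead of merge-sorting overlapping sections. Below is a self-contained sketch of position-based filtering; java.util.BitSet stands in for Paimon's bitmap-backed DeletionVector, and all names are illustrative.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Minimal model of how a reader skips deleted rows without merging:
// a row's position in its data file is its index, and a per-file bitmap
// marks the positions that are deleted.
final class ApplyDeletionVectorSketch {

    static <T> List<T> apply(List<T> rowsInFileOrder, BitSet deletionVector) {
        List<T> alive = new ArrayList<>();
        for (int position = 0; position < rowsInFileOrder.size(); position++) {
            if (!deletionVector.get(position)) {
                alive.add(rowsInFileOrder.get(position));
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        // Mirrors the Spark test above: f1 holds keys 1, 2, 3; re-inserting
        // keys 1 and 3 marked positions 0 and 2 of f1 as deleted.
        List<String> f1 = List.of("1:a", "2:b", "3:c");
        BitSet dv = new BitSet();
        dv.set(0);
        dv.set(2);
        // Only "2:b" survives; newer files can be concatenated after it.
        System.out.println(apply(f1, dv)); // prints [2:b]
    }
}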
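
On the write side, DeletionVectorsMaintainer accumulates per-file bitmaps while a bucket is written and compacted, and the result is persisted as a deletion-vectors index file that IndexFileHandler later resolves per data file name (the scan(...).ifPresent(...) wiring in KeyValueFileStoreRead above). The following is a rough stand-in under the assumption of a notifyNewDeletion-style callback; method names approximate, not the exact Paimon API.

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for DeletionVectorsMaintainer: collects per-file
// deletion bitmaps during a write, to be persisted on prepareCommit.
final class DeletionVectorsMaintainerSketch {

    private final Map<String, BitSet> deletionVectors = new HashMap<>();

    // Called when lookup discovers that the key just written already exists
    // at `position` inside data file `fileName`.
    void notifyNewDeletion(String fileName, long position) {
        deletionVectors
                .computeIfAbsent(fileName, ignored -> new BitSet())
                .set(Math.toIntExact(position));
    }

    // On commit the bitmaps become one deletion-vectors index file covering
    // the bucket; readers later look them up by data file name.
    Map<String, BitSet> snapshot() {
        return new HashMap<>(deletionVectors);
    }
}

This also explains the compaction behavior the Spark test asserts: a full compaction rewrites the surviving rows into new files, so the maintained bitmaps for the replaced files are dropped and the restored map is empty.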
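
Finally, the PositionedKeyValueProcessor added to LookupLevels exists because a deletion-vector lookup must return more than the merged value: to mark the old copy of an upserted key as deleted, compaction needs the data file name and row position where that copy lives (CHANGELOG_ONLY lookups only need the value, hence the plain KeyValueProcessor). A hypothetical sketch of that flow, with all names illustrative:

import java.util.function.BiConsumer;

// Why lookup results carry a position in deletion vectors mode.
final class PositionedLookupSketch {

    // Stand-in for the positioned result produced during a lookup.
    record PositionedKeyValue(String fileName, long rowPosition, String value) {}

    // `markDeleted` stands in for the maintainer's deletion callback.
    static void onUpsert(PositionedKeyValue oldCopy, BiConsumer<String, Long> markDeleted) {
        if (oldCopy != null) {
            markDeleted.accept(oldCopy.fileName(), oldCopy.rowPosition());
        }
    }

    public static void main(String[] args) {
        // An incoming key that already exists at position 0 of data-f1.parquet:
        onUpsert(
                new PositionedKeyValue("data-f1.parquet", 0L, "old"),
                (file, pos) -> System.out.println("delete " + file + " @ " + pos));
    }
}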