From ef1683308d8622980cb26f190c7ef73268d86374 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 7 Feb 2024 15:53:57 +0800 Subject: [PATCH] Step4: support read with delete map --- .../org/apache/paimon/KeyValueFileStore.java | 7 +- .../index/delete/ApplyDeleteIndexReader.java | 75 +++++++++++++ .../paimon/io/KeyValueFileReaderFactory.java | 60 +++++++---- .../operation/KeyValueFileStoreRead.java | 101 ++++++++++++++---- .../operation/KeyValueFileStoreWrite.java | 5 +- .../paimon/table/query/LocalTableQuery.java | 3 +- .../paimon/io/KeyValueFileReadWriteTest.java | 2 +- .../paimon/mergetree/ContainsLevelsTest.java | 2 +- .../paimon/mergetree/LookupLevelsTest.java | 2 +- .../paimon/mergetree/MergeTreeTestBase.java | 4 +- 10 files changed, 215 insertions(+), 46 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/index/delete/ApplyDeleteIndexReader.java diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 57315a3a5900c..cda4c861d4cbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -115,6 +115,10 @@ public KeyValueFileStoreScan newScan() { @Override public KeyValueFileStoreRead newRead() { + IndexMaintainer.Factory deleteMapFactory = null; + if (options.deleteMapEnabled()) { + deleteMapFactory = new DeleteMapIndexMaintainer.Factory(newIndexFileHandler()); + } return new KeyValueFileStoreRead( schemaManager, schemaId, @@ -122,7 +126,8 @@ public KeyValueFileStoreRead newRead() { valueType, newKeyComparator(), mfFactory, - newReaderFactoryBuilder()); + newReaderFactoryBuilder(), + deleteMapFactory); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { diff --git a/paimon-core/src/main/java/org/apache/paimon/index/delete/ApplyDeleteIndexReader.java b/paimon-core/src/main/java/org/apache/paimon/index/delete/ApplyDeleteIndexReader.java new file mode 100644 index 0000000000000..8cd635190cc87 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/index/delete/ApplyDeleteIndexReader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index.delete; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.reader.RecordReader; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** 1. */ +public class ApplyDeleteIndexReader implements RecordReader { + + private final RecordReader reader; + + private final DeleteIndex deleteIndex; + + public ApplyDeleteIndexReader(RecordReader reader, DeleteIndex deleteIndex) { + this.reader = reader; + this.deleteIndex = deleteIndex; + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + RecordIterator batch = reader.readBatch(); + + if (batch == null) { + return null; + } + + return new RecordIterator() { + @Override + public KeyValue next() throws IOException { + while (true) { + KeyValue kv = batch.next(); + if (kv == null) { + return null; + } + + if (!deleteIndex.isDeleted(kv.position())) { + return kv; + } + } + } + + @Override + public void releaseBatch() { + batch.releaseBatch(); + } + }; + } + + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 0c8270a442a65..6f6f9135a94c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -24,6 +24,9 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.ApplyDeleteIndexReader; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; @@ -42,6 +45,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; import java.util.function.Supplier; /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */ @@ -60,6 +65,7 @@ public class KeyValueFileReaderFactory { private final Map bulkFormatMappings; private final BinaryRow partition; private final boolean deleteMapEnabled; + private final Function> fileNameToDeleteIndex; private KeyValueFileReaderFactory( FileIO fileIO, @@ -71,7 +77,8 @@ private KeyValueFileReaderFactory( DataFilePathFactory pathFactory, long asyncThreshold, BinaryRow partition, - boolean deleteMapEnabled) { + boolean deleteMapEnabled, + Function> fileNameToDeleteIndex) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schemaId = schemaId; @@ -83,6 +90,7 @@ private KeyValueFileReaderFactory( this.partition = partition; this.bulkFormatMappings = new HashMap<>(); this.deleteMapEnabled = deleteMapEnabled; + this.fileNameToDeleteIndex = fileNameToDeleteIndex; } public RecordReader createRecordReader( @@ -117,18 +125,26 @@ private RecordReader createRecordReader( new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - return new KeyValueDataFileRecordReader( - fileIO, - bulkFormatMapping.getReaderFactory(), - pathFactory.toPath(fileName), - keyType, - valueType, - level, - poolSize, - bulkFormatMapping.getIndexMapping(), - bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), - deleteMapEnabled); + + RecordReader recordReader = + new KeyValueDataFileRecordReader( + fileIO, + bulkFormatMapping.getReaderFactory(), + pathFactory.toPath(fileName), + keyType, + valueType, + level, + poolSize, + bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getCastMapping(), + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + deleteMapEnabled); + + Optional deleteIndexOptional = fileNameToDeleteIndex.apply(fileName); + if (deleteIndexOptional.isPresent()) { + recordReader = new ApplyDeleteIndexReader(recordReader, deleteIndexOptional.get()); + } + return recordReader; } public static Builder builder( @@ -231,18 +247,25 @@ public RowType projectedValueType() { return projectedValueType; } - public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) { - return build(partition, bucket, true, Collections.emptyList()); + public KeyValueFileReaderFactory build( + BinaryRow partition, + int bucket, + @Nullable IndexMaintainer deleteMapMaintainer) { + return build(partition, bucket, true, Collections.emptyList(), deleteMapMaintainer); } public KeyValueFileReaderFactory build( BinaryRow partition, int bucket, boolean projectKeys, - @Nullable List filters) { + @Nullable List filters, + @Nullable IndexMaintainer deleteMapMaintainer) { int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection; RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType; - + Function> fileNameToDeleteIndex = + deleteMapMaintainer == null + ? (fileName) -> Optional.empty() + : deleteMapMaintainer::indexOf; return new KeyValueFileReaderFactory( fileIO, schemaManager, @@ -254,7 +277,8 @@ public KeyValueFileReaderFactory build( pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), partition, - options.deleteMapEnabled()); + options.deleteMapEnabled(), + fileNameToDeleteIndex); } private void applyProjection() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index 09b9cb81b633e..412c0dd98d49b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -21,11 +21,14 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.DropDeleteReader; @@ -61,6 +64,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX; +import static org.apache.paimon.mergetree.MergeTreeReaders.readerForRun; import static org.apache.paimon.predicate.PredicateBuilder.containsFields; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; @@ -81,9 +85,11 @@ public class KeyValueFileStoreRead implements FileStoreRead { @Nullable private int[][] pushdownProjection; @Nullable private int[][] outerProjection; + @Nullable private final IndexMaintainer.Factory deleteIndexFactory; private boolean forceKeepDelete = false; + @VisibleForTesting public KeyValueFileStoreRead( FileIO fileIO, SchemaManager schemaManager, @@ -112,7 +118,8 @@ public KeyValueFileStoreRead( formatDiscover, pathFactory, extractor, - options)); + options), + null); } public KeyValueFileStoreRead( @@ -122,7 +129,8 @@ public KeyValueFileStoreRead( RowType valueType, Comparator keyComparator, MergeFunctionFactory mfFactory, - KeyValueFileReaderFactory.Builder readerFactoryBuilder) { + KeyValueFileReaderFactory.Builder readerFactoryBuilder, + @Nullable IndexMaintainer.Factory deleteIndexFactory) { this.tableSchema = schemaManager.schema(schemaId); this.readerFactoryBuilder = readerFactoryBuilder; this.keyComparator = keyComparator; @@ -130,6 +138,7 @@ public KeyValueFileStoreRead( this.mergeSorter = new MergeSorter( CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null); + this.deleteIndexFactory = deleteIndexFactory; } public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) { @@ -209,7 +218,12 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli if (split.isStreaming()) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build( - split.partition(), split.bucket(), true, filtersForOverlappedSection); + split.partition(), + split.bucket(), + true, + filtersForOverlappedSection, + getIndexMaintainer( + split.snapshotId(), split.partition(), split.bucket())); ReaderSupplier beforeSupplier = () -> new ReverseReader(streamingConcat(split.beforeFiles(), readerFactory)); ReaderSupplier dataSupplier = @@ -218,14 +232,28 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli ? dataSupplier.get() : ConcatRecordReader.create(Arrays.asList(beforeSupplier, dataSupplier)); } else { + // todo: for incremental query with deleteMap mode, we can just filter out the compacted + // files without needing to merge the diff return split.beforeFiles().isEmpty() ? batchMergeRead( - split.partition(), split.bucket(), split.dataFiles(), forceKeepDelete) + split.partition(), + split.bucket(), + split.dataFiles(), + forceKeepDelete, + split.snapshotId()) : DiffReader.readDiff( batchMergeRead( - split.partition(), split.bucket(), split.beforeFiles(), false), + split.partition(), + split.bucket(), + split.beforeFiles(), + false, + split.snapshotId()), batchMergeRead( - split.partition(), split.bucket(), split.dataFiles(), false), + split.partition(), + split.bucket(), + split.dataFiles(), + false, + split.snapshotId()), keyComparator, mergeSorter, forceKeepDelete); @@ -233,30 +261,55 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli } private RecordReader batchMergeRead( - BinaryRow partition, int bucket, List files, boolean keepDelete) + BinaryRow partition, + int bucket, + List files, + boolean keepDelete, + long snapshotId) throws IOException { // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForOverlappedSection); + readerFactoryBuilder.build( + partition, + bucket, + false, + filtersForOverlappedSection, + getIndexMaintainer(snapshotId, partition, bucket)); KeyValueFileReaderFactory nonOverlappedSectionFactory = readerFactoryBuilder.build( - partition, bucket, false, filtersForNonOverlappedSection); + partition, + bucket, + false, + filtersForNonOverlappedSection, + getIndexMaintainer(snapshotId, partition, bucket)); List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection)); for (List section : new IntervalPartition(files, keyComparator).partition()) { - sectionReaders.add( - () -> - MergeTreeReaders.readerForSection( - section, - section.size() > 1 - ? overlappedSectionFactory - : nonOverlappedSectionFactory, - keyComparator, - mergeFuncWrapper, - mergeSorter)); + // todo: maybe we can add a new SortEngine which just concat record with merger + if (deleteIndexFactory != null) { + sectionReaders.add( + () -> { + List> readers = new ArrayList<>(); + for (SortedRun run : section) { + readers.add(() -> readerForRun(run, nonOverlappedSectionFactory)); + } + return ConcatRecordReader.create(readers); + }); + } else { + sectionReaders.add( + () -> + MergeTreeReaders.readerForSection( + section, + section.size() > 1 + ? overlappedSectionFactory + : nonOverlappedSectionFactory, + keyComparator, + mergeFuncWrapper, + mergeSorter)); + } } RecordReader reader = ConcatRecordReader.create(sectionReaders); @@ -298,4 +351,14 @@ private RecordReader projectKey( ProjectedRow projectedRow = ProjectedRow.from(keyProjectedFields); return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key()))); } + + // todo: add cache + private @Nullable IndexMaintainer getIndexMaintainer( + long snapshotId, BinaryRow partition, int bucket) { + if (deleteIndexFactory != null) { + return deleteIndexFactory.createOrRestore(snapshotId, partition, bucket); + } else { + return null; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 5a25480178faf..dd30a595f42b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -236,7 +236,8 @@ private MergeTreeCompactRewriter createRewriter( Comparator keyComparator, Levels levels, @Nullable IndexMaintainer deleteMapMaintainer) { - KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); + KeyValueFileReaderFactory readerFactory = + readerFactoryBuilder.build(partition, bucket, deleteMapMaintainer); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); @@ -265,7 +266,7 @@ private MergeTreeCompactRewriter createRewriter( readerFactoryBuilder .copyWithoutProjection() .withValueProjection(new int[0][]) - .build(partition, bucket); + .build(partition, bucket, deleteMapMaintainer); ContainsLevels containsLevels = createContainsLevels(levels, keyOnlyReader); return new FirstRowMergeTreeCompactRewriter( maxLevel, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 3e64d8a00607a..e69a96ca5db6d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -124,7 +124,8 @@ public void refreshFiles( private void newLookupLevels(BinaryRow partition, int bucket, List dataFiles) { Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels()); - KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket); + // todo: support delete map mode + KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket, null); Options options = this.options.toConfiguration(); LookupLevels lookupLevels = new LookupLevels( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index aab4a5f98d129..e7736b7e130f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -293,7 +293,7 @@ private KeyValueFileReaderFactory createReaderFactory( if (valueProjection != null) { builder.withValueProjection(valueProjection); } - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, null); } private void assertData( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index e4d4e83b074e1..c402ad9830603 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -246,7 +246,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, null); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 5f1f3e17b10cc..c4ba3247c2c46 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -287,7 +287,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, null); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 34c0eb4d9283a..7fc4ab9fa7283 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -170,8 +170,8 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); - compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); + readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, null); + compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, null); Map pathFactoryMap = new HashMap<>(); pathFactoryMap.put(identifier, pathFactory);