From 3a64148cf471abf7ab350bdfd621f351f0c62151 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Thu, 16 May 2024 17:03:12 +0800 Subject: [PATCH] [core] Incremental-between tags should deduplicate records (#3338) --- .../apache/paimon/mergetree/MergeSorter.java | 4 + .../paimon/operation/MergeFileSplitRead.java | 109 +++++++----------- .../paimon/operation/RawFileSplitRead.java | 12 ++ .../apache/paimon/operation/SplitRead.java | 46 +++++++- .../table/source/KeyValueTableRead.java | 79 ++++++------- .../IncrementalChangelogReadProvider.java | 96 +++++++++++++++ .../IncrementalDiffReadProvider.java | 65 +++++++++++ .../splitread/IncrementalDiffSplitRead.java} | 95 +++++++++++++-- .../splitread/MergeFileSplitReadProvider.java | 68 +++++++++++ .../splitread/RawFileSplitReadProvider.java | 60 ++++++++++ .../source/splitread/SplitReadProvider.java | 33 ++++++ .../operation/MergeFileSplitReadTest.java | 2 +- .../paimon/table/IncrementalTableTest.java | 18 ++- 13 files changed, 566 insertions(+), 121 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java rename paimon-core/src/main/java/org/apache/paimon/{operation/DiffReader.java => table/source/splitread/IncrementalDiffSplitRead.java} (55%) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java index 0f54b40b681a..420613899ad8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java @@ -95,6 +95,10 @@ public MemorySegmentPool memoryPool() { return memoryPool; } + public RowType valueType() { + return valueType; + } + public void setIOManager(IOManager ioManager) { this.ioManager = ioManager; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index d457c9093650..8002b62f0f01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -116,7 +116,16 @@ public MergeFileSplitRead withKeyProjection(@Nullable int[][] projectedFields) { return this; } - public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) { + public Comparator keyComparator() { + return keyComparator; + } + + public MergeSorter mergeSorter() { + return mergeSorter; + } + + @Override + public MergeFileSplitRead withProjection(@Nullable int[][] projectedFields) { if (projectedFields == null) { return this; } @@ -160,11 +169,13 @@ public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) return this; } + @Override public MergeFileSplitRead withIOManager(IOManager ioManager) { this.mergeSorter.setIOManager(ioManager); return this; } + @Override public MergeFileSplitRead forceKeepDelete() { this.forceKeepDelete = true; return this; @@ -211,75 +222,28 @@ public MergeFileSplitRead withFilter(Predicate predicate) { @Override public RecordReader createReader(DataSplit split) throws IOException { - RecordReader reader = createReaderWithoutOuterProjection(split); - if (outerProjection != null) { - ProjectedRow projectedRow = ProjectedRow.from(outerProjection); - reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + if (!split.beforeFiles().isEmpty()) { + throw new IllegalArgumentException("This read cannot accept split with before files."); } - return reader; - } - private RecordReader createReaderWithoutOuterProjection(DataSplit split) - throws IOException { - if (split.beforeFiles().isEmpty()) { - if (split.isStreaming() || split.convertToRawFiles().isPresent()) { - return noMergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - split.isStreaming()); - } else { - return projectKey( - mergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - null, - forceKeepDelete)); - } - } else if (split.isStreaming()) { - // streaming concat read - return ConcatRecordReader.create( - () -> - new ReverseReader( - noMergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - split.beforeDeletionFiles().orElse(null), - true)), - () -> - noMergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - true)); + if (split.isStreaming()) { + return createNoMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + split.isStreaming()); } else { - // batch diff read - return projectKey( - DiffReader.readDiff( - mergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - split.beforeDeletionFiles().orElse(null), - false), - mergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - false), - keyComparator, - createUdsComparator(), - mergeSorter, - forceKeepDelete)); + return createMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + forceKeepDelete); } } - private RecordReader mergeRead( + public RecordReader createMergeReader( BinaryRow partition, int bucket, List files, @@ -316,10 +280,10 @@ private RecordReader mergeRead( reader = new DropDeleteReader(reader); } - return reader; + return projectOuter(projectKey(reader)); } - private RecordReader noMergeRead( + public RecordReader createNoMergeReader( BinaryRow partition, int bucket, List files, @@ -344,7 +308,8 @@ private RecordReader noMergeRead( file.schemaId(), fileName, file.fileSize(), file.level()); }); } - return ConcatRecordReader.create(suppliers); + + return projectOuter(ConcatRecordReader.create(suppliers)); } private Optional changelogFile(DataFileMeta fileMeta) { @@ -365,8 +330,16 @@ private RecordReader projectKey(RecordReader reader) { return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key()))); } + private RecordReader projectOuter(RecordReader reader) { + if (outerProjection != null) { + ProjectedRow projectedRow = ProjectedRow.from(outerProjection); + reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + } + return reader; + } + @Nullable - private UserDefinedSeqComparator createUdsComparator() { + public UserDefinedSeqComparator createUdsComparator() { return UserDefinedSeqComparator.create( readerFactoryBuilder.projectedValueType(), sequenceFields); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 5ecc4a4f04ac..efed168afe24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.format.FormatReaderContext; @@ -98,6 +99,17 @@ public RawFileSplitRead( this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes(); } + @Override + public SplitRead forceKeepDelete() { + return this; + } + + @Override + public SplitRead withIOManager(@Nullable IOManager ioManager) { + return this; + } + + @Override public RawFileSplitRead withProjection(int[][] projectedFields) { if (projectedFields != null) { projection = projectedFields; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java index a5b05e8f4a4c..c17c0f6b3d88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java @@ -18,9 +18,13 @@ package org.apache.paimon.operation; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.IOFunction; + +import javax.annotation.Nullable; import java.io.IOException; @@ -31,8 +35,48 @@ */ public interface SplitRead { - SplitRead withFilter(Predicate predicate); + SplitRead forceKeepDelete(); + + SplitRead withIOManager(@Nullable IOManager ioManager); + + SplitRead withProjection(@Nullable int[][] projectedFields); + + SplitRead withFilter(@Nullable Predicate predicate); /** Create a {@link RecordReader} from split. */ RecordReader createReader(DataSplit split) throws IOException; + + static SplitRead convert( + SplitRead read, IOFunction> convertedFactory) { + return new SplitRead() { + @Override + public SplitRead forceKeepDelete() { + read.forceKeepDelete(); + return this; + } + + @Override + public SplitRead withIOManager(@Nullable IOManager ioManager) { + read.withIOManager(ioManager); + return this; + } + + @Override + public SplitRead withProjection(@Nullable int[][] projectedFields) { + read.withProjection(projectedFields); + return this; + } + + @Override + public SplitRead withFilter(@Nullable Predicate predicate) { + read.withFilter(predicate); + return this; + } + + @Override + public RecordReader createReader(DataSplit split) throws IOException { + return convertedFactory.apply(split); + } + }; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index f3f66c4aaf23..c674e4792d43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -23,14 +23,22 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.RawFileSplitRead; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.utils.LazyField; +import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider; +import org.apache.paimon.table.source.splitread.IncrementalDiffReadProvider; +import org.apache.paimon.table.source.splitread.MergeFileSplitReadProvider; +import org.apache.paimon.table.source.splitread.RawFileSplitReadProvider; +import org.apache.paimon.table.source.splitread.SplitReadProvider; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.function.Supplier; /** @@ -38,8 +46,7 @@ */ public final class KeyValueTableRead extends AbstractDataTableRead { - private final LazyField mergeRead; - private final LazyField batchRawRead; + private final List readProviders; private int[][] projection = null; private boolean forceKeepDelete = false; @@ -51,65 +58,54 @@ public KeyValueTableRead( Supplier batchRawReadSupplier, TableSchema schema) { super(schema); - this.mergeRead = new LazyField<>(() -> createMergeRead(mergeReadSupplier)); - this.batchRawRead = new LazyField<>(() -> createBatchRawRead(batchRawReadSupplier)); + this.readProviders = + Arrays.asList( + new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues), + new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues), + new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues), + new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues)); } - private MergeFileSplitRead createMergeRead(Supplier readSupplier) { - MergeFileSplitRead read = - readSupplier - .get() - .withKeyProjection(new int[0][]) - .withValueProjection(projection) - .withFilter(predicate) - .withIOManager(ioManager); - if (forceKeepDelete) { - read = read.forceKeepDelete(); + private List> initialized() { + List> readers = new ArrayList<>(); + for (SplitReadProvider readProvider : readProviders) { + if (readProvider.initialized()) { + readers.add(readProvider.getOrCreate()); + } } - return read; + return readers; } - private RawFileSplitRead createBatchRawRead(Supplier readSupplier) { - return readSupplier.get().withProjection(projection).withFilter(predicate); + private void assignValues(SplitRead read) { + if (forceKeepDelete) { + read = read.forceKeepDelete(); + } + read.withProjection(projection).withFilter(predicate).withIOManager(ioManager); } @Override public void projection(int[][] projection) { - if (mergeRead.initialized()) { - mergeRead.get().withValueProjection(projection); - } - if (batchRawRead.initialized()) { - batchRawRead.get().withProjection(projection); - } + initialized().forEach(r -> r.withProjection(projection)); this.projection = projection; } @Override public InnerTableRead forceKeepDelete() { - if (mergeRead.initialized()) { - mergeRead.get().forceKeepDelete(); - } + initialized().forEach(SplitRead::forceKeepDelete); this.forceKeepDelete = true; return this; } @Override protected InnerTableRead innerWithFilter(Predicate predicate) { - if (mergeRead.initialized()) { - mergeRead.get().withFilter(predicate); - } - if (batchRawRead.initialized()) { - batchRawRead.get().withFilter(predicate); - } + initialized().forEach(r -> r.withFilter(predicate)); this.predicate = predicate; return this; } @Override public TableRead withIOManager(IOManager ioManager) { - if (mergeRead.initialized()) { - mergeRead.get().withIOManager(ioManager); - } + initialized().forEach(r -> r.withIOManager(ioManager)); this.ioManager = ioManager; return this; } @@ -117,11 +113,16 @@ public TableRead withIOManager(IOManager ioManager) { @Override public RecordReader reader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; - if (!forceKeepDelete && !dataSplit.isStreaming() && split.convertToRawFiles().isPresent()) { - return batchRawRead.get().createReader(dataSplit); + for (SplitReadProvider readProvider : readProviders) { + if (readProvider.match(dataSplit, forceKeepDelete)) { + return readProvider.getOrCreate().createReader(dataSplit); + } } - RecordReader reader = mergeRead.get().createReader(dataSplit); + throw new RuntimeException("Should not happen."); + } + + public static RecordReader unwrap(RecordReader reader) { return new RecordReader() { @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java new file mode 100644 index 000000000000..bec95979d639 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.ReverseReader; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.IOFunction; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; + +/** A {@link SplitReadProvider} to incremental changelog read. */ +public class IncrementalChangelogReadProvider implements SplitReadProvider { + + private final LazyField> splitRead; + + public IncrementalChangelogReadProvider( + Supplier supplier, + Consumer> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead create(Supplier supplier) { + final MergeFileSplitRead read = supplier.get().withKeyProjection(new int[0][]); + IOFunction> convertedFactory = + split -> { + RecordReader reader = + ConcatRecordReader.create( + () -> + new ReverseReader( + read.createNoMergeReader( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles() + .orElse(null), + true)), + () -> + read.createNoMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + true)); + return unwrap(reader); + }; + + return SplitRead.convert(read, convertedFactory); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return !split.beforeFiles().isEmpty() && split.isStreaming(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java new file mode 100644 index 000000000000..a335a7c03054 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** A {@link SplitReadProvider} to batch incremental diff read. */ +public class IncrementalDiffReadProvider implements SplitReadProvider { + + private final LazyField> splitRead; + + public IncrementalDiffReadProvider( + Supplier supplier, + Consumer> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead create(Supplier supplier) { + return new IncrementalDiffSplitRead(supplier.get()); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return !split.beforeFiles().isEmpty() && !split.isStreaming(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java similarity index 55% rename from paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java rename to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index bc5153600cde..0519da9fd19e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -16,15 +16,24 @@ * limitations under the License. */ -package org.apache.paimon.operation; +package org.apache.paimon.table.source.splitread; import org.apache.paimon.KeyValue; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.FieldsComparator; +import org.apache.paimon.utils.ProjectedRow; import javax.annotation.Nullable; @@ -34,13 +43,73 @@ import java.util.Comparator; import java.util.List; -/** A {@link RecordReader} util to read diff between before reader and after reader. */ -public class DiffReader { +/** A {@link SplitRead} for batch incremental diff. */ +public class IncrementalDiffSplitRead implements SplitRead { private static final int BEFORE_LEVEL = Integer.MIN_VALUE; private static final int AFTER_LEVEL = Integer.MAX_VALUE; - public static RecordReader readDiff( + private final MergeFileSplitRead mergeRead; + + private boolean forceKeepDelete = false; + @Nullable private int[][] projectedFields; + + public IncrementalDiffSplitRead(MergeFileSplitRead mergeRead) { + this.mergeRead = mergeRead; + } + + @Override + public SplitRead forceKeepDelete() { + this.forceKeepDelete = true; + return this; + } + + @Override + public SplitRead withIOManager(@Nullable IOManager ioManager) { + mergeRead.withIOManager(ioManager); + return this; + } + + @Override + public SplitRead withProjection(@Nullable int[][] projectedFields) { + this.projectedFields = projectedFields; + return this; + } + + @Override + public SplitRead withFilter(@Nullable Predicate predicate) { + mergeRead.withFilter(predicate); + return this; + } + + @Override + public RecordReader createReader(DataSplit split) throws IOException { + RecordReader reader = + readDiff( + mergeRead.createMergeReader( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles().orElse(null), + false), + mergeRead.createMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + false), + mergeRead.keyComparator(), + mergeRead.createUdsComparator(), + mergeRead.mergeSorter(), + forceKeepDelete); + if (projectedFields != null) { + ProjectedRow projectedRow = ProjectedRow.from(projectedFields); + reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + } + return KeyValueTableRead.unwrap(reader); + } + + private static RecordReader readDiff( RecordReader beforeReader, RecordReader afterReader, Comparator keyComparator, @@ -54,7 +123,7 @@ public static RecordReader readDiff( () -> wrapLevelToReader(afterReader, AFTER_LEVEL)), keyComparator, userDefinedSeqComparator, - new DiffMerger(keepDelete)); + new DiffMerger(keepDelete, InternalSerializers.create(sorter.valueType()))); } private static RecordReader wrapLevelToReader( @@ -96,11 +165,15 @@ public void close() throws IOException { private static class DiffMerger implements MergeFunctionWrapper { private final boolean keepDelete; + private final InternalRowSerializer serializer1; + private final InternalRowSerializer serializer2; private final List kvs = new ArrayList<>(); - public DiffMerger(boolean keepDelete) { + public DiffMerger(boolean keepDelete, InternalRowSerializer serializer) { this.keepDelete = keepDelete; + this.serializer1 = serializer; + this.serializer2 = serializer.duplicate(); } @Override @@ -128,7 +201,9 @@ public KeyValue getResult() { } else if (kvs.size() == 2) { KeyValue latest = kvs.get(1); if (latest.level() == AFTER_LEVEL) { - return latest; + if (!valueEquals()) { + return latest; + } } } else { throw new IllegalArgumentException("Illegal kv number: " + kvs.size()); @@ -136,5 +211,11 @@ public KeyValue getResult() { return null; } + + private boolean valueEquals() { + return serializer1 + .toBinaryRow(kvs.get(0).value()) + .equals(serializer2.toBinaryRow(kvs.get(1).value())); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java new file mode 100644 index 000000000000..abed0f33c977 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; + +/** A {@link SplitReadProvider} to merge files. */ +public class MergeFileSplitReadProvider implements SplitReadProvider { + + private final LazyField> splitRead; + + public MergeFileSplitReadProvider( + Supplier supplier, + Consumer> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead create(Supplier supplier) { + final MergeFileSplitRead read = supplier.get().withKeyProjection(new int[0][]); + return SplitRead.convert(read, split -> unwrap(read.createReader(split))); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return split.beforeFiles().isEmpty(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java new file mode 100644 index 000000000000..9959ee555189 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.RawFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** A {@link SplitReadProvider} to create {@link RawFileSplitRead}. */ +public class RawFileSplitReadProvider implements SplitReadProvider { + + private final LazyField splitRead; + + public RawFileSplitReadProvider( + Supplier supplier, Consumer> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + RawFileSplitRead read = supplier.get(); + valuesAssigner.accept(read); + return read; + }); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return !forceKeepDelete && !split.isStreaming() && split.rawConvertible(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java new file mode 100644 index 000000000000..2aaefb322f4d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; + +/** Provider to create {@link SplitRead}. */ +public interface SplitReadProvider { + + boolean match(DataSplit split, boolean forceKeepDelete); + + boolean initialized(); + + SplitRead getOrCreate(); +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 5652fcd43700..1794e8aeaed5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -231,7 +231,7 @@ private List writeThenRead( read.withKeyProjection(keyProjection); } if (valueProjection != null) { - read.withValueProjection(valueProjection); + read.withProjection(valueProjection); } List result = new ArrayList<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index f4dac9d11c90..b4b905d36453 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -225,6 +225,8 @@ public void testTagIncremental() throws Exception { GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1), + GenericRow.of(1, 4, 1), + GenericRow.of(1, 5, 1), GenericRow.of(2, 1, 1)); // snapshot 2: append @@ -235,7 +237,7 @@ public void testTagIncremental() throws Exception { // UPDATE GenericRow.of(1, 2, 2), // NEW - GenericRow.of(1, 4, 1)); + GenericRow.of(1, 6, 1)); // snapshot 3: compact compact(table, row(1), 0); @@ -247,24 +249,30 @@ public void testTagIncremental() throws Exception { // read tag1 tag2 List result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")); assertThat(result) - .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 6, 1)); result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")); assertThat(result) .containsExactlyInAnyOrder( GenericRow.of(fromString("-D"), 1, 1, 1), GenericRow.of(fromString("+I"), 1, 2, 2), - GenericRow.of(fromString("+I"), 1, 4, 1)); + GenericRow.of(fromString("+I"), 1, 6, 1)); // read tag1 tag3 result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result) - .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 6, 1)); + + // read tag1 tag3 auditLog result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result) .containsExactlyInAnyOrder( GenericRow.of(fromString("-D"), 1, 1, 1), GenericRow.of(fromString("+I"), 1, 2, 2), - GenericRow.of(fromString("+I"), 1, 4, 1)); + GenericRow.of(fromString("+I"), 1, 6, 1)); + + // read tag1 tag3 projection + result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); + assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6)); assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1"))) .hasMessageContaining(