From 9119c6a0aafa6ebf69bac42447cfac448ba58790 Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Wed, 21 Aug 2024 17:37:42 +0800 Subject: [PATCH] [flink] support computing the changelog generated by compact during read time. This is used when changelog producer is none, but CoreOptions#needLookup is true and the table is used as a dim table. --- .../java/org/apache/paimon/CoreOptions.java | 3 +- .../main/java/org/apache/paimon/Snapshot.java | 32 +++++- .../operation/AbstractFileStoreScan.java | 5 + .../paimon/operation/FileStoreScan.java | 3 + .../table/source/AbstractDataTableScan.java | 1 + .../apache/paimon/table/source/DataSplit.java | 36 ++++++- .../table/source/DataTableStreamScan.java | 3 + .../table/source/KeyValueTableRead.java | 3 + .../snapshot/CompactionFollowUpScanner.java | 49 +++++++++ .../source/snapshot/SnapshotReaderImpl.java | 3 +- .../IncrementalCompactDiffReadProvider.java | 68 ++++++++++++ .../IncrementalCompactDiffSplitRead.java | 77 ++++++++++++++ .../apache/paimon/table/TableTestBase.java | 13 ++- .../flink/lookup/FileStoreLookupFunction.java | 4 +- .../flink/lookup/LookupStreamingReader.java | 14 ++- .../paimon/flink/utils/TableScanUtils.java | 19 ++++ .../paimon/flink/lookup/LookupTableTest.java | 100 ++++++++++++++++++ 17 files changed, 419 insertions(+), 14 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionFollowUpScanner.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffReadProvider.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffSplitRead.java diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index c8ee9e4aa8812..a326a256911f8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -2338,7 +2338,8 @@ public enum StreamScanMode implements DescribedEnum { COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for traditional bucket table."), COMPACT_APPEND_NO_BUCKET( "compact-append-no-bucket", "Compaction for append table with bucket unaware."), - FILE_MONITOR("file-monitor", "Monitor data file changes."); + FILE_MONITOR("file-monitor", "Monitor data file changes."), + COMPACT_DELTA_MONITOR("compact-delta-monitor", "Monitor delta changes for compaction."); private final String value; private final String description; diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index a16349ce54b81..6811a9c8ced54 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -32,8 +32,10 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * This file is the entrance to all data committed at some specific time point. @@ -411,15 +413,37 @@ public boolean equals(Object o) { public enum CommitKind { /** Changes flushed from the mem table. */ - APPEND, + APPEND((byte) 1), /** Changes by compacting existing data files. */ - COMPACT, + COMPACT((byte) 2), /** Changes that clear up the whole partition and then add new records. */ - OVERWRITE, + OVERWRITE((byte) 3), /** Collect statistics. */ - ANALYZE + ANALYZE((byte) 4); + + private static final Map BYTE_TO_COMMIT_KIND = + Arrays.stream(CommitKind.values()).collect(Collectors.toMap(e -> e.code, e -> e)); + + private final byte code; + + CommitKind(byte code) { + this.code = code; + } + + public byte getCode() { + return code; + } + + public static CommitKind fromByte(byte kind) { + final CommitKind commitKind = BYTE_TO_COMMIT_KIND.get(kind); + if (commitKind == null) { + throw new IllegalArgumentException( + String.format("The given kind %s is not supported.", kind)); + } + return commitKind; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index cfd1a4bf87922..8e8c703189455 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -249,6 +249,11 @@ public ScanMode scanMode() { public List files() { return files; } + + @Override + public Snapshot.CommitKind commitKind() { + return readSnapshot == null ? null : readSnapshot.commitKind(); + } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 744f2fe145109..cb441ae4eab34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -127,6 +127,9 @@ default List files(FileKind kind) { return files().stream().filter(e -> e.kind() == kind).collect(Collectors.toList()); } + /** {@link org.apache.paimon.Snapshot.CommitKind} of the snapshot. */ + Snapshot.CommitKind commitKind(); + /** Return a map group by partition and bucket. */ static Map>> groupByPartFiles( List files) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 37e66ccdc6356..ba7c91893b618 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -118,6 +118,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return new ContinuousCompactorStartingScanner(snapshotManager); case COMPACT_APPEND_NO_BUCKET: case FILE_MONITOR: + case COMPACT_DELTA_MONITOR: return new FullStartingScanner(snapshotManager); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 067bf055c2417..56fde86bee570 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.source; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; @@ -49,7 +50,7 @@ public class DataSplit implements Split { private static final long serialVersionUID = 7L; private static final long MAGIC = -2394839472490812314L; - private static final int VERSION = 2; + private static final int VERSION = 3; private long snapshotId = 0; private BinaryRow partition; @@ -64,6 +65,7 @@ public class DataSplit implements Split { private boolean isStreaming = false; private boolean rawConvertible; + @Nullable private Snapshot.CommitKind commitKind; public DataSplit() {} @@ -108,6 +110,11 @@ public boolean rawConvertible() { return rawConvertible; } + @Nullable + public Snapshot.CommitKind commitKind() { + return commitKind; + } + public OptionalLong latestFileCreationEpochMillis() { return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max(); } @@ -193,7 +200,8 @@ public boolean equals(Object o) { && Objects.equals(beforeFiles, dataSplit.beforeFiles) && Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles) && Objects.equals(dataFiles, dataSplit.dataFiles) - && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles); + && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles) + && Objects.equals(commitKind, dataSplit.commitKind); } @Override @@ -208,7 +216,8 @@ public int hashCode() { dataFiles, dataDeletionFiles, isStreaming, - rawConvertible); + rawConvertible, + commitKind); } private void writeObject(ObjectOutputStream out) throws IOException { @@ -230,6 +239,7 @@ private void assign(DataSplit other) { this.dataDeletionFiles = other.dataDeletionFiles; this.isStreaming = other.isStreaming; this.rawConvertible = other.rawConvertible; + this.commitKind = other.commitKind; } public void serialize(DataOutputView out) throws IOException { @@ -258,6 +268,9 @@ public void serialize(DataOutputView out) throws IOException { out.writeBoolean(isStreaming); out.writeBoolean(rawConvertible); + + byte code = commitKind == null ? (byte) -1 : commitKind.getCode(); + out.writeByte(code); } public static DataSplit deserialize(DataInputView in) throws IOException { @@ -289,6 +302,7 @@ public static DataSplit deserialize(DataInputView in) throws IOException { boolean isStreaming = in.readBoolean(); boolean rawConvertible = in.readBoolean(); + Snapshot.CommitKind commitKind = deserializeForCommitKind(version, in); DataSplit.Builder builder = builder() @@ -299,7 +313,8 @@ public static DataSplit deserialize(DataInputView in) throws IOException { .withBeforeFiles(beforeFiles) .withDataFiles(dataFiles) .isStreaming(isStreaming) - .rawConvertible(rawConvertible); + .rawConvertible(rawConvertible) + .withCommitKind(commitKind); if (beforeDeletionFiles != null) { builder.withBeforeDeletionFiles(beforeDeletionFiles); @@ -310,12 +325,18 @@ public static DataSplit deserialize(DataInputView in) throws IOException { return builder.build(); } + private static Snapshot.CommitKind deserializeForCommitKind(int version, DataInputView in) + throws IOException { + byte code = version >= 3 ? in.readByte() : -1; + return code == -1 ? null : Snapshot.CommitKind.fromByte(code); + } + private static FunctionWithIOException getFileMetaSerde( int version) { if (version == 1) { DataFileMeta08Serializer serializer = new DataFileMeta08Serializer(); return serializer::deserialize; - } else if (version == 2) { + } else if (version >= 2) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { @@ -387,6 +408,11 @@ public Builder rawConvertible(boolean rawConvertible) { return this; } + public Builder withCommitKind(Snapshot.CommitKind commitKind) { + this.split.commitKind = commitKind; + return this; + } + public DataSplit build() { checkArgument(split.partition != null); checkArgument(split.bucket != -1); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index 11e92f903f359..83beafba7697d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner; import org.apache.paimon.table.source.snapshot.BoundedChecker; import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner; +import org.apache.paimon.table.source.snapshot.CompactionFollowUpScanner; import org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner; import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner; import org.apache.paimon.table.source.snapshot.FollowUpScanner; @@ -208,6 +209,8 @@ private FollowUpScanner createFollowUpScanner() { return new ContinuousAppendAndCompactFollowUpScanner(); case FILE_MONITOR: return new AllDeltaFollowUpScanner(); + case COMPACT_DELTA_MONITOR: + return new CompactionFollowUpScanner(); } CoreOptions.ChangelogProducer changelogProducer = options.changelogProducer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index c674e4792d436..0b8e5c7a6e497 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -28,6 +28,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider; +import org.apache.paimon.table.source.splitread.IncrementalCompactDiffReadProvider; import org.apache.paimon.table.source.splitread.IncrementalDiffReadProvider; import org.apache.paimon.table.source.splitread.MergeFileSplitReadProvider; import org.apache.paimon.table.source.splitread.RawFileSplitReadProvider; @@ -62,6 +63,8 @@ public KeyValueTableRead( Arrays.asList( new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues), new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues), + new IncrementalCompactDiffReadProvider( + mergeReadSupplier, this::assignValues), new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues), new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionFollowUpScanner.java new file mode 100644 index 0000000000000..473978117cebc --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionFollowUpScanner.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.source.ScanMode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** {@link FollowUpScanner} for read all changed files after compact. */ +public class CompactionFollowUpScanner implements FollowUpScanner { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionFollowUpScanner.class); + + @Override + public boolean shouldScanSnapshot(Snapshot snapshot) { + if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) { + return true; + } + + LOG.debug( + "Next snapshot id {} is not COMPACT, but is {}, check next one.", + snapshot.id(), + snapshot.commitKind()); + return false; + } + + @Override + public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader snapshotReader) { + return snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshot).readChanges(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 25c3ffa96dfc6..48520442514ab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -397,7 +397,8 @@ private Plan toChangesPlan( .withBeforeFiles(before) .withDataFiles(data) .isStreaming(isStreaming) - .withBucketPath(pathFactory.bucketPath(part, bucket).toString()); + .withBucketPath(pathFactory.bucketPath(part, bucket).toString()) + .withCommitKind(plan.commitKind()); if (deletionVectors) { builder.withBeforeDeletionFiles( getDeletionFiles( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffReadProvider.java new file mode 100644 index 0000000000000..12ce12a75f14f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffReadProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** A {@link SplitReadProvider} to streaming incremental diff read after compaction. */ +public class IncrementalCompactDiffReadProvider implements SplitReadProvider { + + private final LazyField> splitRead; + + public IncrementalCompactDiffReadProvider( + Supplier supplier, + Consumer> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead create(Supplier supplier) { + return new IncrementalCompactDiffSplitRead(supplier.get()); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return split.commitKind() == Snapshot.CommitKind.COMPACT + && !split.beforeFiles().isEmpty() + && split.isStreaming(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffSplitRead.java new file mode 100644 index 0000000000000..06bfb73e65141 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalCompactDiffSplitRead.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.reader.EmptyRecordReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link SplitRead} for streaming incremental diff after compaction. */ +public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead { + + public IncrementalCompactDiffSplitRead(MergeFileSplitRead mergeRead) { + super(mergeRead); + } + + @Override + public RecordReader createReader(DataSplit split) throws IOException { + if (split.beforeFiles().stream().noneMatch(file -> file.level() == 0)) { + return new EmptyRecordReader<>(); + } + return super.createReader(filterLevel0File(split)); + } + + private DataSplit filterLevel0File(DataSplit split) { + List beforeFiles = + split.beforeFiles().stream() + .filter(file -> file.level() > 0) + .collect(Collectors.toList()); + List afterFiles = + split.dataFiles().stream() + .filter(file -> file.level() > 0) + .collect(Collectors.toList()); + DataSplit.Builder builder = + new DataSplit.Builder() + .withSnapshot(split.snapshotId()) + .withPartition(split.partition()) + .withBucket(split.bucket()) + .withBucketPath(split.bucketPath()) + .withBeforeFiles(beforeFiles) + .withDataFiles(afterFiles) + .isStreaming(split.isStreaming()) + .rawConvertible(split.rawConvertible()) + .withCommitKind(split.commitKind()); + + if (split.beforeDeletionFiles().isPresent()) { + builder.withBeforeDeletionFiles(split.beforeDeletionFiles().get()); + } + if (split.deletionFiles().isPresent()) { + builder.withDataDeletionFiles(split.deletionFiles().get()); + } + return builder.build(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index cc8fc98dd4e27..eaaf8ca70bc8c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -127,10 +127,21 @@ protected void write(Table table, IOManager ioManager, InternalRow... rows) thro } protected void compact(Table table, BinaryRow partition, int bucket) throws Exception { + compact(table, partition, bucket, null, true); + } + + protected void compact( + Table table, + BinaryRow partition, + int bucket, + IOManager ioManager, + boolean fullCompaction) + throws Exception { BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); try (BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit()) { - write.compact(partition, bucket, true); + write.withIOManager(ioManager); + write.compact(partition, bucket, fullCompaction); commit.commit(write.prepareCommit()); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 01ebbde201540..4090193de285d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -98,7 +98,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable { public FileStoreLookupFunction( Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { - TableScanUtils.streamingReadingValidate(table); + if (!TableScanUtils.supportCompactDiffStreamingReading(table)) { + TableScanUtils.streamingReadingValidate(table); + } this.table = table; this.partitionLoader = DynamicPartitionLoader.of(table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index ceb40c1a864fc..fb42a6906c896 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.io.SplitsParallelReadUtil; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.options.ConfigOption; @@ -50,6 +51,9 @@ import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; +import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; +import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE; +import static org.apache.paimon.CoreOptions.StreamScanMode.COMPACT_DELTA_MONITOR; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; @@ -117,12 +121,20 @@ private Table unsetTimeTravelOptions(Table origin) { Map newOptions = new HashMap<>(fileStoreTable.options()); TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove); - CoreOptions.StartupMode startupMode = CoreOptions.fromMap(newOptions).startupMode(); + CoreOptions coreOptions = CoreOptions.fromMap(newOptions); + CoreOptions.StartupMode startupMode = coreOptions.startupMode(); if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) { startupMode = CoreOptions.StartupMode.LATEST_FULL; } newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString()); + if (origin.primaryKeys().size() > 0 + && coreOptions.changelogProducer() == CoreOptions.ChangelogProducer.NONE + && TableScanUtils.supportCompactDiffStreamingReading(origin)) { + newOptions.put(STREAM_SCAN_MODE.key(), COMPACT_DELTA_MONITOR.getValue()); + newOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); + } + TableSchema newSchema = fileStoreTable.schema().copy(newOptions); return fileStoreTable.copy(newSchema); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java index a5645302f93f3..5a35e811a537f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java @@ -20,12 +20,15 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.TableScan; import java.util.HashMap; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; /** Utility methods for {@link TableScan}, such as validating. */ public class TableScanUtils { @@ -59,4 +62,20 @@ public static Optional getSnapshotId(FileStoreSourceSplit split) { } return Optional.empty(); } + + public static boolean supportCompactDiffStreamingReading(Table table) { + CoreOptions options = CoreOptions.fromMap(table.options()); + Set compactDiffReadingEngine = + new HashSet() { + { + add(CoreOptions.MergeEngine.PARTIAL_UPDATE); + add(CoreOptions.MergeEngine.AGGREGATE); + } + }; + + return options.needLookup() + && compactDiffReadingEngine.contains(options.mergeEngine()) + && !Options.fromMap(options.toMap()) + .get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 14643542e73de..ccf31e9c71b9a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.JoinedRow; @@ -722,6 +723,105 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception table.close(); } + @Test + public void testFullCacheLookupTableWithForceLookup() throws Exception { + Options options = new Options(); + options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); + options.set(CoreOptions.WRITE_ONLY, true); + options.set(CoreOptions.FORCE_LOOKUP, true); + options.set(CoreOptions.BUCKET, 1); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + FileStoreTable compactTable = + storeTable.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + + // initialize + write(storeTable, ioManager, GenericRow.of(1, 11, 111)); + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true); + table.open(); + + List result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 111); + + // first write + write(storeTable, GenericRow.of(1, null, 222)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 111); // old value because there is no compact + + // only L0 occur compact + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, false); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 222); // get new value after compact + + // second write + write(storeTable, GenericRow.of(1, 22, null)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 222); // old value + + // full compact + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 22, 222); // new value + } + + @Test + public void testPartialLookupTableWithForceLookup() throws Exception { + Options options = new Options(); + options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); + options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.NONE); + options.set(CoreOptions.FORCE_LOOKUP, true); + options.set(CoreOptions.BUCKET, 1); + FileStoreTable dimTable = createTable(singletonList("f0"), options); + + PrimaryKeyPartialLookupTable table = + PrimaryKeyPartialLookupTable.createLocalTable( + dimTable, + new int[] {0, 1, 2}, + tempDir.toFile(), + ImmutableList.of("f0"), + null); + table.open(); + + List result = table.get(row(1, -1)); + assertThat(result).hasSize(0); + + write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22)); + result = table.get(row(1)); + assertThat(result).hasSize(0); + + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, -1, 11); + result = table.get(row(2)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 2, -2, 22); + + write(dimTable, ioManager, GenericRow.of(1, null, 111)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, -1, 111); + } + private FileStoreTable createDimTable() throws Exception { FileIO fileIO = LocalFileIO.create(); org.apache.paimon.fs.Path tablePath =