diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index f315bdfa9f1b..4cd221996b6b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -219,7 +219,7 @@ private boolean shouldDelaySnapshot(long snapshotId) { return false; } - private FollowUpScanner createFollowUpScanner() { + protected FollowUpScanner createFollowUpScanner() { CoreOptions.StreamScanMode type = options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE); switch (type) { @@ -249,7 +249,7 @@ private FollowUpScanner createFollowUpScanner() { return followUpScanner; } - private BoundedChecker createBoundedChecker() { + protected BoundedChecker createBoundedChecker() { Long boundedWatermark = options.scanBoundedWatermark(); return boundedWatermark != null ? BoundedChecker.watermark(boundedWatermark) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index cc8fc98dd4e2..eaaf8ca70bc8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -127,10 +127,21 @@ protected void write(Table table, IOManager ioManager, InternalRow... rows) thro } protected void compact(Table table, BinaryRow partition, int bucket) throws Exception { + compact(table, partition, bucket, null, true); + } + + protected void compact( + Table table, + BinaryRow partition, + int bucket, + IOManager ioManager, + boolean fullCompaction) + throws Exception { BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); try (BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit()) { - write.compact(partition, bucket, true); + write.withIOManager(ioManager); + write.compact(partition, bucket, fullCompaction); commit.commit(write.prepareCommit()); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionFollowUpScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionFollowUpScanner.java new file mode 100644 index 000000000000..19cbc99f3c6b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionFollowUpScanner.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.table.source.snapshot.FollowUpScanner; +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** {@link FollowUpScanner} for read all changed files after compact. */ +public class CompactionFollowUpScanner implements FollowUpScanner { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionFollowUpScanner.class); + + @Override + public boolean shouldScanSnapshot(Snapshot snapshot) { + if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) { + return true; + } + + LOG.debug( + "Next snapshot id {} is not COMPACT, but is {}, check next one.", + snapshot.id(), + snapshot.commitKind()); + return false; + } + + @Override + public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader snapshotReader) { + return snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshot).readChanges(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 01ebbde20154..4090193de285 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -98,7 +98,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable { public FileStoreLookupFunction( Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { - TableScanUtils.streamingReadingValidate(table); + if (!TableScanUtils.supportCompactDiffStreamingReading(table)) { + TableScanUtils.streamingReadingValidate(table); + } this.table = table; this.partitionLoader = DynamicPartitionLoader.of(table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 28b0da0d150c..15b82fbe499e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -350,7 +350,7 @@ public Context( File tempPath, List joinKey, @Nullable Set requiredCachedBucketIds) { - this.table = table; + this.table = new LookupFileStoreTable(table, joinKey); this.projection = projection; this.tablePredicate = tablePredicate; this.projectedPredicate = projectedPredicate; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java new file mode 100644 index 000000000000..fef74127e7f4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.reader.EmptyRecordReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.splitread.IncrementalDiffSplitRead; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link SplitRead} for streaming incremental diff after compaction. */ +public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead { + + public IncrementalCompactDiffSplitRead(MergeFileSplitRead mergeRead) { + super(mergeRead); + } + + @Override + public RecordReader createReader(DataSplit split) throws IOException { + if (split.beforeFiles().stream().noneMatch(file -> file.level() == 0)) { + return new EmptyRecordReader<>(); + } + return super.createReader(filterLevel0Files(split)); + } + + private DataSplit filterLevel0Files(DataSplit split) { + List beforeFiles = + split.beforeFiles().stream() + .filter(file -> file.level() > 0) + .collect(Collectors.toList()); + List afterFiles = + split.dataFiles().stream() + .filter(file -> file.level() > 0) + .collect(Collectors.toList()); + DataSplit.Builder builder = + new DataSplit.Builder() + .withSnapshot(split.snapshotId()) + .withPartition(split.partition()) + .withBucket(split.bucket()) + .withBucketPath(split.bucketPath()) + .withBeforeFiles(beforeFiles) + .withDataFiles(afterFiles) + .isStreaming(split.isStreaming()) + .rawConvertible(split.rawConvertible()); + + if (split.beforeDeletionFiles().isPresent()) { + builder.withBeforeDeletionFiles(split.beforeDeletionFiles().get()); + } + if (split.deletionFiles().isPresent()) { + builder.withDataDeletionFiles(split.deletionFiles().get()); + } + return builder.build(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java new file mode 100644 index 000000000000..c822756ad08c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.DefaultValueAssigner; +import org.apache.paimon.table.source.DataTableStreamScan; +import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner; +import org.apache.paimon.table.source.snapshot.BoundedChecker; +import org.apache.paimon.table.source.snapshot.FollowUpScanner; +import org.apache.paimon.table.source.snapshot.FullStartingScanner; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.table.source.snapshot.StartingScanner; +import org.apache.paimon.utils.SnapshotManager; + +import static org.apache.paimon.CoreOptions.StartupMode; +import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode; + +/** + * {@link org.apache.paimon.table.source.StreamTableScan} implementation for lookup streaming + * planning. + */ +public class LookupDataTableScan extends DataTableStreamScan { + + private StartupMode startupMode; + private LookupStreamScanMode scanMode; + + public LookupDataTableScan( + CoreOptions options, + SnapshotReader snapshotReader, + SnapshotManager snapshotManager, + boolean supportStreamingReadOverwrite, + DefaultValueAssigner defaultValueAssigner, + LookupStreamScanMode scanMode) { + super( + options, + snapshotReader, + snapshotManager, + supportStreamingReadOverwrite, + defaultValueAssigner); + this.startupMode = options.startupMode(); + this.scanMode = scanMode; + } + + @Override + protected StartingScanner createStartingScanner(boolean isStreaming) { + return startupMode != CoreOptions.StartupMode.COMPACTED_FULL + ? new FullStartingScanner(snapshotReader.snapshotManager()) + : super.createStartingScanner(isStreaming); + } + + @Override + protected FollowUpScanner createFollowUpScanner() { + switch (scanMode) { + case NORMAL: + return super.createFollowUpScanner(); + case FILE_MONITOR: + return new AllDeltaFollowUpScanner(); + case COMPACT_DELTA_MONITOR: + return new CompactionFollowUpScanner(); + default: + throw new UnsupportedOperationException( + "Unknown lookup stream scan mode: " + scanMode.name()); + } + } + + @Override + protected BoundedChecker createBoundedChecker() { + return BoundedChecker.neverEnd(); // dim table should never end + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java new file mode 100644 index 000000000000..02b5d36aebee --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.utils.TableScanUtils; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.operation.DefaultValueAssigner; +import org.apache.paimon.options.Options; +import org.apache.paimon.options.description.DescribedEnum; +import org.apache.paimon.options.description.InlineElement; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.DelegatedFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.ReadBuilderImpl; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.utils.SimpleFileReader; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.options.description.TextElement.text; + +/** {@link FileStoreTable} for lookup table. */ +public class LookupFileStoreTable extends DelegatedFileStoreTable { + + private static final long serialVersionUID = 1L; + + private final LookupStreamScanMode lookupScanMode; + + public LookupFileStoreTable(FileStoreTable wrapped, List joinKeys) { + super(wrapped); + this.lookupScanMode = lookupStreamScanMode(wrapped, joinKeys); + } + + public LookupFileStoreTable(FileStoreTable wrapped, LookupStreamScanMode lookupScanMode) { + super(wrapped); + this.lookupScanMode = lookupScanMode; + } + + @Override + public ReadBuilder newReadBuilder() { + return new ReadBuilderImpl(this); + } + + @Override + public InnerTableRead newRead() { + switch (lookupScanMode) { + case NORMAL: + case FILE_MONITOR: + return wrapped.newRead(); + case COMPACT_DELTA_MONITOR: + return new LookupStreamCompactDiffRead( + ((KeyValueFileStore) wrapped.store()).newRead(), wrapped.schema()); + default: + throw new UnsupportedOperationException( + "Unknown lookup stream scan mode: " + lookupScanMode.name()); + } + } + + @Override + public StreamDataTableScan newStreamScan() { + return new LookupDataTableScan( + wrapped.coreOptions(), + wrapped.newSnapshotReader(), + wrapped.snapshotManager(), + wrapped.supportStreamingReadOverwrite(), + DefaultValueAssigner.create(wrapped.schema()), + lookupScanMode); + } + + @Override + public SimpleFileReader manifestListReader() { + return wrapped.manifestListReader(); + } + + @Override + public SimpleFileReader manifestFileReader() { + return wrapped.manifestFileReader(); + } + + @Override + public SimpleFileReader indexManifestFileReader() { + return wrapped.indexManifestFileReader(); + } + + @Override + public FileStoreTable copy(Map dynamicOptions) { + return new LookupFileStoreTable(wrapped.copy(dynamicOptions), lookupScanMode); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new LookupFileStoreTable(wrapped.copy(newTableSchema), lookupScanMode); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new LookupFileStoreTable(wrapped.copy(dynamicOptions), lookupScanMode); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new LookupFileStoreTable(wrapped.copyWithLatestSchema(), lookupScanMode); + } + + @Override + public FileStoreTable switchToBranch(String branchName) { + wrapped.switchToBranch(branchName); + return this; + } + + private LookupStreamScanMode lookupStreamScanMode(FileStoreTable table, List joinKeys) { + Options options = Options.fromMap(table.options()); + if (options.get(LOOKUP_CACHE_MODE) == FlinkConnectorOptions.LookupCacheMode.AUTO + && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { + return LookupStreamScanMode.FILE_MONITOR; + } else if (table.primaryKeys().size() > 0 + && options.get(CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.NONE + && TableScanUtils.supportCompactDiffStreamingReading(table)) { + return LookupStreamScanMode.COMPACT_DELTA_MONITOR; + } else { + return LookupStreamScanMode.NORMAL; + } + } + + /** Inner stream scan mode for lookup table. */ + public enum LookupStreamScanMode implements DescribedEnum { + NORMAL("normal", "Streaming reading based on changelog or data files."), + FILE_MONITOR("file-monitor", "Monitor data file changes."), + COMPACT_DELTA_MONITOR( + "compact-delta-monitor", + "Streaming reading based on data changes before and after compaction."); + + private final String value; + private final String description; + + LookupStreamScanMode(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamCompactDiffRead.java new file mode 100644 index 000000000000..8a973a8d7973 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamCompactDiffRead.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.AbstractDataTableRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.Split; + +import java.io.IOException; + +import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; + +/** An {@link InnerTableRead} that reads the data changed before and after compaction. */ +public class LookupStreamCompactDiffRead extends AbstractDataTableRead { + private final SplitRead fullPhaseMergeRead; + private final SplitRead incrementalDiffRead; + + public LookupStreamCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) { + super(schema); + this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead); + this.fullPhaseMergeRead = + SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split))); + } + + @Override + public void projection(int[][] projection) { + incrementalDiffRead.withProjection(projection); + } + + @Override + public RecordReader reader(Split split) throws IOException { + DataSplit dataSplit = (DataSplit) split; + if (dataSplit.beforeFiles().isEmpty()) { + return fullPhaseMergeRead.createReader(dataSplit); // full reading phase + } else { + return incrementalDiffRead.createReader((DataSplit) split); + } + } + + @Override + protected InnerTableRead innerWithFilter(Predicate predicate) { + incrementalDiffRead.withFilter(predicate); + return this; + } + + @Override + public InnerTableRead forceKeepDelete() { + incrementalDiffRead.forceKeepDelete(); + return this; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index ceb40c1a864f..fa9b7672db2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -22,13 +22,10 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.SplitsParallelReadUtil; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; @@ -42,10 +39,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; @@ -62,20 +56,12 @@ public class LookupStreamingReader { @Nullable private final Predicate projectedPredicate; private final StreamTableScan scan; - private static final List> TIME_TRAVEL_OPTIONS = - Arrays.asList( - CoreOptions.SCAN_TIMESTAMP_MILLIS, - CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, - CoreOptions.SCAN_SNAPSHOT_ID, - CoreOptions.SCAN_TAG_NAME, - CoreOptions.SCAN_VERSION); - public LookupStreamingReader( Table table, int[] projection, @Nullable Predicate predicate, Set requireCachedBucketIds) { - this.table = unsetTimeTravelOptions(table); + this.table = table; this.projection = projection; this.readBuilder = this.table @@ -112,21 +98,6 @@ public LookupStreamingReader( } } - private Table unsetTimeTravelOptions(Table origin) { - FileStoreTable fileStoreTable = (FileStoreTable) origin; - Map newOptions = new HashMap<>(fileStoreTable.options()); - TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove); - - CoreOptions.StartupMode startupMode = CoreOptions.fromMap(newOptions).startupMode(); - if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) { - startupMode = CoreOptions.StartupMode.LATEST_FULL; - } - newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString()); - - TableSchema newSchema = fileStoreTable.schema().copy(newOptions); - return fileStoreTable.copy(newSchema); - } - public RecordReader nextBatch(boolean useParallelism) throws Exception { List splits = scan.plan().splits(); CoreOptions options = CoreOptions.fromMap(table.options()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index bdf0a1b4af77..967826e11fd8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -40,16 +40,10 @@ import java.io.File; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Function; -import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; -import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE; -import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR; - /** Lookup table for primary key which supports to read the LSM tree directly. */ public class PrimaryKeyPartialLookupTable implements LookupTable { @@ -158,7 +152,11 @@ public static PrimaryKeyPartialLookupTable createLocalTable( return new PrimaryKeyPartialLookupTable( filter -> new LocalQueryExecutor( - table, projection, tempPath, filter, requireCachedBucketIds), + new LookupFileStoreTable(table, joinKey), + projection, + tempPath, + filter, + requireCachedBucketIds), table, joinKey); } @@ -192,12 +190,8 @@ private LocalQueryExecutor( .withValueProjection(Projection.of(projection).toNestedIndexes()) .withIOManager(new IOManagerImpl(tempPath.toString())); - Map dynamicOptions = new HashMap<>(); - dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue()); - dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); this.scan = - table.copy(dynamicOptions) - .newReadBuilder() + table.newReadBuilder() .withFilter(filter) .withBucketFilter( requireCachedBucketIds == null diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java index a5645302f93f..30b7bbdd5dc5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java @@ -20,12 +20,15 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.TableScan; import java.util.HashMap; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; /** Utility methods for {@link TableScan}, such as validating. */ public class TableScanUtils { @@ -59,4 +62,24 @@ public static Optional getSnapshotId(FileStoreSourceSplit split) { } return Optional.empty(); } + + /** + * Check whether streaming reading is supported based on the data changed before and after + * compact. + */ + public static boolean supportCompactDiffStreamingReading(Table table) { + CoreOptions options = CoreOptions.fromMap(table.options()); + Set compactDiffReadingEngine = + new HashSet() { + { + add(CoreOptions.MergeEngine.PARTIAL_UPDATE); + add(CoreOptions.MergeEngine.AGGREGATE); + } + }; + + return options.needLookup() + && compactDiffReadingEngine.contains(options.mergeEngine()) + && !Options.fromMap(options.toMap()) + .get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 14643542e73d..619cb4c1d620 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.JoinedRow; @@ -722,6 +723,108 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception table.close(); } + @Test + public void testFullCacheLookupTableWithForceLookup() throws Exception { + Options options = new Options(); + options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); + options.set( + FlinkConnectorOptions.LOOKUP_CACHE_MODE, + FlinkConnectorOptions.LookupCacheMode.FULL); + options.set(CoreOptions.WRITE_ONLY, true); + options.set(CoreOptions.FORCE_LOOKUP, true); + options.set(CoreOptions.BUCKET, 1); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + FileStoreTable compactTable = + storeTable.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + + // initialize + write(storeTable, ioManager, GenericRow.of(1, 11, 111)); + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true); + table.open(); + + List result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 111); + + // first write + write(storeTable, GenericRow.of(1, null, 222)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 111); // old value because there is no compact + + // only L0 occur compact + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, false); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 222); // get new value after compact + + // second write + write(storeTable, GenericRow.of(1, 22, null)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11, 222); // old value + + // full compact + compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 22, 222); // new value + } + + @Test + public void testPartialLookupTableWithForceLookup() throws Exception { + Options options = new Options(); + options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); + options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.NONE); + options.set(CoreOptions.FORCE_LOOKUP, true); + options.set(CoreOptions.BUCKET, 1); + FileStoreTable dimTable = createTable(singletonList("f0"), options); + + PrimaryKeyPartialLookupTable table = + PrimaryKeyPartialLookupTable.createLocalTable( + dimTable, + new int[] {0, 1, 2}, + tempDir.toFile(), + ImmutableList.of("f0"), + null); + table.open(); + + List result = table.get(row(1, -1)); + assertThat(result).hasSize(0); + + write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22)); + result = table.get(row(1)); + assertThat(result).hasSize(0); + + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, -1, 11); + result = table.get(row(2)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 2, -2, 22); + + write(dimTable, ioManager, GenericRow.of(1, null, 111)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, -1, 111); + } + private FileStoreTable createDimTable() throws Exception { FileIO fileIO = LocalFileIO.create(); org.apache.paimon.fs.Path tablePath =