diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 7e1f8498b27d..607772001610 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -671,12 +671,13 @@ public class CoreOptions implements Serializable { "Full compaction will be constantly triggered after delta commits."); @ExcludeFromDocumentation("Internal use only") - public static final ConfigOption STREAMING_COMPACT = - key("streaming-compact") - .enumType(StreamingCompactionType.class) - .defaultValue(StreamingCompactionType.NONE) + public static final ConfigOption STREAM_SCAN_MODE = + key("stream-scan-mode") + .enumType(StreamScanMode.class) + .defaultValue(StreamScanMode.NONE) .withDescription( - "Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' dedicated streaming compaction job."); + "Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' " + + "dedicated internal streaming scan."); public static final ConfigOption STREAMING_READ_MODE = key("streaming-read-mode") @@ -1615,16 +1616,18 @@ public static StreamingReadMode fromValue(String value) { } } - /** Compaction type when trigger a compaction action. */ - public enum StreamingCompactionType implements DescribedEnum { - NONE("none", "Not a streaming compaction."), - NORMAL("normal", "Compaction for traditional bucket table."), - BUCKET_UNAWARE("unaware", "Compaction for unaware bucket table."); + /** Inner stream scan mode for some internal requirements. 
*/ + public enum StreamScanMode implements DescribedEnum { + NONE("none", "No requirement."), + COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for traditional bucket table."), + COMPACT_APPEND_NO_BUCKET( + "compact-append-no-bucket", "Compaction for append table with bucket unaware."), + FILE_MONITOR("file-monitor", "Monitor data file changes."); private final String value; private final String description; - StreamingCompactionType(String value, String description) { + StreamScanMode(String value, String description) { this.value = value; this.description = description; } @@ -1642,22 +1645,6 @@ public InlineElement getDescription() { public String getValue() { return value; } - - @VisibleForTesting - public static StreamingCompactionType fromValue(String value) { - for (StreamingCompactionType formatType : StreamingCompactionType.values()) { - if (formatType.value.equals(value)) { - return formatType; - } - } - throw new IllegalArgumentException( - String.format( - "Invalid format type %s, only support [%s]", - value, - StringUtils.join( - Arrays.stream(StreamingCompactionType.values()).iterator(), - ","))); - } } /** Specifies this scan type for incremental scan . 
*/ diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java index cd284af7f6bd..62a238419aaa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java @@ -172,8 +172,8 @@ private Map compactScanType() { return new HashMap() { { put( - CoreOptions.STREAMING_COMPACT.key(), - CoreOptions.StreamingCompactionType.BUCKET_UNAWARE.getValue()); + CoreOptions.STREAM_SCAN_MODE.key(), + CoreOptions.StreamScanMode.COMPACT_APPEND_NO_BUCKET.getValue()); } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index e90d5961208a..8df089b0bf21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -79,20 +79,16 @@ public CoreOptions options() { protected StartingScanner createStartingScanner(boolean isStreaming) { SnapshotManager snapshotManager = snapshotReader.snapshotManager(); - CoreOptions.StreamingCompactionType type = - options.toConfiguration().get(CoreOptions.STREAMING_COMPACT); + CoreOptions.StreamScanMode type = + options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE); switch (type) { - case NORMAL: - { - checkArgument( - isStreaming, - "Set 'streaming-compact' in batch mode. This is unexpected."); - return new ContinuousCompactorStartingScanner(snapshotManager); - } - case BUCKET_UNAWARE: - { - return new FullStartingScanner(snapshotManager); - } + case COMPACT_BUCKET_TABLE: + checkArgument( + isStreaming, "Set 'stream-scan-mode' in batch mode. 
This is unexpected."); + return new ContinuousCompactorStartingScanner(snapshotManager); + case COMPACT_APPEND_NO_BUCKET: + case FILE_MONITOR: + return new FullStartingScanner(snapshotManager); } // read from consumer id diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java index 1de22a644557..ca4bd8a8d5b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java @@ -23,6 +23,7 @@ import org.apache.paimon.consumer.Consumer; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner; import org.apache.paimon.table.source.snapshot.BoundedChecker; import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner; import org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner; @@ -185,17 +186,15 @@ private Plan nextPlan() { } private FollowUpScanner createFollowUpScanner() { - CoreOptions.StreamingCompactionType type = - options.toConfiguration().get(CoreOptions.STREAMING_COMPACT); + CoreOptions.StreamScanMode type = + options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE); switch (type) { - case NORMAL: - { - return new ContinuousCompactorFollowUpScanner(); - } - case BUCKET_UNAWARE: - { - return new ContinuousAppendAndCompactFollowUpScanner(); - } + case COMPACT_BUCKET_TABLE: + return new ContinuousCompactorFollowUpScanner(); + case COMPACT_APPEND_NO_BUCKET: + return new ContinuousAppendAndCompactFollowUpScanner(); + case FILE_MONITOR: + return new AllDeltaFollowUpScanner(); } CoreOptions.ChangelogProducer changelogProducer = options.changelogProducer(); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AllDeltaFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AllDeltaFollowUpScanner.java new file mode 100644 index 000000000000..8cafc4b6f776 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AllDeltaFollowUpScanner.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.source.ScanMode; + +/** {@link FollowUpScanner} for reading all file changes. 
*/ +public class AllDeltaFollowUpScanner implements FollowUpScanner { + + @Override + public boolean shouldScanSnapshot(Snapshot snapshot) { + return true; + } + + @Override + public SnapshotReader.Plan scan(long snapshotId, SnapshotReader snapshotReader) { + return snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).readChanges(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java index c7e0a24a862b..ebdaa2fbb118 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java @@ -30,6 +30,6 @@ public interface FollowUpScanner { Plan scan(long snapshotId, SnapshotReader snapshotReader); default Plan getOverwriteChangesPlan(long snapshotId, SnapshotReader snapshotReader) { - return snapshotReader.withSnapshot(snapshotId).readOverwrittenChanges(); + return snapshotReader.withSnapshot(snapshotId).readChanges(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 142d4b5fa4b6..d53a7d2e3089 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -60,8 +60,8 @@ public interface SnapshotReader { /** Get splits plan from snapshot. */ Plan read(); - /** Get splits plan from an overwritten snapshot. */ - Plan readOverwrittenChanges(); + /** Get splits plan from file changes. 
*/ + Plan readChanges(); Plan readIncrementalDiff(Snapshot before); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 391da875f593..70673ca6d15f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -231,18 +231,10 @@ public List partitions() { .collect(Collectors.toList()); } - /** Get splits from an overwritten snapshot files. */ @Override - public Plan readOverwrittenChanges() { + public Plan readChanges() { withMode(ScanMode.DELTA); FileStoreScan.Plan plan = scan.plan(); - long snapshotId = plan.snapshotId(); - - Snapshot snapshot = snapshotManager.snapshot(snapshotId); - if (snapshot.commitKind() != Snapshot.CommitKind.OVERWRITE) { - throw new IllegalStateException( - "Cannot read overwrite splits from a non-overwrite snapshot."); - } Map>> beforeFiles = groupByPartFiles(plan.files(FileKind.DELETE)); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index f84ddcb59c10..c26dd76013e4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -253,8 +253,8 @@ public Plan read() { } @Override - public Plan readOverwrittenChanges() { - return snapshotReader.readOverwrittenChanges(); + public Plan readChanges() { + return snapshotReader.readChanges(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java new file mode 100644 index 000000000000..a43c5da5af39 --- /dev/null +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.Experimental; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.DataTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.ReadonlyTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerStreamTableScan; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import 
org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; +import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE; +import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** A table to produce modified files for snapshots. */ +@Experimental +public class FileMonitorTable implements DataTable, ReadonlyTable { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable wrapped; + + private static final RowType ROW_TYPE = + RowType.of( + new DataType[] { + new BigIntType(false), + newBytesType(false), + new IntType(false), + newBytesType(false), + newBytesType(false) + }, + new String[] { + "_SNAPSHOT_ID", "_PARTITION", "_BUCKET", "_BEFORE_FILES", "_DATA_FILES" + }); + + public FileMonitorTable(FileStoreTable wrapped) { + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue()); + dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); + this.wrapped = wrapped.copy(dynamicOptions); + } + + @Override + public Path location() { + return wrapped.location(); + } + + @Override + public SnapshotManager snapshotManager() { + return wrapped.snapshotManager(); + } + + @Override + public TagManager tagManager() { + return 
wrapped.tagManager(); + } + + @Override + public String name() { + return "__internal_file_monitor_" + wrapped.location().getName(); + } + + @Override + public RowType rowType() { + return ROW_TYPE; + } + + @Override + public Map options() { + return wrapped.options(); + } + + @Override + public List primaryKeys() { + return Collections.emptyList(); + } + + @Override + public SnapshotReader newSnapshotReader() { + return wrapped.newSnapshotReader(); + } + + @Override + public InnerTableScan newScan() { + return wrapped.newScan(); + } + + @Override + public InnerStreamTableScan newStreamScan() { + return wrapped.newStreamScan(); + } + + @Override + public CoreOptions coreOptions() { + return wrapped.coreOptions(); + } + + @Override + public InnerTableRead newRead() { + return new BucketsRead(); + } + + @Override + public FileMonitorTable copy(Map dynamicOptions) { + return new FileMonitorTable(wrapped.copy(dynamicOptions)); + } + + @Override + public FileIO fileIO() { + return wrapped.fileIO(); + } + + public static RowType getRowType() { + return ROW_TYPE; + } + + private static class BucketsRead implements InnerTableRead { + + @Override + public InnerTableRead withFilter(Predicate predicate) { + // filter is done by scan + return this; + } + + @Override + public InnerTableRead withProjection(int[][] projection) { + throw new UnsupportedOperationException("BucketsRead does not support projection"); + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + if (!(split instanceof DataSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + + DataSplit dataSplit = (DataSplit) split; + + FileChange change = + new FileChange( + dataSplit.snapshotId(), + dataSplit.partition(), + dataSplit.bucket(), + dataSplit.beforeFiles(), + dataSplit.dataFiles()); + + return new 
IteratorRecordReader<>(Collections.singletonList(toRow(change)).iterator()); + } + } + + public static InternalRow toRow(FileChange change) throws IOException { + DataFileMetaSerializer fileSerializer = new DataFileMetaSerializer(); + return GenericRow.of( + change.snapshotId(), + serializeBinaryRow(change.partition()), + change.bucket(), + fileSerializer.serializeList(change.beforeFiles()), + fileSerializer.serializeList(change.dataFiles())); + } + + public static FileChange toFileChange(InternalRow row) throws IOException { + DataFileMetaSerializer fileSerializer = new DataFileMetaSerializer(); + return new FileChange( + row.getLong(0), + deserializeBinaryRow(row.getBinary(1)), + row.getInt(2), + fileSerializer.deserializeList(row.getBinary(3)), + fileSerializer.deserializeList(row.getBinary(4))); + } + + /** Pojo to record of file change. */ + public static class FileChange { + + private final long snapshotId; + private final BinaryRow partition; + private final int bucket; + private final List beforeFiles; + private final List dataFiles; + + public FileChange( + long snapshotId, + BinaryRow partition, + int bucket, + List beforeFiles, + List dataFiles) { + this.snapshotId = snapshotId; + this.partition = partition; + this.bucket = bucket; + this.beforeFiles = beforeFiles; + this.dataFiles = dataFiles; + } + + public long snapshotId() { + return snapshotId; + } + + public BinaryRow partition() { + return partition; + } + + public int bucket() { + return bucket; + } + + public List beforeFiles() { + return beforeFiles; + } + + public List dataFiles() { + return dataFiles; + } + + @Override + public String toString() { + return "FileChange{" + + "snapshotId=" + + snapshotId + + ", partition=" + + partition + + ", bucket=" + + bucket + + ", beforeFiles=" + + beforeFiles + + ", dataFiles=" + + dataFiles + + '}'; + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java index 278305c38f4a..ae5e6c7ef376 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java @@ -54,6 +54,7 @@ import org.apache.paimon.table.source.TableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.system.AuditLogTable; +import org.apache.paimon.table.system.FileMonitorTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -62,6 +63,7 @@ import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -969,6 +971,84 @@ public void testFullCompactedRead() throws Exception { .containsExactly("1|10|200|binary|varbinary|mapKey:mapVal|multiset"); } + @Test + public void testInnerStreamScanMode() throws Exception { + FileStoreTable table = createFileStoreTable(); + + FileMonitorTable monitorTable = new FileMonitorTable(table); + ReadBuilder readBuilder = monitorTable.newReadBuilder(); + StreamTableScan scan = readBuilder.newStreamScan(); + TableRead read = readBuilder.newRead(); + + // 1. 
first write + + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit(); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 101L)); + commit.commit(write.prepareCommit()); + + List results = new ArrayList<>(); + read.createReader(scan.plan()).forEachRemaining(results::add); + read.createReader(scan.plan()).forEachRemaining(results::add); + assertThat(results).hasSize(1); + FileMonitorTable.FileChange change = FileMonitorTable.toFileChange(results.get(0)); + assertThat(change.beforeFiles()).hasSize(0); + assertThat(change.dataFiles()).hasSize(1); + results.clear(); + + // 2. second write and compact + + write.close(); + commit.close(); + writeBuilder = table.newBatchWriteBuilder(); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 101L)); + write.compact(binaryRow(1), 0, true); + commit.commit(write.prepareCommit()); + + // 2.1 read add file + + read.createReader(scan.plan()).forEachRemaining(results::add); + assertThat(results).hasSize(1); + change = FileMonitorTable.toFileChange(results.get(0)); + assertThat(change.beforeFiles()).hasSize(0); + assertThat(change.dataFiles()).hasSize(1); + results.clear(); + + // 2.2 read compact + + read.createReader(scan.plan()).forEachRemaining(results::add); + assertThat(results).hasSize(1); + change = FileMonitorTable.toFileChange(results.get(0)); + assertThat(change.beforeFiles()).hasSize(2); + assertThat(change.dataFiles()).hasSize(1); + results.clear(); + + // 3 overwrite + write.close(); + commit.close(); + writeBuilder = table.newBatchWriteBuilder().withOverwrite(); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 101L)); + commit.commit(write.prepareCommit()); + + 
read.createReader(scan.plan()).forEachRemaining(results::add); + assertThat(results).hasSize(1); + change = FileMonitorTable.toFileChange(results.get(0)); + assertThat(change.beforeFiles()).hasSize(1); + assertThat(change.dataFiles()).hasSize(1); + + write.close(); + commit.close(); + } + @Override protected FileStoreTable createFileStoreTable(Consumer configure) throws Exception { return createFileStoreTable(configure, ROW_TYPE); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java index 3a29d3521e93..fc6eda4c42c5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java @@ -130,8 +130,8 @@ private Map streamingCompactOptions() { return new HashMap() { { put( - CoreOptions.STREAMING_COMPACT.key(), - CoreOptions.StreamingCompactionType.NORMAL.getValue()); + CoreOptions.STREAM_SCAN_MODE.key(), + CoreOptions.StreamScanMode.COMPACT_BUCKET_TABLE.getValue()); put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), null); } }; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java index d26a9fe12ec5..2fa4b37987e0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java @@ -39,8 +39,8 @@ public static Map compactOptions(boolean isStreaming) { return new HashMap() { { put( - CoreOptions.STREAMING_COMPACT.key(), - CoreOptions.StreamingCompactionType.NORMAL.getValue()); + 
CoreOptions.STREAM_SCAN_MODE.key(), + CoreOptions.StreamScanMode.COMPACT_BUCKET_TABLE.getValue()); put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), null); put(CoreOptions.WRITE_ONLY.key(), "false"); }