diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 446931acacda..75b64e9068ef 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -339,6 +339,12 @@
Integer |
The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading. |
+
+ force-lookup |
+ false |
+ Boolean |
+ Whether to force the use of lookup for compaction. |
+
lookup-wait |
true |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 17bff3653229..bf42f31f4eb9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1251,6 +1251,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");
+ /**
+  * Boolean option {@code force-lookup}, {@code false} by default. Its value is read by
+  * {@code lookupStrategy()} and handed to {@code LookupStrategy.from} as the {@code forceLookup}
+  * flag, which turns {@code needLookup} on even when no changelog producer, deletion vector or
+  * first-row merge engine requires it.
+  */
+ @Immutable
+ public static final ConfigOption<Boolean> FORCE_LOOKUP =
+         key("force-lookup")
+                 .booleanType()
+                 .defaultValue(false)
+                 .withDescription("Whether to force the use of lookup for compaction.");
+
public static final ConfigOption LOOKUP_WAIT =
key("lookup-wait")
.booleanType()
@@ -1691,7 +1698,8 @@ public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
changelogProducer().equals(ChangelogProducer.LOOKUP),
- deletionVectorsEnabled());
+ deletionVectorsEnabled(),
+ options.get(FORCE_LOOKUP));
}
public boolean changelogRowDeduplicate() {
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
index 24e03ebdb9ba..f01c7c967b38 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
@@ -29,15 +29,22 @@ public class LookupStrategy {
public final boolean deletionVector;
- private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
+ private LookupStrategy(
+ boolean isFirstRow,
+ boolean produceChangelog,
+ boolean deletionVector,
+ boolean forceLookup) {
this.isFirstRow = isFirstRow;
this.produceChangelog = produceChangelog;
this.deletionVector = deletionVector;
- this.needLookup = produceChangelog || deletionVector || isFirstRow;
+ this.needLookup = produceChangelog || deletionVector || isFirstRow || forceLookup;
}
public static LookupStrategy from(
- boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
- return new LookupStrategy(isFirstRow, produceChangelog, deletionVector);
+ boolean isFirstRow,
+ boolean produceChangelog,
+ boolean deletionVector,
+ boolean forceLookup) {
+ return new LookupStrategy(isFirstRow, produceChangelog, deletionVector, forceLookup);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index dbee0805a8e8..e2d37eae4924 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -73,7 +73,7 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
highLevel::get,
EQUALISER,
changelogRowDeduplicate,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
null);
@@ -233,7 +233,7 @@ public void testSum(boolean changelogRowDeduplicate) {
key -> null,
EQUALISER,
changelogRowDeduplicate,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
null);
@@ -322,7 +322,7 @@ public void testMergeHighLevelOrder() {
highLevel::get,
EQUALISER,
false,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
UserDefinedSeqComparator.create(
RowType.builder().field("f0", DataTypes.INT()).build(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 09bd6eea14c9..e8ece0779ad1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -29,6 +29,8 @@
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -65,9 +67,11 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CompatibilityTestUtils;
+import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -95,7 +99,11 @@
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine;
+import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -1670,6 +1678,62 @@ public void testRollbackToTagWithChangelogDecoupled(String changelogProducer) th
// table-path/changelog/EARLIEST
}
+ /**
+  * Verifies that enabling {@code force-lookup} triggers compaction on every merge engine: two
+  * writes to the same key must end with a COMPACT snapshot, a single merged data file, and a
+  * final value of 100.
+  */
+ @ParameterizedTest
+ @EnumSource(CoreOptions.MergeEngine.class)
+ public void testForceLookupCompaction(CoreOptions.MergeEngine mergeEngine) throws Exception {
+     // Per merge engine: (value of the first write, value of the second write). Each pair is
+     // expected to be read back as 100 by the final assertion of this test.
+     Map<MergeEngine, Pair<Long, Long>> testData = new HashMap<>();
+     testData.put(DEDUPLICATE, Pair.of(50L, 100L));
+     testData.put(PARTIAL_UPDATE, Pair.of(null, 100L));
+     testData.put(AGGREGATE, Pair.of(30L, 70L));
+     testData.put(FIRST_ROW, Pair.of(100L, 70L));
+
+     Pair<Long, Long> currentTestData = testData.get(mergeEngine);
+     FileStoreTable table =
+             createFileStoreTable(
+                     options -> {
+                         options.set(CoreOptions.FORCE_LOOKUP, true);
+                         options.set(MERGE_ENGINE, mergeEngine);
+                         if (mergeEngine == AGGREGATE) {
+                             options.set("fields.b.aggregate-function", "sum");
+                         }
+                     });
+     StreamTableWrite write = table.newWrite(commitUser);
+     StreamTableCommit commit = table.newCommit(commitUser);
+     write.withIOManager(IOManager.create(tempDir.toString()));
+
+     // write data
+     write.write(rowData(1, 10, currentTestData.getLeft()));
+     commit.commit(1, write.prepareCommit(true, 1));
+     assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2L);
+
+     write.write(rowData(1, 10, currentTestData.getRight()));
+     commit.commit(0, write.prepareCommit(true, 0));
+     write.close();
+     commit.close();
+     assertThat(table.snapshotManager().snapshotCount()).isEqualTo(4L);
+     assertThat(table.snapshotManager().latestSnapshot())
+             .matches(snapshot -> snapshot.commitKind() == COMPACT);
+
+     // 3 data files + bucket-0 directory
+     // Files.walk holds an open directory handle, so the stream is closed explicitly.
+     List<java.nio.file.Path> files;
+     try (java.util.stream.Stream<java.nio.file.Path> walked =
+             Files.walk(new File(tablePath.toUri().getPath(), "pt=1/bucket-0").toPath())) {
+         files = walked.collect(Collectors.toList());
+     }
+     assertThat(files.size()).isEqualTo(4);
+
+     // 2 data files compact into 1 file
+     // Plan once and inspect both file kinds on the same plan.
+     FileStoreScan scan = table.store().newScan().withKind(ScanMode.DELTA);
+     FileStoreScan.Plan deltaPlan = scan.plan();
+     assertThat(deltaPlan.files(FileKind.ADD).size()).isEqualTo(1);
+     assertThat(deltaPlan.files(FileKind.DELETE).size()).isEqualTo(2);
+
+     // check result
+     List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+     TableRead read = table.newRead();
+     assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+             .isEqualTo(
+                     Collections.singletonList(
+                             "1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
private void assertReadChangelog(int id, FileStoreTable table) throws Exception {
// read the changelog at #{id}
table =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
index 9c814463cd66..fcb4c89ebe09 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -34,7 +34,7 @@
/**
* {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
- * org.apache.paimon.CoreOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} set to false.
+ * org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
*/
public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {