diff --git a/docs/content/primary-key-table/merge-engine.md b/docs/content/primary-key-table/merge-engine.md index 02b5e0b24a7d..f4daa7bf7357 100644 --- a/docs/content/primary-key-table/merge-engine.md +++ b/docs/content/primary-key-table/merge-engine.md @@ -354,9 +354,8 @@ By specifying `'merge-engine' = 'first-row'`, users can keep the first row of th `deduplicate` merge engine that in the `first-row` merge engine, it will generate insert only changelog. {{< hint info >}} -1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}). -2. You can not specify `sequence.field`. -3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records. +1. You can not specify `sequence.field`. +2. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records. {{< /hint >}} This is of great help in replacing log deduplication in streaming computation. diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index e46946b4a53e..78f8fc4f2409 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1549,7 +1549,8 @@ public boolean needLookup() { public LookupStrategy lookupStrategy() { return LookupStrategy.from( - options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP), + mergeEngine().equals(MergeEngine.FIRST_ROW), + changelogProducer().equals(ChangelogProducer.LOOKUP), deletionVectorsEnabled()); } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java index 6c709bcaef8e..24e03ebdb9ba 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java @@ -19,38 +19,25 @@ package org.apache.paimon.lookup; /** Strategy for lookup. */ -public enum LookupStrategy { - NO_LOOKUP(false, false), - - CHANGELOG_ONLY(true, false), - - DELETION_VECTOR_ONLY(false, true), - - CHANGELOG_AND_DELETION_VECTOR(true, true); +public class LookupStrategy { public final boolean needLookup; + public final boolean isFirstRow; + public final boolean produceChangelog; public final boolean deletionVector; - LookupStrategy(boolean produceChangelog, boolean deletionVector) { + private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) { + this.isFirstRow = isFirstRow; this.produceChangelog = produceChangelog; this.deletionVector = deletionVector; - this.needLookup = produceChangelog || deletionVector; + this.needLookup = produceChangelog || deletionVector || isFirstRow; } - public static LookupStrategy from(boolean produceChangelog, boolean deletionVector) { - for (LookupStrategy strategy : values()) { - if (strategy.produceChangelog == produceChangelog - && strategy.deletionVector == deletionVector) { - return strategy; - } - } - throw new IllegalArgumentException( - "Invalid combination of produceChangelog : " - + produceChangelog - + " and deletionVector : " - + deletionVector); + public static LookupStrategy from( + boolean isFirstRow, boolean produceChangelog, boolean deletionVector) { + return new LookupStrategy(isFirstRow, produceChangelog, deletionVector); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index b7702ef8abf8..1bdc3042d628 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -83,7 +83,6 @@ import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; -import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; /** {@link FileStoreWrite} for {@link KeyValueFileStore}. */ @@ -273,6 +272,7 @@ private MergeTreeCompactRewriter createRewriter( int maxLevel = options.numLevels() - 1; MergeEngine mergeEngine = options.mergeEngine(); ChangelogProducer changelogProducer = options.changelogProducer(); + LookupStrategy lookupStrategy = options.lookupStrategy(); if (changelogProducer.equals(FULL_COMPACTION)) { return new FullChangelogMergeTreeCompactRewriter( maxLevel, @@ -285,12 +285,11 @@ private MergeTreeCompactRewriter createRewriter( mergeSorter, valueEqualiserSupplier.get(), options.changelogRowDeduplicate()); - } else if (options.needLookup()) { - LookupStrategy lookupStrategy = options.lookupStrategy(); + } else if (lookupStrategy.needLookup) { LookupLevels.ValueProcessor processor; LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory wrapperFactory; FileReaderFactory lookupReaderFactory = readerFactory; - if (mergeEngine == FIRST_ROW) { + if (lookupStrategy.isFirstRow) { if (options.deletionVectorsEnabled()) { throw new UnsupportedOperationException( "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine."); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 03fbcfae874f..8b97c0911503 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -169,9 +169,10 @@ public static void validateTableSchema(TableSchema schema) { } if (options.mergeEngine() == MergeEngine.FIRST_ROW) { - if (options.changelogProducer() != ChangelogProducer.LOOKUP) { + if (options.changelogProducer() != ChangelogProducer.LOOKUP + && options.changelogProducer() != ChangelogProducer.NONE) { throw new IllegalArgumentException( - "Only support 'lookup' changelog-producer on FIRST_MERGE merge engine"); + "Only support 'none' and 'lookup' changelog-producer on FIRST_MERGE merge engine"); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index 8aeab36ffc66..40a608e1fc23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -148,7 +148,8 @@ private Plan tryFirstPlan() { ScannedResult scannedResult = (ScannedResult) result; currentWatermark = scannedResult.currentWatermark(); long currentSnapshotId = scannedResult.currentSnapshotId(); - if (options.lookupStrategy().equals(LookupStrategy.DELETION_VECTOR_ONLY)) { + LookupStrategy lookupStrategy = options.lookupStrategy(); + if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) { // For DELETION_VECTOR_ONLY mode, we need to return the remaining data from level 0 // in the subsequent plan. nextSnapshotId = currentSnapshotId; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index 870df10f94b9..1a59b8b6107f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -73,7 +73,7 @@ public void testDeduplicate(boolean changelogRowDeduplicate) { highLevel::get, EQUALISER, changelogRowDeduplicate, - LookupStrategy.CHANGELOG_ONLY, + LookupStrategy.from(false, true, false), null, null); @@ -233,7 +233,7 @@ public void testSum(boolean changelogRowDeduplicate) { key -> null, EQUALISER, changelogRowDeduplicate, - LookupStrategy.CHANGELOG_ONLY, + LookupStrategy.from(false, true, false), null, null); @@ -322,7 +322,7 @@ public void testMergeHighLevelOrder() { highLevel::get, EQUALISER, false, - LookupStrategy.CHANGELOG_ONLY, + LookupStrategy.from(false, true, false), null, UserDefinedSeqComparator.create( RowType.builder().field("f0", DataTypes.INT()).build(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java index 5d7927a97956..e44aa4c63d89 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java @@ -31,7 +31,6 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for first row merge engine. */ public class FirstRowITCase extends CatalogITCaseBase { @@ -45,23 +44,24 @@ protected List ddl() { } @Test - public void testIllegal() { - assertThatThrownBy( - () -> - sql( - "CREATE TABLE ILLEGAL_T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)" - + " WITH ('merge-engine'='first-row')")) - .hasRootCauseMessage( - "Only support 'lookup' changelog-producer on FIRST_MERGE merge engine"); + public void testBatchQueryNoChangelog() { + sql( + "CREATE TABLE T_NO_CHANGELOG (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('merge-engine'='first-row')"); + testBatchQuery("T_NO_CHANGELOG"); } @Test public void testBatchQuery() { - batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')"); - List result = batchSql("SELECT * FROM T"); + testBatchQuery("T"); + } + + private void testBatchQuery(String table) { + batchSql("INSERT INTO %s VALUES (1, 1, '1'), (1, 2, '2')", table); + List result = batchSql("SELECT * FROM %s", table); assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "1")); - result = batchSql("SELECT c FROM T"); + result = batchSql("SELECT c FROM %s", table); assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1")); }