Skip to content

Commit

Permalink
[core] First row merge engine supports none changelog producer (#3452)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored May 31, 2024
1 parent dd12fd4 commit d01e882
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 48 deletions.
5 changes: 2 additions & 3 deletions docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,8 @@ By specifying `'merge-engine' = 'first-row'`, users can keep the first row of th
`deduplicate` merge engine that in the `first-row` merge engine, it will generate insert only changelog.

{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
1. You cannot specify `sequence.field`.
2. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure `ignore-delete` to ignore these two kinds of records.
{{< /hint >}}

This is of great help in replacing log deduplication in streaming computation.
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,8 @@ public boolean needLookup() {

public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP),
mergeEngine().equals(MergeEngine.FIRST_ROW),
changelogProducer().equals(ChangelogProducer.LOOKUP),
deletionVectorsEnabled());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,25 @@
package org.apache.paimon.lookup;

/** Strategy for lookup. */
public enum LookupStrategy {
NO_LOOKUP(false, false),

CHANGELOG_ONLY(true, false),

DELETION_VECTOR_ONLY(false, true),

CHANGELOG_AND_DELETION_VECTOR(true, true);
public class LookupStrategy {

public final boolean needLookup;

public final boolean isFirstRow;

public final boolean produceChangelog;

public final boolean deletionVector;

LookupStrategy(boolean produceChangelog, boolean deletionVector) {
private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
this.isFirstRow = isFirstRow;
this.produceChangelog = produceChangelog;
this.deletionVector = deletionVector;
this.needLookup = produceChangelog || deletionVector;
this.needLookup = produceChangelog || deletionVector || isFirstRow;
}

public static LookupStrategy from(boolean produceChangelog, boolean deletionVector) {
for (LookupStrategy strategy : values()) {
if (strategy.produceChangelog == produceChangelog
&& strategy.deletionVector == deletionVector) {
return strategy;
}
}
throw new IllegalArgumentException(
"Invalid combination of produceChangelog : "
+ produceChangelog
+ " and deletionVector : "
+ deletionVector);
public static LookupStrategy from(
boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
return new LookupStrategy(isFirstRow, produceChangelog, deletionVector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@

import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;

/** {@link FileStoreWrite} for {@link KeyValueFileStore}. */
Expand Down Expand Up @@ -273,6 +272,7 @@ private MergeTreeCompactRewriter createRewriter(
int maxLevel = options.numLevels() - 1;
MergeEngine mergeEngine = options.mergeEngine();
ChangelogProducer changelogProducer = options.changelogProducer();
LookupStrategy lookupStrategy = options.lookupStrategy();
if (changelogProducer.equals(FULL_COMPACTION)) {
return new FullChangelogMergeTreeCompactRewriter(
maxLevel,
Expand All @@ -285,12 +285,11 @@ private MergeTreeCompactRewriter createRewriter(
mergeSorter,
valueEqualiserSupplier.get(),
options.changelogRowDeduplicate());
} else if (options.needLookup()) {
LookupStrategy lookupStrategy = options.lookupStrategy();
} else if (lookupStrategy.needLookup) {
LookupLevels.ValueProcessor<?> processor;
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> wrapperFactory;
FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
if (mergeEngine == FIRST_ROW) {
if (lookupStrategy.isFirstRow) {
if (options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
"First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ public static void validateTableSchema(TableSchema schema) {
}

if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
if (options.changelogProducer() != ChangelogProducer.LOOKUP
&& options.changelogProducer() != ChangelogProducer.NONE) {
throw new IllegalArgumentException(
"Only support 'lookup' changelog-producer on FIRST_MERGE merge engine");
"Only support 'none' and 'lookup' changelog-producer on FIRST_MERGE merge engine");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ private Plan tryFirstPlan() {
ScannedResult scannedResult = (ScannedResult) result;
currentWatermark = scannedResult.currentWatermark();
long currentSnapshotId = scannedResult.currentSnapshotId();
if (options.lookupStrategy().equals(LookupStrategy.DELETION_VECTOR_ONLY)) {
LookupStrategy lookupStrategy = options.lookupStrategy();
if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) {
// When deletion vectors are enabled and no changelog is produced (formerly the
// DELETION_VECTOR_ONLY mode), we need to return the remaining data from level 0
// in the subsequent plan.
nextSnapshotId = currentSnapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
highLevel::get,
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.CHANGELOG_ONLY,
LookupStrategy.from(false, true, false),
null,
null);

Expand Down Expand Up @@ -233,7 +233,7 @@ public void testSum(boolean changelogRowDeduplicate) {
key -> null,
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.CHANGELOG_ONLY,
LookupStrategy.from(false, true, false),
null,
null);

Expand Down Expand Up @@ -322,7 +322,7 @@ public void testMergeHighLevelOrder() {
highLevel::get,
EQUALISER,
false,
LookupStrategy.CHANGELOG_ONLY,
LookupStrategy.from(false, true, false),
null,
UserDefinedSeqComparator.create(
RowType.builder().field("f0", DataTypes.INT()).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** ITCase for first row merge engine. */
public class FirstRowITCase extends CatalogITCaseBase {
Expand All @@ -45,23 +44,24 @@ protected List<String> ddl() {
}

@Test
public void testIllegal() {
assertThatThrownBy(
() ->
sql(
"CREATE TABLE ILLEGAL_T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ " WITH ('merge-engine'='first-row')"))
.hasRootCauseMessage(
"Only support 'lookup' changelog-producer on FIRST_MERGE merge engine");
public void testBatchQueryNoChangelog() {
sql(
"CREATE TABLE T_NO_CHANGELOG (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ " WITH ('merge-engine'='first-row')");
testBatchQuery("T_NO_CHANGELOG");
}

@Test
public void testBatchQuery() {
batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
List<Row> result = batchSql("SELECT * FROM T");
testBatchQuery("T");
}

private void testBatchQuery(String table) {
batchSql("INSERT INTO %s VALUES (1, 1, '1'), (1, 2, '2')", table);
List<Row> result = batchSql("SELECT * FROM %s", table);
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "1"));

result = batchSql("SELECT c FROM T");
result = batchSql("SELECT c FROM %s", table);
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
}

Expand Down

0 comments on commit d01e882

Please sign in to comment.