Skip to content

Commit

Permalink
[core] Unify delete&update&mergeinto for merge engines
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Apr 15, 2024
1 parent a5d1b3c commit ad4cf78
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 212 deletions.
6 changes: 0 additions & 6 deletions docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ result in strange behavior. When the input is out of order, we recommend that yo
[Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" >}}) to correct disorder.
{{< /hint >}}

{{< hint info >}}
Some compute engines support row-level update and delete in batch mode, but not all merge engines support them.
- Merge engines supporting batch update: `deduplicate` and `first-row`.
- Merge engines supporting batch delete: `deduplicate`.
{{< /hint >}}

## Deduplicate

`deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.
Expand Down
40 changes: 6 additions & 34 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1705,28 +1705,20 @@ public boolean deletionVectorsEnabled() {

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

PARTIAL_UPDATE("partial-update", "Partial update non-null fields.", false, false),
PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),

AGGREGATE("aggregation", "Aggregate fields with same primary key.", false, false),
AGGREGATE("aggregation", "Aggregate fields with same primary key."),

FIRST_ROW("first-row", "De-duplicate and keep the first row.", true, false);
FIRST_ROW("first-row", "De-duplicate and keep the first row.");

private final String value;
private final String description;
private final boolean supportBatchUpdate;
private final boolean supportBatchDelete;

MergeEngine(
String value,
String description,
boolean supportBatchUpdate,
boolean supportBatchDelete) {

MergeEngine(String value, String description) {
this.value = value;
this.description = description;
this.supportBatchUpdate = supportBatchUpdate;
this.supportBatchDelete = supportBatchDelete;
}

@Override
Expand All @@ -1738,26 +1730,6 @@ public String toString() {
public InlineElement getDescription() {
return text(description);
}

public boolean supportBatchUpdate() {
return supportBatchUpdate;
}

public boolean supportBatchDelete() {
return supportBatchDelete;
}

public static List<MergeEngine> supportBatchUpdateEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchUpdate)
.collect(Collectors.toList());
}

public static List<MergeEngine> supportBatchDeleteEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchDelete)
.collect(Collectors.toList());
}
}

/** Specifies the startup mode for log consumer. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
Expand All @@ -35,6 +34,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;

/** Delete from table action for Flink. */
public class DeleteAction extends TableActionBase {

Expand All @@ -55,14 +56,11 @@ public DeleteAction(
@Override
public void run() throws Exception {
CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
Preconditions.checkArgument(mergeEngine.supportBatchDelete(), "");

if (!mergeEngine.supportBatchDelete()) {
if (mergeEngine != DEDUPLICATE) {
throw new UnsupportedOperationException(
String.format(
"Delete is executed in batch mode, but merge engine %s can not support batch delete."
+ " Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
"Delete is executed in batch mode, but merge engine %s can not support batch delete.",
mergeEngine));
}

LOG.debug("Run delete action with filter '{}'.", filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
Expand Down Expand Up @@ -223,20 +225,10 @@ public void validate() {
}

CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if ((matchedUpsert || notMatchedUpsert) && !mergeEngine.supportBatchUpdate()) {
boolean supportMergeInto = mergeEngine == DEDUPLICATE || mergeEngine == PARTIAL_UPDATE;
if (!supportMergeInto) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_upsert or not_matched_by_source_upsert."
+ " But merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
}

if ((matchedDelete || notMatchedDelete) && !mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_delete or not_matched_by_source_delete."
+ " But merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
String.format("Merge engine %s can not support merge-into.", mergeEngine));
}

if ((matchedUpsert && matchedDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;

/** Flink table sink that supports row level update and delete. */
public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTableSinkBase
Expand Down Expand Up @@ -115,11 +117,10 @@ public RowLevelUpdateInfo applyRowLevelUpdate(
});

MergeEngine mergeEngine = options.get(MERGE_ENGINE);
if (!mergeEngine.supportBatchUpdate()) {
boolean supportUpdate = mergeEngine == DEDUPLICATE || mergeEngine == PARTIAL_UPDATE;
if (!supportUpdate) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
String.format("Merge engine %s can not support batch update.", mergeEngine));
}

// Even with partial-update we still need all columns. Because the topology
Expand Down Expand Up @@ -184,11 +185,9 @@ private void validateDeletable() {
}

MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if (!mergeEngine.supportBatchDelete()) {
if (mergeEngine != DEDUPLICATE) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
String.format("Merge engine %s can not support batch delete.", mergeEngine));
}
}

Expand Down
Loading

0 comments on commit ad4cf78

Please sign in to comment.