[flink] Add merge-engine check when executing row level batch update and delete (apache#3181)

yuzelin authored Apr 15, 2024
1 parent aa92afa commit a5d1b3c
Showing 9 changed files with 158 additions and 153 deletions.
6 changes: 6 additions & 0 deletions docs/content/primary-key-table/merge-engine.md
@@ -34,6 +34,12 @@ result in strange behavior. When the input is out of order, we recommend that you use the
[Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" >}}) to correct disorder.
{{< /hint >}}

{{< hint info >}}
Some compute engines support row-level update and delete in batch mode, but not all merge engines support these operations.
- Merge engines that support batch update: `deduplicate` and `first-row`.
- Merge engines that support batch delete: `deduplicate`.
{{< /hint >}}
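
As an illustration, here is a minimal sketch of how these restrictions surface through Flink batch SQL. The table names `dedup_table` and `first_row_table` are hypothetical, assumed to be Paimon primary-key tables already registered in the current catalog with the corresponding `merge-engine` option:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchRowLevelOps {

    public static void main(String[] args) {
        // Row-level UPDATE/DELETE statements require batch execution mode.
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // 'deduplicate' (the default) supports both batch update and batch delete.
        env.executeSql("UPDATE dedup_table SET v = 'new' WHERE id = 1");
        env.executeSql("DELETE FROM dedup_table WHERE id = 1");

        // 'first-row' supports batch update only; this DELETE is expected to
        // fail during planning with an UnsupportedOperationException.
        env.executeSql("DELETE FROM first_row_table WHERE id = 1");
    }
}
```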

## Deduplicate

`deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.
40 changes: 34 additions & 6 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1705,20 +1705,28 @@ public boolean deletionVectorsEnabled() {

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),

PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
PARTIAL_UPDATE("partial-update", "Partial update non-null fields.", false, false),

AGGREGATE("aggregation", "Aggregate fields with same primary key."),
AGGREGATE("aggregation", "Aggregate fields with same primary key.", false, false),

FIRST_ROW("first-row", "De-duplicate and keep the first row.");
FIRST_ROW("first-row", "De-duplicate and keep the first row.", true, false);

private final String value;
private final String description;

MergeEngine(String value, String description) {
private final boolean supportBatchUpdate;
private final boolean supportBatchDelete;

MergeEngine(
String value,
String description,
boolean supportBatchUpdate,
boolean supportBatchDelete) {
this.value = value;
this.description = description;
this.supportBatchUpdate = supportBatchUpdate;
this.supportBatchDelete = supportBatchDelete;
}

@Override
@@ -1730,6 +1738,26 @@ public String toString() {
public InlineElement getDescription() {
return text(description);
}

public boolean supportBatchUpdate() {
return supportBatchUpdate;
}

public boolean supportBatchDelete() {
return supportBatchDelete;
}

public static List<MergeEngine> supportBatchUpdateEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchUpdate)
.collect(Collectors.toList());
}

public static List<MergeEngine> supportBatchDeleteEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchDelete)
.collect(Collectors.toList());
}
}

/** Specifies the startup mode for log consumer. */
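
A quick sketch of how the new capability flags and helper lists behave; the values follow directly from the enum constants above (the inline comments assume `toString()` returns the lowercase option value, as the override above suggests):

```java
import java.util.List;

import org.apache.paimon.CoreOptions.MergeEngine;

public class MergeEngineCapabilities {

    public static void main(String[] args) {
        // Flags are fixed per engine in the enum constructor.
        System.out.println(MergeEngine.DEDUPLICATE.supportBatchUpdate()); // true
        System.out.println(MergeEngine.FIRST_ROW.supportBatchDelete());   // false

        // The helper lists are what the new error messages print.
        List<MergeEngine> updatable = MergeEngine.supportBatchUpdateEngines(); // [deduplicate, first-row]
        List<MergeEngine> deletable = MergeEngine.supportBatchDeleteEngines(); // [deduplicate]
        System.out.println(updatable + " / " + deletable);
    }
}
```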
org/apache/paimon/flink/action/DeleteAction.java
@@ -18,6 +18,9 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
@@ -51,6 +54,17 @@ public DeleteAction(

@Override
public void run() throws Exception {
CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if (!mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"Delete is executed in batch mode, but merge engine %s does not support batch delete."
+ " Merge engines that support batch delete: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}

LOG.debug("Run delete action with filter '{}'.", filter);

Table queriedTable =
org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -96,11 +97,11 @@ public class MergeIntoAction extends TableActionBase {
private String mergeCondition;

// actions to be taken
boolean matchedUpsert;
boolean notMatchedUpsert;
boolean matchedDelete;
boolean notMatchedDelete;
boolean insert;
private boolean matchedUpsert;
private boolean notMatchedUpsert;
private boolean matchedDelete;
private boolean notMatchedDelete;
private boolean insert;

// upsert
@Nullable String matchedUpsertCondition;
@@ -215,6 +216,49 @@ public MergeIntoAction withNotMatchedInsert(
return this;
}

public void validate() {
if (!matchedUpsert && !notMatchedUpsert && !matchedDelete && !notMatchedDelete && !insert) {
throw new IllegalArgumentException(
"Must specify at least one merge action. Run 'merge_into --help' for help.");
}

CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if ((matchedUpsert || notMatchedUpsert) && !mergeEngine.supportBatchUpdate()) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_upsert or not_matched_by_source_upsert."
+ " But merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
}

if ((matchedDelete || notMatchedDelete) && !mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_delete or not_matched_by_source_delete."
+ " But merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}

if ((matchedUpsert && matchedDelete)
&& (matchedUpsertCondition == null || matchedDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both matched-upsert and matched-delete actions are present, their conditions must both be present too.");
}

if ((notMatchedUpsert && notMatchedDelete)
&& (notMatchedBySourceUpsertCondition == null
|| notMatchedBySourceDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both not-matched-by-source-upsert and not-matched-by-source-delete actions are present, "
+ "their conditions must both be present too.");
}

if (notMatchedBySourceUpsertSet != null && notMatchedBySourceUpsertSet.equals("*")) {
throw new IllegalArgumentException(
"The '*' cannot be used in not_matched_by_source_upsert_set");
}
}

@Override
public void run() throws Exception {
DataStream<RowData> dataStream = buildDataStream();
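
The consolidated `validate()` above enforces a simple rule: upsert-style clauses need `supportBatchUpdate()`, delete-style clauses need `supportBatchDelete()`. A standalone sketch of that rule (not the Paimon code itself, just the logic it implements):

```java
import org.apache.paimon.CoreOptions.MergeEngine;

public class MergeIntoEngineCheck {

    // Mirrors the merge-engine portion of MergeIntoAction#validate().
    static void checkEngine(MergeEngine engine, boolean hasUpsert, boolean hasDelete) {
        if (hasUpsert && !engine.supportBatchUpdate()) {
            throw new UnsupportedOperationException(
                    "Merge engine " + engine + " does not support batch update."
                            + " Supported: " + MergeEngine.supportBatchUpdateEngines());
        }
        if (hasDelete && !engine.supportBatchDelete()) {
            throw new UnsupportedOperationException(
                    "Merge engine " + engine + " does not support batch delete."
                            + " Supported: " + MergeEngine.supportBatchDeleteEngines());
        }
    }

    public static void main(String[] args) {
        checkEngine(MergeEngine.DEDUPLICATE, true, true); // passes
        checkEngine(MergeEngine.FIRST_ROW, true, false);  // passes: update only
        checkEngine(MergeEngine.AGGREGATE, true, false);  // throws
    }
}
```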
org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -112,7 +112,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
params.get(NOT_MATCHED_INSERT_VALUES));
}

validate(action);
action.validate();

return Optional.of(action);
}
@@ -202,36 +202,4 @@ public void printHelp() {
System.out.println(
" It will find matched rows of target table that meet condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
}

public static void validate(MergeIntoAction action) {
if (!action.matchedUpsert
&& !action.notMatchedUpsert
&& !action.matchedDelete
&& !action.notMatchedDelete
&& !action.insert) {
throw new IllegalArgumentException(
"Must specify at least one merge action. Run 'merge_into --help' for help.");
}

if ((action.matchedUpsert && action.matchedDelete)
&& (action.matchedUpsertCondition == null
|| action.matchedDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both matched-upsert and matched-delete actions are present, their conditions must both be present too.");
}

if ((action.notMatchedUpsert && action.notMatchedDelete)
&& (action.notMatchedBySourceUpsertCondition == null
|| action.notMatchedBySourceDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both not-matched-by-source-upsert and not-matched-by--source-delete actions are present, "
+ "their conditions must both be present too.\n");
}

if (action.notMatchedBySourceUpsertSet != null
&& action.notMatchedBySourceUpsertSet.equals("*")) {
throw new IllegalArgumentException(
"The '*' cannot be used in not_matched_by_source_upsert_set");
}
}
}
@@ -21,7 +21,6 @@
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
import org.apache.paimon.flink.action.MergeIntoActionFactory;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -222,7 +221,7 @@ public String[] call(
}

action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
MergeIntoActionFactory.validate(action);
action.validate();

DataStream<RowData> dataStream = action.buildDataStream();
TableResult tableResult = action.batchSink(dataStream);
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
@@ -89,47 +90,45 @@ public RowLevelUpdateInfo applyRowLevelUpdate(
// Since only UPDATE_AFTER type messages can be received at present,
// AppendOnlyFileStoreTable cannot correctly handle old data, so they are marked as
// unsupported. Similarly, it is not allowed to update the primary key column when updating
// the column of PrimaryKeyFileStoreTable, because the old data cannot be handled
// correctly.
if (table.primaryKeys().size() > 0) {
Options options = Options.fromMap(table.options());
Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
updatedColumns.forEach(
column -> {
if (primaryKeys.contains(column.getName())) {
String errMsg =
String.format(
"Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
primaryKeys,
updatedColumns.stream()
.map(Column::getName)
.collect(Collectors.toList()));
throw new UnsupportedOperationException(errMsg);
}
});
if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
|| options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
// Even with partial-update we still need all columns. Because the topology
// structure is source -> cal -> constraintEnforcer -> sink, in the
// constraintEnforcer operator, the constraint check will be performed according to
// the index, not according to the column. So we can't return only some columns,
// which will cause problems like ArrayIndexOutOfBoundsException.
// TODO: return partial columns after FLINK-32001 is resolved.
return new RowLevelUpdateInfo() {};
}
throw new UnsupportedOperationException(
String.format(
"%s can not support update, currently only %s of %s and %s can support update.",
options.get(MERGE_ENGINE),
MERGE_ENGINE.key(),
MergeEngine.DEDUPLICATE,
MergeEngine.PARTIAL_UPDATE));
} else {
// the column of PrimaryKeyFileStoreTable, because the old data cannot be handled correctly.
if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"%s can not support update, because there is no primary key.",
table.getClass().getName()));
}

Options options = Options.fromMap(table.options());
Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
updatedColumns.forEach(
column -> {
if (primaryKeys.contains(column.getName())) {
String errMsg =
String.format(
"Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
primaryKeys,
updatedColumns.stream()
.map(Column::getName)
.collect(Collectors.toList()));
throw new UnsupportedOperationException(errMsg);
}
});

MergeEngine mergeEngine = options.get(MERGE_ENGINE);
if (!mergeEngine.supportBatchUpdate()) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
}

// Even with partial-update we still need all columns. Because the topology
// structure is source -> calc -> constraintEnforcer -> sink, in the
// constraintEnforcer operator, the constraint check will be performed according to
// the index, not according to the column. So we can't return only some columns,
// which will cause problems like ArrayIndexOutOfBoundsException.
// TODO: return partial columns after FLINK-32001 is resolved.
return new RowLevelUpdateInfo() {};
}

@Override
@@ -177,21 +176,20 @@ public Optional<Long> executeDeletion() {
}

private void validateDeletable() {
if (table.primaryKeys().size() > 0) {
Options options = Options.fromMap(table.options());
if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
return;
}
throw new UnsupportedOperationException(
String.format(
"merge engine '%s' can not support delete, currently only %s can support delete.",
options.get(MERGE_ENGINE), MergeEngine.DEDUPLICATE));
} else {
if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"table '%s' can not support delete, because there is no primary key.",
table.getClass().getName()));
}

MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if (!mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}
}

private boolean canPushDownDeleteFilter() {
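
Seen from the SQL side, the restructured sink checks above produce errors like the following. A hedged sketch: `pu_table` is a hypothetical Paimon table created with `'merge-engine' = 'partial-update'`, and the planner may wrap the cause, so a broad catch is used:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkLevelChecks {

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        try {
            // partial-update supports neither batch update nor batch delete.
            env.executeSql("UPDATE pu_table SET v = 'new' WHERE id = 1");
        } catch (RuntimeException e) {
            // Expected message (possibly as the wrapped cause):
            // "Merge engine partial-update does not support batch update.
            //  Merge engines that support batch update: [deduplicate, first-row]."
            System.out.println(e.getMessage());
        }
    }
}
```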
