
[flink] Add merge-engine check when executing row level batch update and delete #3181

Merged (2 commits) on Apr 15, 2024. Changes shown from 1 commit.
6 changes: 6 additions & 0 deletions docs/content/primary-key-table/merge-engine.md
@@ -34,6 +34,12 @@ result in strange behavior. When the input is out of order, we recommend that you use
[Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field" >}}) to correct disorder.
{{< /hint >}}

{{< hint info >}}
Some compute engines support row-level update and delete in batch mode, but not all merge engines support these operations.
- Merge engines that support batch update: `deduplicate` and `first-row`.
- Merge engines that support batch delete: `deduplicate`.
{{< /hint >}}
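To make the hint concrete, here is a hedged sketch of what this looks like from Flink batch SQL (not part of this patch; the table `pk_table` and its `'merge-engine' = 'deduplicate'` setting are assumed):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchRowLevelOpsSketch {
    public static void main(String[] args) {
        // Row-level UPDATE and DELETE statements only run in batch mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Assumes 'pk_table' is an existing Paimon primary-key table with
        // 'merge-engine' = 'deduplicate' (the only engine that allows batch DELETE).
        tEnv.executeSql("UPDATE pk_table SET v = 'fixed' WHERE id = 2");
        tEnv.executeSql("DELETE FROM pk_table WHERE id = 1");

        // With 'merge-engine' = 'first-row' the UPDATE would still be accepted,
        // but the DELETE would fail with an UnsupportedOperationException.
    }
}
```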

## Deduplicate

`deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.
40 changes: 34 additions & 6 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1692,20 +1692,28 @@ public boolean deletionVectorsEnabled() {

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
- DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+ DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),

- PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields.", false, false),
Contributor comment: When using partial-update and no column is configured with an aggregate function, update is theoretically supported.


- AGGREGATE("aggregation", "Aggregate fields with same primary key."),
+ AGGREGATE("aggregation", "Aggregate fields with same primary key.", false, false),

- FIRST_ROW("first-row", "De-duplicate and keep the first row.");
+ FIRST_ROW("first-row", "De-duplicate and keep the first row.", true, false);
Contributor comment: first-row does not need to support update.


private final String value;
private final String description;

- MergeEngine(String value, String description) {
+ private final boolean supportBatchUpdate;
Contributor comment: Please do not modify this; this is a public API, we don't need to expose this here.

+ private final boolean supportBatchDelete;

+ MergeEngine(
+ String value,
+ String description,
+ boolean supportBatchUpdate,
+ boolean supportBatchDelete) {
this.value = value;
this.description = description;
+ this.supportBatchUpdate = supportBatchUpdate;
+ this.supportBatchDelete = supportBatchDelete;
}

@Override
@@ -1717,6 +1725,26 @@ public String toString() {
public InlineElement getDescription() {
return text(description);
}

public boolean supportBatchUpdate() {
return supportBatchUpdate;
}

public boolean supportBatchDelete() {
return supportBatchDelete;
}

public static List<MergeEngine> supportBatchUpdateEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchUpdate)
.collect(Collectors.toList());
}

public static List<MergeEngine> supportBatchDeleteEngines() {
return Arrays.stream(MergeEngine.values())
.filter(MergeEngine::supportBatchDelete)
.collect(Collectors.toList());
}
}

/** Specifies the startup mode for log consumer. */
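The MergeEngine additions above are self-describing; here is a minimal runnable sketch of the new capability API (assuming only paimon-common on the classpath):

```java
import org.apache.paimon.CoreOptions.MergeEngine;

public class MergeEngineCapabilitySketch {
    public static void main(String[] args) {
        MergeEngine engine = MergeEngine.FIRST_ROW;
        System.out.println(engine.supportBatchUpdate());  // true
        System.out.println(engine.supportBatchDelete());  // false

        // The static helpers drive the error messages used throughout this PR.
        System.out.println(MergeEngine.supportBatchUpdateEngines()); // [deduplicate, first-row]
        System.out.println(MergeEngine.supportBatchDeleteEngines()); // [deduplicate]
    }
}
```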
DeleteAction.java
@@ -18,6 +18,9 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
@@ -51,6 +54,17 @@ public DeleteAction(

@Override
public void run() throws Exception {
CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();

if (!mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"Delete is executed in batch mode, but merge engine %s does not support batch delete."
+ " Merge engines that support batch delete are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}

LOG.debug("Run delete action with filter '{}'.", filter);

Table queriedTable =
MergeIntoAction.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -96,11 +97,11 @@ public class MergeIntoAction extends TableActionBase {
private String mergeCondition;

// actions to be taken
- boolean matchedUpsert;
- boolean notMatchedUpsert;
- boolean matchedDelete;
- boolean notMatchedDelete;
- boolean insert;
+ private boolean matchedUpsert;
+ private boolean notMatchedUpsert;
+ private boolean matchedDelete;
+ private boolean notMatchedDelete;
+ private boolean insert;

// upsert
@Nullable String matchedUpsertCondition;
@@ -215,6 +216,49 @@ public MergeIntoAction withNotMatchedInsert(
return this;
}

public void validate() {
if (!matchedUpsert && !notMatchedUpsert && !matchedDelete && !notMatchedDelete && !insert) {
throw new IllegalArgumentException(
"Must specify at least one merge action. Run 'merge_into --help' for help.");
}

CoreOptions.MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if ((matchedUpsert || notMatchedUpsert) && !mergeEngine.supportBatchUpdate()) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_upsert or not_matched_by_source_upsert."
+ " But merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
}

if ((matchedDelete || notMatchedDelete) && !mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"merge-into is executed in batch mode, and you have set matched_delete or not_matched_by_source_delete."
+ " But merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}

if ((matchedUpsert && matchedDelete)
&& (matchedUpsertCondition == null || matchedDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both matched-upsert and matched-delete actions are present, their conditions must both be present too.");
}

if ((notMatchedUpsert && notMatchedDelete)
&& (notMatchedBySourceUpsertCondition == null
|| notMatchedBySourceDeleteCondition == null)) {
throw new IllegalArgumentException(
"If both not-matched-by-source-upsert and not-matched-by--source-delete actions are present, "
+ "their conditions must both be present too.\n");
}

if (notMatchedBySourceUpsertSet != null && notMatchedBySourceUpsertSet.equals("*")) {
throw new IllegalArgumentException(
"The '*' cannot be used in not_matched_by_source_upsert_set");
}
}
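A self-contained sketch of the merge-engine branch of validate() above; the options map and the matchedUpsert flag are stand-ins for state the action normally gets from the table and the CLI flags:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

public class MergeIntoValidateSketch {
    public static void main(String[] args) {
        // Stand-in for table.options(); 'aggregation' supports neither batch
        // update nor batch delete, so the check below fires.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("merge-engine", "aggregation");

        CoreOptions.MergeEngine mergeEngine =
                CoreOptions.fromMap(tableOptions).mergeEngine();
        boolean matchedUpsert = true; // stand-in for --matched_upsert being set

        if (matchedUpsert && !mergeEngine.supportBatchUpdate()) {
            throw new UnsupportedOperationException(
                    String.format(
                            "merge_into is executed in batch mode, but merge engine %s does not"
                                    + " support batch update. Supported engines: %s.",
                            mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
        }
    }
}
```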

@Override
public void run() throws Exception {
DataStream<RowData> dataStream = buildDataStream();
MergeIntoActionFactory.java
@@ -112,7 +112,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
params.get(NOT_MATCHED_INSERT_VALUES));
}

- validate(action);
+ action.validate();

return Optional.of(action);
}
@@ -202,36 +202,4 @@ public void printHelp() {
System.out.println(
" It will find matched rows of target table that meet condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
}

- public static void validate(MergeIntoAction action) {
- if (!action.matchedUpsert
- && !action.notMatchedUpsert
- && !action.matchedDelete
- && !action.notMatchedDelete
- && !action.insert) {
- throw new IllegalArgumentException(
- "Must specify at least one merge action. Run 'merge_into --help' for help.");
- }

- if ((action.matchedUpsert && action.matchedDelete)
- && (action.matchedUpsertCondition == null
- || action.matchedDeleteCondition == null)) {
- throw new IllegalArgumentException(
- "If both matched-upsert and matched-delete actions are present, their conditions must both be present too.");
- }

- if ((action.notMatchedUpsert && action.notMatchedDelete)
- && (action.notMatchedBySourceUpsertCondition == null
- || action.notMatchedBySourceDeleteCondition == null)) {
- throw new IllegalArgumentException(
- "If both not-matched-by-source-upsert and not-matched-by--source-delete actions are present, "
- + "their conditions must both be present too.\n");
- }

- if (action.notMatchedBySourceUpsertSet != null
- && action.notMatchedBySourceUpsertSet.equals("*")) {
- throw new IllegalArgumentException(
- "The '*' cannot be used in not_matched_by_source_upsert_set");
- }
- }
}
MergeIntoProcedure.java
@@ -21,7 +21,6 @@
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
- import org.apache.paimon.flink.action.MergeIntoActionFactory;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -222,7 +221,7 @@ public String[] call(
}

action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
- MergeIntoActionFactory.validate(action);
+ action.validate();

DataStream<RowData> dataStream = action.buildDataStream();
TableResult tableResult = action.batchSink(dataStream);
SupportsRowLevelOperationFlinkTableSink.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
@@ -89,47 +90,45 @@ public RowLevelUpdateInfo applyRowLevelUpdate(
// Since only UPDATE_AFTER type messages can be received at present,
// AppendOnlyFileStoreTable cannot correctly handle old data, so they are marked as
// unsupported. Similarly, it is not allowed to update the primary key column when updating
- // the column of PrimaryKeyFileStoreTable, because the old data cannot be handled
- // correctly.
- if (table.primaryKeys().size() > 0) {
- Options options = Options.fromMap(table.options());
- Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
- updatedColumns.forEach(
- column -> {
- if (primaryKeys.contains(column.getName())) {
- String errMsg =
- String.format(
- "Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
- primaryKeys,
- updatedColumns.stream()
- .map(Column::getName)
- .collect(Collectors.toList()));
- throw new UnsupportedOperationException(errMsg);
- }
- });
- if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
- || options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
- // Even with partial-update we still need all columns. Because the topology
- // structure is source -> cal -> constraintEnforcer -> sink, in the
- // constraintEnforcer operator, the constraint check will be performed according to
- // the index, not according to the column. So we can't return only some columns,
- // which will cause problems like ArrayIndexOutOfBoundsException.
- // TODO: return partial columns after FLINK-32001 is resolved.
- return new RowLevelUpdateInfo() {};
- }
- throw new UnsupportedOperationException(
- String.format(
- "%s can not support update, currently only %s of %s and %s can support update.",
- options.get(MERGE_ENGINE),
- MERGE_ENGINE.key(),
- MergeEngine.DEDUPLICATE,
- MergeEngine.PARTIAL_UPDATE));
- } else {
+ // the column of PrimaryKeyFileStoreTable, because the old data cannot be handled correctly.
+ if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"%s can not support update, because there is no primary key.",
table.getClass().getName()));
}

Options options = Options.fromMap(table.options());
Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
updatedColumns.forEach(
column -> {
if (primaryKeys.contains(column.getName())) {
String errMsg =
String.format(
"Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
primaryKeys,
updatedColumns.stream()
.map(Column::getName)
.collect(Collectors.toList()));
throw new UnsupportedOperationException(errMsg);
}
});

MergeEngine mergeEngine = options.get(MERGE_ENGINE);
if (!mergeEngine.supportBatchUpdate()) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch update. Support batch update merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchUpdateEngines()));
}

// Even with partial-update we still need all columns. Because the topology
// structure is source -> cal -> constraintEnforcer -> sink, in the
// constraintEnforcer operator, the constraint check will be performed according to
// the index, not according to the column. So we can't return only some columns,
// which will cause problems like ArrayIndexOutOfBoundsException.
// TODO: return partial columns after FLINK-32001 is resolved.
return new RowLevelUpdateInfo() {};
}

@Override
@@ -177,21 +176,20 @@ public Optional<Long> executeDeletion() {
}

private void validateDeletable() {
- if (table.primaryKeys().size() > 0) {
- Options options = Options.fromMap(table.options());
- if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
- return;
- }
- throw new UnsupportedOperationException(
- String.format(
- "merge engine '%s' can not support delete, currently only %s can support delete.",
- options.get(MERGE_ENGINE), MergeEngine.DEDUPLICATE));
- } else {
+ if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"table '%s' can not support delete, because there is no primary key.",
table.getClass().getName()));
}

MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine();
if (!mergeEngine.supportBatchDelete()) {
throw new UnsupportedOperationException(
String.format(
"Merge engine %s can not support batch delete. Support batch delete merge engines are: %s.",
mergeEngine, CoreOptions.MergeEngine.supportBatchDeleteEngines()));
}
}
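The same guard now appears in three places (DeleteAction, MergeIntoAction.validate(), and this sink). A hedged, self-contained sketch of the shared pattern; the helper class and method names here are illustrative only, not part of the patch:

```java
import org.apache.paimon.CoreOptions.MergeEngine;

final class BatchDeleteGuardSketch {
    // Illustrative helper mirroring the checks added in this PR.
    static void checkBatchDelete(MergeEngine engine) {
        if (!engine.supportBatchDelete()) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Merge engine %s does not support batch delete."
                                    + " Merge engines that support batch delete are: %s.",
                            engine, MergeEngine.supportBatchDeleteEngines()));
        }
    }

    public static void main(String[] args) {
        checkBatchDelete(MergeEngine.DEDUPLICATE); // passes: deduplicate allows batch delete
        checkBatchDelete(MergeEngine.FIRST_ROW);   // throws: only [deduplicate] does
    }
}
```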

private boolean canPushDownDeleteFilter() {