diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index ce8c8043ae40..b763b7aaa448 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -241,10 +241,13 @@ All available procedures are listed below. matched_upsert_setting => 'matchedUpsertSetting',
not_matched_insert_condition => 'notMatchedInsertCondition',
not_matched_insert_values => 'notMatchedInsertValues',
- matched_delete_condition => 'matchedDeleteCondition')

+ matched_delete_condition => 'matchedDeleteCondition',
+ not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition',
+ not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting',
+ not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition')

- To perform "MERGE INTO" syntax. See merge_into action for + To perform "MERGE INTO" syntax. See merge_into action for details of arguments. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java index cf8d7191953e..e297c0bdbb4c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java @@ -102,7 +102,19 @@ public class MergeIntoProcedure extends ProcedureBase { @ArgumentHint( name = "matched_delete_condition", type = @DataTypeHint("STRING"), - isOptional = true) + isOptional = true), + @ArgumentHint( + name = "not_matched_by_source_upsert_condition", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "not_matched_by_source_upsert_setting", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "not_matched_by_source_delete_condition", + type = @DataTypeHint("STRING"), + isOptional = true), }) public String[] call( ProcedureContext procedureContext, @@ -115,7 +127,10 @@ public String[] call( String matchedUpsertSetting, String notMatchedInsertCondition, String notMatchedInsertValues, - String matchedDeleteCondition) { + String matchedDeleteCondition, + String notMatchedBySourceUpsertCondition, + String notMatchedBySourceUpsertSetting, + String notMatchedBySourceDeleteCondition) { targetAlias = notnull(targetAlias); sourceSqls = notnull(sourceSqls); sourceTable = notnull(sourceTable); @@ -125,6 +140,9 @@ public String[] call( notMatchedInsertCondition = notnull(notMatchedInsertCondition); notMatchedInsertValues = notnull(notMatchedInsertValues); matchedDeleteCondition = notnull(matchedDeleteCondition); + notMatchedBySourceUpsertCondition = notnull(notMatchedBySourceUpsertCondition); + notMatchedBySourceUpsertSetting = notnull(notMatchedBySourceUpsertSetting); + notMatchedBySourceDeleteCondition = notnull(notMatchedBySourceDeleteCondition); String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); @@ -166,6 +184,20 @@ public String[] call( action.withMatchedDelete(matchedDeleteCondition); } + if (!notMatchedBySourceUpsertCondition.isEmpty() + || !notMatchedBySourceUpsertSetting.isEmpty()) { + String condition = nullable(notMatchedBySourceUpsertCondition); + String values = nullable(notMatchedBySourceUpsertSetting); + checkArgument( + !"*".equals(values), + "not-matched-by-source-upsert does not support setting notMatchedBySourceUpsertSetting to *."); + action.withNotMatchedBySourceUpsert(condition, values); + } + + if (!notMatchedBySourceDeleteCondition.isEmpty()) { + action.withNotMatchedBySourceDelete(notMatchedBySourceDeleteCondition); + } + action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment()); action.validate(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 41d607fac5f6..3907c0398532 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -571,6 +571,123 @@ public void testIllegalSourceNameSqlCase() { .satisfies(anyCauseMatches(ValidationException.class, "Object 'S' not found")); } + @ParameterizedTest + @MethodSource("testArguments") + public void testNotMatchedBySourceUpsert(boolean qualified, String invoker) throws Exception { + sEnv.executeSql("DROP TABLE T"); + prepareTargetTable(CoreOptions.ChangelogProducer.INPUT); + + // build MergeIntoAction + MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); + action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S") + .withSourceTable(qualified ? "default.SS" : "SS") + .withMergeCondition("T.k = SS.k AND T.dt = SS.dt") + .withNotMatchedBySourceUpsert( + "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'"); + + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', '', '', '', 'dt < ''02-28''', 'v = v || ''_nmu'', last_action = ''not_matched_upsert''')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "source_sqls => '%s', " + + "source_table => '%s', " + + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', " + + "not_matched_by_source_upsert_condition => 'dt < ''02-28'''," + + "not_matched_by_source_upsert_setting => 'v = v || ''_nmu'', last_action = ''not_matched_upsert''')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } + + List streamingExpected = + Arrays.asList( + changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), + changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27")); + + List batchExpected = + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 4, "v_4", "creation", "02-27"), + changelogRow("+I", 5, "v_5", "creation", "02-28"), + changelogRow("+I", 6, "v_6", "creation", "02-28"), + changelogRow("+I", 7, "v_7", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "creation", "02-28"), + changelogRow("+I", 9, "v_9", "creation", "02-28"), + changelogRow("+I", 10, "v_10", "creation", "02-28")); + + if ("action".equals(invoker)) { + validateActionRunResult(action.build(), streamingExpected, batchExpected); + } else { + validateProcedureResult(procedureStatement, streamingExpected, batchExpected); + } + } + + @ParameterizedTest + @MethodSource("testArguments") + public void testNotMatchedBySourceDelete(boolean qualified, String invoker) throws Exception { + // build MergeIntoAction + MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); + action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S") + .withSourceTable(qualified ? "default.SS" : "SS") + .withMergeCondition("T.k = SS.k AND T.dt = SS.dt") + .withNotMatchedBySourceDelete(null); + + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', '', '', '', '', '', 'TRUE')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "source_sqls => '%s', " + + "source_table => '%s', " + + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', " + + "not_matched_by_source_delete_condition => 'TRUE')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } + + List streamingExpected = + Arrays.asList( + changelogRow("-D", 2, "v_2", "creation", "02-27"), + changelogRow("-D", 3, "v_3", "creation", "02-27"), + changelogRow("-D", 5, "v_5", "creation", "02-28"), + changelogRow("-D", 6, "v_6", "creation", "02-28"), + changelogRow("-D", 9, "v_9", "creation", "02-28"), + changelogRow("-D", 10, "v_10", "creation", "02-28")); + + List batchExpected = + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+I", 4, "v_4", "creation", "02-27"), + changelogRow("+I", 7, "v_7", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "creation", "02-28")); + + if ("action".equals(invoker)) { + validateActionRunResult(action.build(), streamingExpected, batchExpected); + } else { + validateProcedureResult(procedureStatement, streamingExpected, batchExpected); + } + } + private void validateActionRunResult( MergeIntoAction action, List streamingExpected, List batchExpected) throws Exception {