From 86b9477ddeaba8af1385040bfdffe3384acb3c89 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Tue, 2 Apr 2024 19:26:51 +0800 Subject: [PATCH] fix local merge --- .../paimon/flink/sink/LocalMergeOperator.java | 8 ++++- .../paimon/flink/PartialUpdateITCase.java | 32 +++++++++++-------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index 0a5eceb492151..a09372443c8da 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -57,7 +57,8 @@ public class LocalMergeOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; - TableSchema schema; + private final TableSchema schema; + private final boolean ignoreDelete; private transient Projection keyProjection; private transient RecordComparator keyComparator; @@ -76,6 +77,7 @@ public LocalMergeOperator(TableSchema schema) { schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys"); this.schema = schema; + this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -137,6 +139,10 @@ public void processElement(StreamRecord record) throws Exception { RowKind rowKind = rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + if (ignoreDelete && rowKind.isRetract()) { + return; + } + // row kind must be INSERT when it is divided into key and value row.setRowKind(RowKind.INSERT); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index f52a9d243d657..5dd6a732278e7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -37,6 +37,8 @@ import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -420,30 +422,34 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Except insert2.close(); } - @Test - public void testIgnoreDelete() throws Exception { + @ParameterizedTest(name = "localMergeEnabled = {0}") + @ValueSource(booleans = {true, false}) + public void testIgnoreDelete(boolean localMerge) throws Exception { sql( - "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH (" + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + " 'merge-engine' = 'partial-update'," - + " 'ignore-delete' = 'true'," - + " 'fields.a.aggregate-function' = 'sum'," - + " 'fields.g.sequence-group'='a')"); + + " 'ignore-delete' = 'true'" + + ")"); + if (localMerge) { + sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')"); + } String id = TestValuesTableFactory.registerData( Arrays.asList( - Row.ofKind(RowKind.INSERT, 1, 10, 1), - Row.ofKind(RowKind.DELETE, 1, 10, 2), - Row.ofKind(RowKind.INSERT, 1, 20, 3))); + Row.ofKind(RowKind.INSERT, 1, null, "apple"), + Row.ofKind(RowKind.DELETE, 1, null, "apple"), + Row.ofKind(RowKind.INSERT, 1, "A", null))); streamSqlIter( - "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) " + "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) " + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', " + "'changelog-mode' = 'I,D')", id) .close(); sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await(); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3)); + assertThat(sql("SELECT * FROM ignore_delete")) + .containsExactlyInAnyOrder(Row.of(1, "A", "apple")); } @Test @@ -457,7 +463,7 @@ public void testIgnoreDeleteInReader() throws Exception { // write delete records sql("DELETE FROM ignore_delete WHERE pk = 1"); sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))"); - assertThat(batchSql("SELECT * FROM ignore_delete")) + assertThat(sql("SELECT * FROM ignore_delete")) .containsExactlyInAnyOrder(Row.of(1, "A", null)); // force altering merge engine and read @@ -477,7 +483,7 @@ public void testIgnoreDeleteInReader() throws Exception { Collections.singletonList("pk"), newOptions, null)); - assertThat(batchSql("SELECT * FROM ignore_delete")) + assertThat(sql("SELECT * FROM ignore_delete")) .containsExactlyInAnyOrder(Row.of(1, "A", "apple")); } }