From 6903a88faec30c3494b612d59e6ac945470ac0d6 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Thu, 17 Oct 2024 20:32:50 +0800 Subject: [PATCH] fix delete record exception after partial-update merge engine configuration to lookup changelog producer --- .../compact/PartialUpdateMergeFunction.java | 8 +++----- .../paimon/flink/PartialUpdateITCase.java | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 17d2d937a90e..11661851cf4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -120,7 +120,6 @@ public void add(KeyValue kv) { currentDeleteRow = true; row = new GenericRow(getters.length); } - // ignore -U records return; } @@ -254,10 +253,9 @@ public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } - if (currentDeleteRow) { - return null; - } - return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row); + + RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT; + return reused.replace(currentKey, latestSequenceNumber, rowKind, row); } public static MergeFunctionFactory factory( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index 68f109f0427a..e0372b74d500 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -623,11 +623,12 @@ public void testIgnoreDelete(boolean localMerge) throws Exception { } @Test - public void testRemoveRecordOnDelete() { + public void testRemoveRecordOnDelete() throws Exception { sql( "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + " 'merge-engine' = 'partial-update'," - + " 'partial-update.remove-record-on-delete' = 'true'" + + " 'partial-update.remove-record-on-delete' = 'true'," + + " 'changelog-producer' = 'lookup'" + ")"); sql("INSERT INTO remove_record_on_delete VALUES (1, CAST (NULL AS STRING), 'apple')"); @@ -645,5 +646,18 @@ public void testRemoveRecordOnDelete() { // batch read assertThat(sql("SELECT * FROM remove_record_on_delete")) .containsExactlyInAnyOrder(Row.of(1, "A", "apache")); + + // streaming read results has -U + BlockingIterator iterator = + streamSqlBlockIter( + "SELECT * FROM remove_record_on_delete /*+ OPTIONS('scan.timestamp-millis' = '0') */"); + assertThat(iterator.collect(5)) + .containsExactly( + Row.ofKind(RowKind.INSERT, 1, null, "apple"), + Row.ofKind(RowKind.DELETE, 1, null, "apple"), + Row.ofKind(RowKind.INSERT, 1, null, "apache"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, "apache"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache")); + iterator.close(); } }