diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 17d2d937a90e..11661851cf4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -120,7 +120,6 @@ public void add(KeyValue kv) { currentDeleteRow = true; row = new GenericRow(getters.length); } - // ignore -U records return; } @@ -254,10 +253,9 @@ public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } - if (currentDeleteRow) { - return null; - } - return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row); + + RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT; + return reused.replace(currentKey, latestSequenceNumber, rowKind, row); } public static MergeFunctionFactory factory( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index 68f109f0427a..e0372b74d500 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -623,11 +623,12 @@ public void testIgnoreDelete(boolean localMerge) throws Exception { } @Test - public void testRemoveRecordOnDelete() { + public void testRemoveRecordOnDelete() throws Exception { sql( "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + " 'merge-engine' = 'partial-update'," - + " 'partial-update.remove-record-on-delete' = 'true'" + + " 'partial-update.remove-record-on-delete' = 'true'," + + " 'changelog-producer' = 'lookup'" + ")"); sql("INSERT INTO remove_record_on_delete VALUES (1, CAST (NULL AS STRING), 'apple')"); @@ -645,5 +646,18 @@ public void testRemoveRecordOnDelete() { // batch read assertThat(sql("SELECT * FROM remove_record_on_delete")) .containsExactlyInAnyOrder(Row.of(1, "A", "apache")); + + // streaming read results has -U + BlockingIterator iterator = + streamSqlBlockIter( + "SELECT * FROM remove_record_on_delete /*+ OPTIONS('scan.timestamp-millis' = '0') */"); + assertThat(iterator.collect(5)) + .containsExactly( + Row.ofKind(RowKind.INSERT, 1, null, "apple"), + Row.ofKind(RowKind.DELETE, 1, null, "apple"), + Row.ofKind(RowKind.INSERT, 1, null, "apache"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, "apache"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache")); + iterator.close(); } }