From b4e68f65acb258cb5fc3a8ddbdd23899e2895601 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 17 Dec 2024 11:24:16 +0800 Subject: [PATCH] [flink] Fix replace logic for reused row --- .../org/apache/paimon/flink/source/operator/ReadOperator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 815722a4e705..20b9e75313cb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -91,7 +91,6 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); if (projectionRowData != null) { - projectionRowData.replaceRow(this.reuseRow); this.reuseRecord = new StreamRecord<>(projectionRowData); } else { this.reuseRecord = new StreamRecord<>(reuseRow); @@ -126,6 +125,9 @@ public void processElement(StreamRecord record) throws Exception { } reuseRow.replace(iterator.next()); + if (projectionRowData != null) { + projectionRowData.replaceRow(this.reuseRow); + } output.collect(reuseRecord); } }