Skip to content

Commit

Permalink
[flink] Fix replace logic for reused row
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 17, 2024
1 parent 1c10be1 commit b4e68f6
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void open() throws Exception {
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
if (projectionRowData != null) {
projectionRowData.replaceRow(this.reuseRow);
this.reuseRecord = new StreamRecord<>(projectionRowData);
} else {
this.reuseRecord = new StreamRecord<>(reuseRow);
Expand Down Expand Up @@ -126,6 +125,9 @@ public void processElement(StreamRecord<Split> record) throws Exception {
}

reuseRow.replace(iterator.next());
if (projectionRowData != null) {
projectionRowData.replaceRow(this.reuseRow);
}
output.collect(reuseRecord);
}
}
Expand Down

0 comments on commit b4e68f6

Please sign in to comment.