-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flink](lookup) project row to avoid deserialize unnecessary fields #148
Conversation
66c6abf
to
c49e191
Compare
// to avoid deserialize unnecessary fields | ||
RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); | ||
RowData flinkRow = | ||
flussRowToFlinkRowConverter.toFlinkRowData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flussRowToFlinkRowConverter
works on full fields and it fails ArrayIndexOutOfBoundsException
if passing projected row. Try run com.alibaba.fluss.connector.flink.source.FlinkTableSourceITCase#testLookup1PkTable
.
flussRowToFlinkRowConverter
should also be adapted for projection if we want it work on projected row.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, it has been modified, and FlinkTableSourceITCase#testLookup1PkTable has been successfully run
I rebased the branch onto |
@wuchong comments addressed, PTAL, Thanks. |
316bdfb
to
453279b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed a commit to address the problems.
new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType)); | ||
new FlussRowToFlinkRowConverter( | ||
FlinkConversions.toFlussRowType( | ||
FlinkLookupFunction.filterRowType(flinkRowType, projection))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
projection
may be null, this can cause NPE
for (int i = 0; i < pkIndex.length; i++) { | ||
types[i] = rowType.getTypeAt(pkIndex[i]); | ||
names[i] = rowType.getFieldNames().get(pkIndex[i]); | ||
static RowType filterRowType(RowType rowType, int[] filterIndex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, this is called projectRowType
and we can move this to a common util.
RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); | ||
RowData flinkRow = | ||
flussRowToFlinkRowConverter.toFlinkRowData( | ||
ProjectedRow.from(projection).replaceRow(row)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can cause NPE as the projection
can be null.
453279b
to
f805a7e
Compare
Purpose
project row to avoid deserialize unnecessary fields