Skip to content

Commit

Permalink
optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Nov 13, 2024
1 parent 54cef42 commit ddfa08e
Showing 1 changed file with 5 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericArray;
Expand Down Expand Up @@ -92,7 +90,7 @@
*
* <p>INSERT: [+I, [co1, null], [col2, null]]
*
* <p>UPDATE: [+U, [co1_before, col1_after], [col2_before, col2_after]]
* <p>UPDATE: [+U, [co1_ub, col1_ua], [col2_ub, col2_ua]]
*
* <p>DELETE: [-D, [co1, null], [col2, null]]
*/
Expand Down Expand Up @@ -625,41 +623,30 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
// Because, now we can not guarantee the ub & ua are consumed by the same reader
Preconditions.checkArgument(
dataSplit.isStreaming(), "The binlog should only enabled for streaming split.");
BinaryArrayWriter arrayWriter =
new BinaryArrayWriter(
new BinaryArray(),
2,
BinaryArray.calculateFixLengthPartSize(outerRowType()));
return new PackChangelogReader(
dataRead.createReader(split),
(row1, row2) -> toBinlogRow(row1, row2, arrayWriter),
(row1, row2) -> toBinlogRow(row1, row2, wrapped.rowType().fieldGetters()),
wrapped.rowType());
}

private InternalRow toBinlogRow(
InternalRow data, InternalRow row2, BinaryArrayWriter writer) {
return new BinlogRow(
readProjection, data, row2, wrapped.rowType().fieldGetters(), writer);
InternalRow data, InternalRow row2, InternalRow.FieldGetter[] fieldGetters) {
return new BinlogRow(readProjection, data, row2, fieldGetters);
}
}

/** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
private static class BinlogRow extends ProjectedRow {

private BinlogRow(
int[] indexMapping,
InternalRow row,
InternalRow row2,
FieldGetter[] fieldGetters,
BinaryArrayWriter writer) {
int[] indexMapping, InternalRow row, InternalRow row2, FieldGetter[] fieldGetters) {
super(indexMapping);
replaceRow(convertToArray(row, row2, fieldGetters));
}

private InternalRow convertToArray(
InternalRow row1, InternalRow row2, FieldGetter[] fieldGetters) {
GenericRow row = new GenericRow(row1.getFieldCount());
// writer.writeBoolean();
for (int i = 0; i < row1.getFieldCount(); i++) {
Object o1 = fieldGetters[i].getFieldOrNull(row1);
Object o2 = null;
Expand Down

0 comments on commit ddfa08e

Please sign in to comment.