From ddfa08e1f6f5c2e70c78925d58e7f33f6fa8bd7e Mon Sep 17 00:00:00 2001 From: Aitozi Date: Wed, 13 Nov 2024 22:38:11 +0800 Subject: [PATCH] optimize --- .../paimon/table/system/BinlogTable.java | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index 59719053f8fef..98eaf1e3a1102 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -21,8 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.data.BinaryArray; -import org.apache.paimon.data.BinaryArrayWriter; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericArray; @@ -92,7 +90,7 @@ * *

INSERT: [+I, [co1, null], [col2, null]] * - *

UPDATE: [+U, [co1_before, col1_after], [col2_before, col2_after]] + *

UPDATE: [+U, [co1_ub, col1_ua], [col2_ub, col2_ua]] * *

DELETE: [-D, [co1, null], [col2, null]] */ @@ -625,21 +623,15 @@ public RecordReader createReader(Split split) throws IOException { // Because, now we can not guarantee the ub & ua are consumed by the same reader Preconditions.checkArgument( dataSplit.isStreaming(), "The binlog should only enabled for streaming split."); - BinaryArrayWriter arrayWriter = - new BinaryArrayWriter( - new BinaryArray(), - 2, - BinaryArray.calculateFixLengthPartSize(outerRowType())); return new PackChangelogReader( dataRead.createReader(split), - (row1, row2) -> toBinlogRow(row1, row2, arrayWriter), + (row1, row2) -> toBinlogRow(row1, row2, wrapped.rowType().fieldGetters()), wrapped.rowType()); } private InternalRow toBinlogRow( - InternalRow data, InternalRow row2, BinaryArrayWriter writer) { - return new BinlogRow( - readProjection, data, row2, wrapped.rowType().fieldGetters(), writer); + InternalRow data, InternalRow row2, InternalRow.FieldGetter[] fieldGetters) { + return new BinlogRow(readProjection, data, row2, fieldGetters); } } @@ -647,11 +639,7 @@ private InternalRow toBinlogRow( private static class BinlogRow extends ProjectedRow { private BinlogRow( - int[] indexMapping, - InternalRow row, - InternalRow row2, - FieldGetter[] fieldGetters, - BinaryArrayWriter writer) { + int[] indexMapping, InternalRow row, InternalRow row2, FieldGetter[] fieldGetters) { super(indexMapping); replaceRow(convertToArray(row, row2, fieldGetters)); } @@ -659,7 +647,6 @@ private BinlogRow( private InternalRow convertToArray( InternalRow row1, InternalRow row2, FieldGetter[] fieldGetters) { GenericRow row = new GenericRow(row1.getFieldCount()); - // writer.writeBoolean(); for (int i = 0; i < row1.getFieldCount(); i++) { Object o1 = fieldGetters[i].getFieldOrNull(row1); Object o2 = null;