From 2920fc953805d99d07053b48551602f86a12fc2e Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 15 Oct 2024 11:55:49 +0800 Subject: [PATCH] [format][orc] Pass VectorizedRowBatch to OrcColumnVector so that selected rows can be seen. --- .../paimon/format/orc/OrcReaderFactory.java | 3 +- .../orc/reader/AbstractOrcColumnVector.java | 33 ++++++++++--------- .../orc/reader/OrcArrayColumnVector.java | 8 +++-- .../orc/reader/OrcBytesColumnVector.java | 5 +-- .../orc/reader/OrcDecimalColumnVector.java | 5 +-- .../orc/reader/OrcDoubleColumnVector.java | 5 +-- .../OrcLegacyTimestampColumnVector.java | 5 +-- .../orc/reader/OrcLongColumnVector.java | 5 +-- .../format/orc/reader/OrcMapColumnVector.java | 10 +++--- .../format/orc/reader/OrcRowColumnVector.java | 8 +++-- .../orc/reader/OrcTimestampColumnVector.java | 5 +-- 11 files changed, 52 insertions(+), 40 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 2b9e88d89ccd..6b1590f03109 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -123,10 +123,9 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - int[] selected = orcBatch.selectedInUse ? 
orcBatch.getSelected() : null; vectors[i] = createPaimonVector( - orcBatch.cols[tableFieldNames.indexOf(name)], selected, type); + orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type); } return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 587ffc1c459d..377ff37a2e68 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */ public abstract class AbstractOrcColumnVector @@ -40,18 +41,18 @@ public abstract class AbstractOrcColumnVector private final ColumnVector vector; - private final int[] selected; + private final VectorizedRowBatch orcBatch; - AbstractOrcColumnVector(ColumnVector vector, int[] selected) { + AbstractOrcColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { this.vector = vector; - this.selected = selected; + this.orcBatch = orcBatch; } protected int rowMapper(int r) { - if (this.selected != null) { - return selected[r]; + if (vector.isRepeating) { + return 0; } - return r; + return this.orcBatch.selectedInUse ? 
this.orcBatch.getSelected()[r] : r; } @Override @@ -60,29 +61,29 @@ public boolean isNullAt(int i) { } public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector( - ColumnVector vector, int[] selected, DataType dataType) { + ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) { if (vector instanceof LongColumnVector) { if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { - return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, selected); + return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch); } else { - return new OrcLongColumnVector((LongColumnVector) vector, selected); + return new OrcLongColumnVector((LongColumnVector) vector, orcBatch); } } else if (vector instanceof DoubleColumnVector) { - return new OrcDoubleColumnVector((DoubleColumnVector) vector, selected); + return new OrcDoubleColumnVector((DoubleColumnVector) vector, orcBatch); } else if (vector instanceof BytesColumnVector) { - return new OrcBytesColumnVector((BytesColumnVector) vector, selected); + return new OrcBytesColumnVector((BytesColumnVector) vector, orcBatch); } else if (vector instanceof DecimalColumnVector) { - return new OrcDecimalColumnVector((DecimalColumnVector) vector, selected); + return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch); } else if (vector instanceof TimestampColumnVector) { - return new OrcTimestampColumnVector(vector, selected); + return new OrcTimestampColumnVector(vector, orcBatch); } else if (vector instanceof ListColumnVector) { return new OrcArrayColumnVector( - (ListColumnVector) vector, selected, (ArrayType) dataType); + (ListColumnVector) vector, orcBatch, (ArrayType) dataType); } else if (vector instanceof StructColumnVector) { return new OrcRowColumnVector( - (StructColumnVector) vector, selected, (RowType) dataType); + (StructColumnVector) vector, orcBatch, (RowType) dataType); } else if (vector instanceof MapColumnVector) { - return new 
OrcMapColumnVector((MapColumnVector) vector, selected, (MapType) dataType); + return new OrcMapColumnVector((MapColumnVector) vector, orcBatch, (MapType) dataType); } else { throw new UnsupportedOperationException( "Unsupported vector: " + vector.getClass().getName()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index ade154221835..25a1935f3e4b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */ public class OrcArrayColumnVector extends AbstractOrcColumnVector @@ -32,10 +33,11 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector private final ListColumnVector hiveVector; private final ColumnVector paimonVector; - public OrcArrayColumnVector(ListColumnVector hiveVector, int[] selected, ArrayType type) { - super(hiveVector, selected); + public OrcArrayColumnVector( + ListColumnVector hiveVector, VectorizedRowBatch orcBatch, ArrayType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.paimonVector = createPaimonVector(hiveVector.child, selected, type.getElementType()); + this.paimonVector = createPaimonVector(hiveVector.child, orcBatch, type.getElementType()); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java index a2664f3b8e45..92b4853aaae4 100644 --- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */ public class OrcBytesColumnVector extends AbstractOrcColumnVector @@ -26,8 +27,8 @@ public class OrcBytesColumnVector extends AbstractOrcColumnVector private final BytesColumnVector vector; - public OrcBytesColumnVector(BytesColumnVector vector, int[] selected) { - super(vector, selected); + public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java index 30963bb29fee..c8545723caf0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.Decimal; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.math.BigDecimal; @@ -32,8 +33,8 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector private final DecimalColumnVector vector; - public OrcDecimalColumnVector(DecimalColumnVector vector, int[] selected) { - super(vector, selected); + public OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java index 0353e4aea9f4..3e19b137a222 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double @@ -30,8 +31,8 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector private final DoubleColumnVector vector; - public OrcDoubleColumnVector(DoubleColumnVector vector, int[] selected) { - super(vector, selected); + public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java index 508895632374..eb3e014f2fcf 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.time.LocalDateTime; @@ -34,8 +35,8 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector private final LongColumnVector hiveVector; - OrcLegacyTimestampColumnVector(LongColumnVector vector, int[] selected) { - 
super(vector, selected); + OrcLegacyTimestampColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.hiveVector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java index 96f6922029a3..76c1c198d21d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int @@ -33,8 +34,8 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector private final LongColumnVector vector; - public OrcLongColumnVector(LongColumnVector vector, int[] selected) { - super(vector, selected); + public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java index e45aea60b59c..c7245275fdd2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.MapType; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. 
*/ public class OrcMapColumnVector extends AbstractOrcColumnVector @@ -33,12 +34,13 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector private final ColumnVector keyPaimonVector; private final ColumnVector valuePaimonVector; - public OrcMapColumnVector(MapColumnVector hiveVector, int[] selected, MapType type) { - super(hiveVector, selected); + public OrcMapColumnVector( + MapColumnVector hiveVector, VectorizedRowBatch orcBatch, MapType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.keyPaimonVector = createPaimonVector(hiveVector.keys, selected, type.getKeyType()); + this.keyPaimonVector = createPaimonVector(hiveVector.keys, orcBatch, type.getKeyType()); this.valuePaimonVector = - createPaimonVector(hiveVector.values, selected, type.getValueType()); + createPaimonVector(hiveVector.values, orcBatch, type.getValueType()); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index 2572dcb565f2..6c73c9fdbe0d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. 
*/ public class OrcRowColumnVector extends AbstractOrcColumnVector @@ -31,13 +32,14 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector private final VectorizedColumnBatch batch; - public OrcRowColumnVector(StructColumnVector hiveVector, int[] selected, RowType type) { - super(hiveVector, selected); + public OrcRowColumnVector( + StructColumnVector hiveVector, VectorizedRowBatch orcBatch, RowType type) { + super(hiveVector, orcBatch); int len = hiveVector.fields.length; ColumnVector[] paimonVectors = new ColumnVector[len]; for (int i = 0; i < len; i++) { paimonVectors[i] = - createPaimonVector(hiveVector.fields[i], selected, type.getTypeAt(i)); + createPaimonVector(hiveVector.fields[i], orcBatch, type.getTypeAt(i)); } this.batch = new VectorizedColumnBatch(paimonVectors); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java index 758ef5d6e94c..8840a02a8c83 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's TimestampColumnVector to Paimon's @@ -33,8 +34,8 @@ public class OrcTimestampColumnVector extends AbstractOrcColumnVector private final TimestampColumnVector vector; - public OrcTimestampColumnVector(ColumnVector vector, int[] selected) { - super(vector, selected); + public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = (TimestampColumnVector) vector; }