diff --git a/paimon-format/src/main/java/org/apache/orc/OrcConf.java b/paimon-format/src/main/java/org/apache/orc/OrcConf.java index a7fa1a21bc8c..ee07e45117a4 100644 --- a/paimon-format/src/main/java/org/apache/orc/OrcConf.java +++ b/paimon-format/src/main/java/org/apache/orc/OrcConf.java @@ -305,6 +305,21 @@ public enum OrcConf { + "must have the filter\n" + "reapplied to avoid using unset values in the unselected rows.\n" + "If unsure please leave this as false."), + + READER_ONLY_ALLOW_SARG_TO_FILTER( + "orc.reader.sarg.to.filter", + "orc.reader.sarg.to.filter", + false, + "A boolean flag to determine if a SArg is allowed to become a filter, only for reader."), + READER_ONLY_USE_SELECTED( + "orc.reader.filter.use.selected", + "orc.reader.filter.use.selected", + false, + "A boolean flag to determine if the selected vector is supported by\n" + + "the reading application, only for reader. If false, the output of the ORC reader " + + "must have the filter\n" + + "reapplied to avoid using unset values in the unselected rows.\n" + + "If unsure please leave this as false."), ALLOW_PLUGIN_FILTER( "orc.filter.plugin", "orc.filter.plugin", diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index 120fb134c366..c2db03aa16ed 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -101,7 +101,6 @@ public Optional createStatsExtractor( public FormatReaderFactory createReaderFactory( RowType projectedRowType, @Nullable List filters) { List orcPredicates = new ArrayList<>(); - if (filters != null) { for (Predicate pred : filters) { Optional orcPred = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 
5093a5010773..c0b0131e3401 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -123,7 +123,9 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type); + vectors[i] = + createPaimonVector( + orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type); } return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); } @@ -268,7 +270,13 @@ private static RecordReader createRecordReader( .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) .tolerateMissingSchema( OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - + if (!conjunctPredicates.isEmpty()) { + // TODO: fix this. If this option is enabled, future deletion vectors will not work, + // because getRowNumber would be changed. 
+ options.useSelected(OrcConf.READER_ONLY_USE_SELECTED.getBoolean(conf)); + options.allowSARGToFilter( + OrcConf.READER_ONLY_ALLOW_SARG_TO_FILTER.getBoolean(conf)); + } // configure filters if (!conjunctPredicates.isEmpty()) { SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 21154c4967b7..0557a72230cc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */ public abstract class AbstractOrcColumnVector @@ -40,37 +41,49 @@ public abstract class AbstractOrcColumnVector private final ColumnVector vector; - AbstractOrcColumnVector(ColumnVector vector) { + private final VectorizedRowBatch orcBatch; + + AbstractOrcColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { this.vector = vector; + this.orcBatch = orcBatch; + } + + protected int rowMapper(int r) { + if (vector.isRepeating) { + return 0; + } + return this.orcBatch.selectedInUse ? this.orcBatch.getSelected()[r] : r; } @Override public boolean isNullAt(int i) { - return !vector.noNulls && vector.isNull[vector.isRepeating ? 
0 : i]; + return !vector.noNulls && vector.isNull[rowMapper(i)]; } public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector( - ColumnVector vector, DataType dataType) { + ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) { if (vector instanceof LongColumnVector) { if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { - return new OrcLegacyTimestampColumnVector((LongColumnVector) vector); + return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch); } else { - return new OrcLongColumnVector((LongColumnVector) vector); + return new OrcLongColumnVector((LongColumnVector) vector, orcBatch); } } else if (vector instanceof DoubleColumnVector) { - return new OrcDoubleColumnVector((DoubleColumnVector) vector); + return new OrcDoubleColumnVector((DoubleColumnVector) vector, orcBatch); } else if (vector instanceof BytesColumnVector) { - return new OrcBytesColumnVector((BytesColumnVector) vector); + return new OrcBytesColumnVector((BytesColumnVector) vector, orcBatch); } else if (vector instanceof DecimalColumnVector) { - return new OrcDecimalColumnVector((DecimalColumnVector) vector); + return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch); } else if (vector instanceof TimestampColumnVector) { - return new OrcTimestampColumnVector(vector); + return new OrcTimestampColumnVector(vector, orcBatch); } else if (vector instanceof ListColumnVector) { - return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) dataType); + return new OrcArrayColumnVector( + (ListColumnVector) vector, orcBatch, (ArrayType) dataType); } else if (vector instanceof StructColumnVector) { - return new OrcRowColumnVector((StructColumnVector) vector, (RowType) dataType); + return new OrcRowColumnVector( + (StructColumnVector) vector, orcBatch, (RowType) dataType); } else if (vector instanceof MapColumnVector) { - return new OrcMapColumnVector((MapColumnVector) vector, (MapType) dataType); + 
return new OrcMapColumnVector((MapColumnVector) vector, orcBatch, (MapType) dataType); } else { throw new UnsupportedOperationException( "Unsupported vector: " + vector.getClass().getName()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index ed16a0b51084..25a1935f3e4b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */ public class OrcArrayColumnVector extends AbstractOrcColumnVector @@ -32,14 +33,16 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector private final ListColumnVector hiveVector; private final ColumnVector paimonVector; - public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) { - super(hiveVector); + public OrcArrayColumnVector( + ListColumnVector hiveVector, VectorizedRowBatch orcBatch, ArrayType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType()); + this.paimonVector = createPaimonVector(hiveVector.child, orcBatch, type.getElementType()); } @Override public InternalArray getArray(int i) { + i = rowMapper(i); long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarArray(paimonVector, (int) offset, (int) length); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java index 
d48bad886a47..7f812bb5628b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */ public class OrcBytesColumnVector extends AbstractOrcColumnVector @@ -26,14 +27,14 @@ public class OrcBytesColumnVector extends AbstractOrcColumnVector private final BytesColumnVector vector; - public OrcBytesColumnVector(BytesColumnVector vector) { - super(vector); + public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } @Override public Bytes getBytes(int i) { - int rowId = vector.isRepeating ? 0 : i; + int rowId = rowMapper(i); byte[][] data = vector.vector; int[] start = vector.start; int[] length = vector.length; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java index 9ea4d763a5d8..382c19f45be1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.Decimal; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.math.BigDecimal; @@ -32,15 +33,15 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector private final DecimalColumnVector vector; - public OrcDecimalColumnVector(DecimalColumnVector vector) { - super(vector); + public 
OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } @Override public Decimal getDecimal(int i, int precision, int scale) { - BigDecimal data = - vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue(); + i = rowMapper(i); + BigDecimal data = vector.vector[i].getHiveDecimal().bigDecimalValue(); return Decimal.fromBigDecimal(data, precision, scale); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java index 0c0b0cc51d38..f26dac6de9da 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double @@ -30,18 +31,20 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector private final DoubleColumnVector vector; - public OrcDoubleColumnVector(DoubleColumnVector vector) { - super(vector); + public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } @Override public double getDouble(int i) { - return vector.vector[vector.isRepeating ? 0 : i]; + i = rowMapper(i); + return vector.vector[i]; } @Override public float getFloat(int i) { - return (float) vector.vector[vector.isRepeating ? 
0 : i]; + i = rowMapper(i); + return (float) vector.vector[i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java index 18227ecf3dd2..5107e722edb4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.time.LocalDateTime; @@ -34,15 +35,15 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector private final LongColumnVector hiveVector; - OrcLegacyTimestampColumnVector(LongColumnVector vector) { - super(vector); + OrcLegacyTimestampColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.hiveVector = vector; } @Override public Timestamp getTimestamp(int i, int precision) { - int index = hiveVector.isRepeating ? 
0 : i; - java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[index]); + i = rowMapper(i); + java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[i]); return Timestamp.fromSQLTimestamp(timestamp); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java index e7dfe0e6134e..c289b74c58b2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int @@ -33,33 +34,38 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector private final LongColumnVector vector; - public OrcLongColumnVector(LongColumnVector vector) { - super(vector); + public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } @Override public long getLong(int i) { - return vector.vector[vector.isRepeating ? 0 : i]; + i = rowMapper(i); + return vector.vector[i]; } @Override public boolean getBoolean(int i) { - return vector.vector[vector.isRepeating ? 0 : i] == 1; + i = rowMapper(i); + return vector.vector[i] == 1; } @Override public byte getByte(int i) { - return (byte) vector.vector[vector.isRepeating ? 0 : i]; + i = rowMapper(i); + return (byte) vector.vector[i]; } @Override public int getInt(int i) { - return (int) vector.vector[vector.isRepeating ? 0 : i]; + i = rowMapper(i); + return (int) vector.vector[i]; } @Override public short getShort(int i) { - return (short) vector.vector[vector.isRepeating ? 
0 : i]; + i = rowMapper(i); + return (short) vector.vector[i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java index 66a1af6dccf4..c7245275fdd2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.MapType; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */ public class OrcMapColumnVector extends AbstractOrcColumnVector @@ -33,15 +34,18 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector private final ColumnVector keyPaimonVector; private final ColumnVector valuePaimonVector; - public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) { - super(hiveVector); + public OrcMapColumnVector( + MapColumnVector hiveVector, VectorizedRowBatch orcBatch, MapType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType()); - this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType()); + this.keyPaimonVector = createPaimonVector(hiveVector.keys, orcBatch, type.getKeyType()); + this.valuePaimonVector = + createPaimonVector(hiveVector.values, orcBatch, type.getValueType()); } @Override public InternalMap getMap(int i) { + i = rowMapper(i); long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index caa22467f9c3..6c73c9fdbe0d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */ public class OrcRowColumnVector extends AbstractOrcColumnVector @@ -31,18 +32,21 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector private final VectorizedColumnBatch batch; - public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) { - super(hiveVector); + public OrcRowColumnVector( + StructColumnVector hiveVector, VectorizedRowBatch orcBatch, RowType type) { + super(hiveVector, orcBatch); int len = hiveVector.fields.length; ColumnVector[] paimonVectors = new ColumnVector[len]; for (int i = 0; i < len; i++) { - paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i)); + paimonVectors[i] = + createPaimonVector(hiveVector.fields[i], orcBatch, type.getTypeAt(i)); } this.batch = new VectorizedColumnBatch(paimonVectors); } @Override public ColumnarRow getRow(int i) { + i = rowMapper(i); return new ColumnarRow(batch, i); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java index dd8ac08f2f57..a6e71d6016f2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java @@ -23,6 +23,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's TimestampColumnVector to Paimon's @@ -33,14 +34,14 @@ public class OrcTimestampColumnVector extends AbstractOrcColumnVector private final TimestampColumnVector vector; - public OrcTimestampColumnVector(ColumnVector vector) { - super(vector); + public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = (TimestampColumnVector) vector; } @Override public Timestamp getTimestamp(int i, int precision) { - int index = vector.isRepeating ? 0 : i; - return DateTimeUtils.toInternal(vector.time[index], vector.nanos[index] % 1_000_000); + i = rowMapper(i); + return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 1_000_000); } }