[format][orc] Enable the ORC switches useSelected and allowSARGToFilter to make sure pushdown works #4231

Merged (9 commits) on Nov 12, 2024
15 changes: 15 additions & 0 deletions paimon-format/src/main/java/org/apache/orc/OrcConf.java
@@ -305,6 +305,21 @@ public enum OrcConf {
+ "must have the filter\n"
+ "reapplied to avoid using unset values in the unselected rows.\n"
+ "If unsure please leave this as false."),
+
+    READER_ONLY_ALLOW_SARG_TO_FILTER(
+            "orc.reader.sarg.to.filter",
+            "orc.reader.sarg.to.filter",
+            false,
+            "A boolean flag to determine if a SArg is allowed to become a filter, only for reader."),
+    READER_ONLY_USE_SELECTED(
+            "orc.reader.filter.use.selected",
+            "orc.reader.filter.use.selected",
+            false,
+            "A boolean flag to determine if the selected vector is supported by\n"
+                    + "the reading application, only for reader. If false, the output of the ORC reader "
+                    + "must have the filter\n"
+                    + "reapplied to avoid using unset values in the unselected rows.\n"
+                    + "If unsure please leave this as false."),
ALLOW_PLUGIN_FILTER(
"orc.filter.plugin",
"orc.filter.plugin",
@@ -101,7 +101,6 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
public FormatReaderFactory createReaderFactory(
RowType projectedRowType, @Nullable List<Predicate> filters) {
List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
-
if (filters != null) {
for (Predicate pred : filters) {
Optional<OrcFilters.Predicate> orcPred =
@@ -123,7 +123,9 @@ public OrcReaderBatch createReaderBatch(
for (int i = 0; i < vectors.length; i++) {
String name = tableFieldNames.get(i);
DataType type = tableFieldTypes.get(i);
-            vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+            vectors[i] =
+                    createPaimonVector(
+                            orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type);
}
return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler);
}
@@ -268,7 +270,13 @@ private static RecordReader createRecordReader(
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
.tolerateMissingSchema(
OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));

+        if (!conjunctPredicates.isEmpty()) {
+            // TODO: fix this. Enabling this option would break deletion vectors later on,
+            // because the row positions returned by getRowNumber would change.
+            options.useSelected(OrcConf.READER_ONLY_USE_SELECTED.getBoolean(conf));
+            options.allowSARGToFilter(
+                    OrcConf.READER_ONLY_ALLOW_SARG_TO_FILTER.getBoolean(conf));
+        }
// configure filters
if (!conjunctPredicates.isEmpty()) {
SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
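
Aside: the builder above typically produces something like the following hedged sketch, where a `SearchArgument` is assembled via `SearchArgumentFactory` and attached to the reader options. The column name and predicate are made up for illustration; with `allowSARGToFilter(true)`, ORC also evaluates the SARG per row and records the surviving rows in the batch's selected vector.

```java
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.orc.Reader;

public class SargSketch {
    // Attach a SARG for "id < 100" to the reader options.
    static Reader.Options withIdFilter(Reader.Options options) {
        SearchArgument sarg =
                SearchArgumentFactory.newBuilder()
                        .startAnd()
                        .lessThan("id", PredicateLeaf.Type.LONG, 100L)
                        .end()
                        .build();
        return options.searchArgument(sarg, new String[] {"id"});
    }
}
```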
@@ -33,44 +33,57 @@
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */
public abstract class AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.ColumnVector {

private final ColumnVector vector;

-    AbstractOrcColumnVector(ColumnVector vector) {
+    private final VectorizedRowBatch orcBatch;
+
+    AbstractOrcColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) {
        this.vector = vector;
+        this.orcBatch = orcBatch;
}

+    protected int rowMapper(int r) {
+        if (vector.isRepeating) {
+            return 0;
+        }
+        return this.orcBatch.selectedInUse ? this.orcBatch.getSelected()[r] : r;
+    }

@Override
public boolean isNullAt(int i) {
-        return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
+        return !vector.noNulls && vector.isNull[rowMapper(i)];
}

public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
-            ColumnVector vector, DataType dataType) {
+            ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) {
        if (vector instanceof LongColumnVector) {
            if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
-                return new OrcLegacyTimestampColumnVector((LongColumnVector) vector);
+                return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch);
            } else {
-                return new OrcLongColumnVector((LongColumnVector) vector);
+                return new OrcLongColumnVector((LongColumnVector) vector, orcBatch);
            }
        } else if (vector instanceof DoubleColumnVector) {
-            return new OrcDoubleColumnVector((DoubleColumnVector) vector);
+            return new OrcDoubleColumnVector((DoubleColumnVector) vector, orcBatch);
        } else if (vector instanceof BytesColumnVector) {
-            return new OrcBytesColumnVector((BytesColumnVector) vector);
+            return new OrcBytesColumnVector((BytesColumnVector) vector, orcBatch);
        } else if (vector instanceof DecimalColumnVector) {
-            return new OrcDecimalColumnVector((DecimalColumnVector) vector);
+            return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch);
        } else if (vector instanceof TimestampColumnVector) {
-            return new OrcTimestampColumnVector(vector);
+            return new OrcTimestampColumnVector(vector, orcBatch);
        } else if (vector instanceof ListColumnVector) {
-            return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) dataType);
+            return new OrcArrayColumnVector(
+                    (ListColumnVector) vector, orcBatch, (ArrayType) dataType);
        } else if (vector instanceof StructColumnVector) {
-            return new OrcRowColumnVector((StructColumnVector) vector, (RowType) dataType);
+            return new OrcRowColumnVector(
+                    (StructColumnVector) vector, orcBatch, (RowType) dataType);
        } else if (vector instanceof MapColumnVector) {
-            return new OrcMapColumnVector((MapColumnVector) vector, (MapType) dataType);
+            return new OrcMapColumnVector((MapColumnVector) vector, orcBatch, (MapType) dataType);
} else {
throw new UnsupportedOperationException(
"Unsupported vector: " + vector.getClass().getName());
@@ -24,6 +24,7 @@
import org.apache.paimon.types.ArrayType;

import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */
public class OrcArrayColumnVector extends AbstractOrcColumnVector
@@ -32,14 +33,16 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector
private final ListColumnVector hiveVector;
private final ColumnVector paimonVector;

-    public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
-        super(hiveVector);
+    public OrcArrayColumnVector(
+            ListColumnVector hiveVector, VectorizedRowBatch orcBatch, ArrayType type) {
+        super(hiveVector, orcBatch);
        this.hiveVector = hiveVector;
-        this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType());
+        this.paimonVector = createPaimonVector(hiveVector.child, orcBatch, type.getElementType());
    }

    @Override
    public InternalArray getArray(int i) {
+        i = rowMapper(i);
        long offset = hiveVector.offsets[i];
        long length = hiveVector.lengths[i];
        return new ColumnarArray(paimonVector, (int) offset, (int) length);
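
The getArray(i) above reads a flattened offsets/lengths encoding; a minimal sketch of that layout with hand-built vectors (values are illustrative):

```java
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

public class ListVectorLayout {
    public static void main(String[] args) {
        LongColumnVector child = new LongColumnVector(6);
        for (int i = 0; i < 6; i++) {
            child.vector[i] = i; // all list elements live in one flat child vector
        }
        ListColumnVector list = new ListColumnVector(2, child);
        list.offsets[0] = 0; // row 0 -> elements [0, 1, 2]
        list.lengths[0] = 3;
        list.offsets[1] = 3; // row 1 -> elements [3, 4, 5]
        list.lengths[1] = 3;
        list.childCount = 6;

        for (int r = 0; r < 2; r++) {
            StringBuilder row = new StringBuilder("row " + r + ":");
            for (long j = list.offsets[r]; j < list.offsets[r] + list.lengths[r]; j++) {
                row.append(' ').append(child.vector[(int) j]);
            }
            System.out.println(row); // row 0: 0 1 2 / row 1: 3 4 5
        }
    }
}
```

OrcMapColumnVector further down uses the same offsets/lengths scheme over its keys and values children.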
@@ -19,21 +19,22 @@
package org.apache.paimon.format.orc.reader;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */
public class OrcBytesColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.BytesColumnVector {

private final BytesColumnVector vector;

-    public OrcBytesColumnVector(BytesColumnVector vector) {
-        super(vector);
+    public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
        this.vector = vector;
    }

    @Override
    public Bytes getBytes(int i) {
-        int rowId = vector.isRepeating ? 0 : i;
+        int rowId = rowMapper(i);
byte[][] data = vector.vector;
int[] start = vector.start;
int[] length = vector.length;
@@ -21,6 +21,7 @@
import org.apache.paimon.data.Decimal;

import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.math.BigDecimal;

@@ -32,15 +33,15 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector

private final DecimalColumnVector vector;

-    public OrcDecimalColumnVector(DecimalColumnVector vector) {
-        super(vector);
+    public OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
        this.vector = vector;
    }

    @Override
    public Decimal getDecimal(int i, int precision, int scale) {
-        BigDecimal data =
-                vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue();
+        i = rowMapper(i);
+        BigDecimal data = vector.vector[i].getHiveDecimal().bigDecimalValue();
return Decimal.fromBigDecimal(data, precision, scale);
}
}
@@ -19,6 +19,7 @@
package org.apache.paimon.format.orc.reader;

import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/**
* This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double
@@ -30,18 +31,20 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector

private final DoubleColumnVector vector;

-    public OrcDoubleColumnVector(DoubleColumnVector vector) {
-        super(vector);
+    public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
        this.vector = vector;
    }

    @Override
    public double getDouble(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return vector.vector[i];
    }

    @Override
    public float getFloat(int i) {
-        return (float) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (float) vector.vector[i];
}
}
@@ -22,6 +22,7 @@

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.time.LocalDateTime;

@@ -34,15 +35,15 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector

private final LongColumnVector hiveVector;

-    OrcLegacyTimestampColumnVector(LongColumnVector vector) {
-        super(vector);
+    OrcLegacyTimestampColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
        this.hiveVector = vector;
    }

    @Override
    public Timestamp getTimestamp(int i, int precision) {
-        int index = hiveVector.isRepeating ? 0 : i;
-        java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[index]);
+        i = rowMapper(i);
+        java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[i]);
return Timestamp.fromSQLTimestamp(timestamp);
}

@@ -19,6 +19,7 @@
package org.apache.paimon.format.orc.reader;

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/**
* This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int
@@ -33,33 +34,38 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector

private final LongColumnVector vector;

-    public OrcLongColumnVector(LongColumnVector vector) {
-        super(vector);
+    public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
        this.vector = vector;
    }

    @Override
    public long getLong(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return vector.vector[i];
    }

    @Override
    public boolean getBoolean(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i] == 1;
+        i = rowMapper(i);
+        return vector.vector[i] == 1;
    }

    @Override
    public byte getByte(int i) {
-        return (byte) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (byte) vector.vector[i];
    }

    @Override
    public int getInt(int i) {
-        return (int) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (int) vector.vector[i];
    }

    @Override
    public short getShort(int i) {
-        return (short) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (short) vector.vector[i];
}
}
@@ -24,6 +24,7 @@
import org.apache.paimon.types.MapType;

import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */
public class OrcMapColumnVector extends AbstractOrcColumnVector
@@ -33,15 +34,18 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector
private final ColumnVector keyPaimonVector;
private final ColumnVector valuePaimonVector;

-    public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
-        super(hiveVector);
+    public OrcMapColumnVector(
+            MapColumnVector hiveVector, VectorizedRowBatch orcBatch, MapType type) {
+        super(hiveVector, orcBatch);
        this.hiveVector = hiveVector;
-        this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType());
-        this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType());
+        this.keyPaimonVector = createPaimonVector(hiveVector.keys, orcBatch, type.getKeyType());
+        this.valuePaimonVector =
+                createPaimonVector(hiveVector.values, orcBatch, type.getValueType());
    }

    @Override
    public InternalMap getMap(int i) {
+        i = rowMapper(i);
long offset = hiveVector.offsets[i];
long length = hiveVector.lengths[i];
return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length);
@@ -24,25 +24,29 @@
import org.apache.paimon.types.RowType;

import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/** This column vector is used to adapt hive's StructColumnVector to Paimon's RowColumnVector. */
public class OrcRowColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.RowColumnVector {

private final VectorizedColumnBatch batch;

-    public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
-        super(hiveVector);
+    public OrcRowColumnVector(
+            StructColumnVector hiveVector, VectorizedRowBatch orcBatch, RowType type) {
+        super(hiveVector, orcBatch);
        int len = hiveVector.fields.length;
        ColumnVector[] paimonVectors = new ColumnVector[len];
        for (int i = 0; i < len; i++) {
-            paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i));
+            paimonVectors[i] =
+                    createPaimonVector(hiveVector.fields[i], orcBatch, type.getTypeAt(i));
        }
        this.batch = new VectorizedColumnBatch(paimonVectors);
    }

    @Override
    public ColumnarRow getRow(int i) {
+        i = rowMapper(i);
return new ColumnarRow(batch, i);
}
