diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index c564b69409c5..d2cce8b98cf5 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -123,7 +123,7 @@ public FormatReaderFactory createReaderFactory( @Override public void validateDataFields(RowType rowType) { DataType refinedType = refineDataType(rowType); - OrcSplitReaderUtil.toOrcType(refinedType); + OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); } /** @@ -141,9 +141,9 @@ public FormatWriterFactory createWriterFactory(RowType type) { DataType refinedType = refineDataType(type); DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]); - TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType); - Vectorizer vectorizer = - new RowDataVectorizer(typeDescription.toString(), orcTypes); + TypeDescription typeDescription = + OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + Vectorizer vectorizer = new RowDataVectorizer(typeDescription, orcTypes); return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 05f3dd7851e8..65500fdb7901 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -56,7 +56,7 @@ import java.util.List; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType; +import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** An ORC reader that produces a stream of {@link ColumnarRow} records. */ @@ -81,7 +81,7 @@ public OrcReaderFactory( final int batchSize, final boolean deletionVectorsEnabled) { this.hadoopConfig = checkNotNull(hadoopConfig); - this.schema = toOrcType(readType); + this.schema = convertToOrcSchema(readType); this.tableType = readType; this.conjunctPredicates = checkNotNull(conjunctPredicates); this.batchSize = batchSize; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java index 882f1c753991..1edf097fb7b9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java @@ -18,8 +18,10 @@ package org.apache.paimon.format.orc.reader; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; @@ -32,23 +34,41 @@ /** Util for orc types. */ public class OrcSplitReaderUtil { - public static TypeDescription toOrcType(DataType type) { + public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id"; + + public static TypeDescription convertToOrcSchema(RowType rowType) { + TypeDescription struct = TypeDescription.createStruct(); + for (DataField dataField : rowType.getFields()) { + TypeDescription child = convertToOrcType(dataField.type(), dataField.id(), 0); + struct.addField(dataField.name(), child); + } + return struct; + } + + public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { type = type.copy(true); switch (type.getTypeRoot()) { case CHAR: - return TypeDescription.createChar().withMaxLength(((CharType) type).getLength()); + return TypeDescription.createChar() + .withMaxLength(((CharType) type).getLength()) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARCHAR: int len = ((VarCharType) type).getLength(); if (len == VarCharType.MAX_LENGTH) { - return TypeDescription.createString(); + return TypeDescription.createString() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } else { - return TypeDescription.createVarchar().withMaxLength(len); + return TypeDescription.createVarchar() + .withMaxLength(len) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } case BOOLEAN: - return TypeDescription.createBoolean(); + return TypeDescription.createBoolean() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARBINARY: if (type.equals(DataTypes.BYTES())) { - return TypeDescription.createBinary(); + return TypeDescription.createBinary() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } else { throw new UnsupportedOperationException( "Not support other binary type: " + type); @@ -57,41 +77,67 @@ public static TypeDescription toOrcType(DataType type) { DecimalType decimalType = (DecimalType) type; return TypeDescription.createDecimal() .withScale(decimalType.getScale()) - .withPrecision(decimalType.getPrecision()); + .withPrecision(decimalType.getPrecision()) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TINYINT: - return TypeDescription.createByte(); + return TypeDescription.createByte() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case SMALLINT: - return TypeDescription.createShort(); + return TypeDescription.createShort() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case INTEGER: case TIME_WITHOUT_TIME_ZONE: - return TypeDescription.createInt(); + return TypeDescription.createInt() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case BIGINT: - return TypeDescription.createLong(); + return TypeDescription.createLong() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case FLOAT: - return TypeDescription.createFloat(); + return TypeDescription.createFloat() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case DOUBLE: - return TypeDescription.createDouble(); + return TypeDescription.createDouble() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case DATE: - return TypeDescription.createDate(); + return TypeDescription.createDate() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TIMESTAMP_WITHOUT_TIME_ZONE: - return TypeDescription.createTimestamp(); + return TypeDescription.createTimestamp() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return TypeDescription.createTimestampInstant(); + return TypeDescription.createTimestampInstant() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case ARRAY: ArrayType arrayType = (ArrayType) type; - return TypeDescription.createList(toOrcType(arrayType.getElementType())); + + String elementFieldId = + String.valueOf(SpecialFields.getArrayElementFieldId(fieldId, depth + 1)); + TypeDescription elementOrcType = + convertToOrcType(arrayType.getElementType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, elementFieldId); + + return TypeDescription.createList(elementOrcType) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case MAP: MapType mapType = (MapType) type; - return TypeDescription.createMap( - toOrcType(mapType.getKeyType()), toOrcType(mapType.getValueType())); + + String mapKeyFieldId = + String.valueOf(SpecialFields.getMapKeyFieldId(fieldId, depth + 1)); + TypeDescription mapKeyOrcType = + convertToOrcType(mapType.getKeyType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapKeyFieldId); + + String mapValueFieldId = + String.valueOf(SpecialFields.getMapValueFieldId(fieldId, depth + 1)); + TypeDescription mapValueOrcType = + convertToOrcType(mapType.getValueType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapValueFieldId); + + return TypeDescription.createMap(mapKeyOrcType, mapValueOrcType) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case ROW: - RowType rowType = (RowType) type; - TypeDescription struct = TypeDescription.createStruct(); - for (int i = 0; i < rowType.getFieldCount(); i++) { - struct.addField( - rowType.getFieldNames().get(i), toOrcType(rowType.getTypeAt(i))); - } - return struct; + return convertToOrcSchema((RowType) type) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); default: throw new UnsupportedOperationException("Unsupported type: " + type); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java index 21443cdf9463..46c936a0263e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; import java.util.Arrays; import java.util.List; @@ -35,7 +36,7 @@ public class RowDataVectorizer extends Vectorizer { private final List fieldWriters; - public RowDataVectorizer(String schema, DataType[] fieldTypes) { + public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) { super(schema); this.fieldWriters = Arrays.stream(fieldTypes) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java index 0f0e6bba74a8..2add46531a61 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java @@ -39,9 +39,9 @@ public abstract class Vectorizer implements Serializable { private final TypeDescription schema; - public Vectorizer(final String schema) { + public Vectorizer(final TypeDescription schema) { checkNotNull(schema); - this.schema = TypeDescription.fromString(schema); + this.schema = schema; } /** diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java index c07838dfa34c..c6f882bf2952 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType; +import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link OrcSplitReaderUtil}. */ @@ -63,6 +63,6 @@ void testDataTypeToOrcType() { } private void test(String expected, DataType type) { - assertThat(toOrcType(type)).hasToString(expected); + assertThat(convertToOrcType(type, -1, -1)).hasToString(expected); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java index 2511d7ed7a9e..52df5afb4efd 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.orc.MemoryManager; import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -47,7 +48,7 @@ void testNotOverrideInMemoryManager(@TempDir java.nio.file.Path tmpDir) throws I OrcWriterFactory factory = new TestOrcWriterFactory( new RowDataVectorizer( - "struct<_col0:string,_col1:int>", + TypeDescription.fromString("struct<_col0:string,_col1:int>"), new DataType[] {DataTypes.STRING(), DataTypes.INT()}), memoryManager); factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");