From 9dc27e57bc3c62820fbb051aaa9b24a37cbacfc0 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 13 Nov 2024 19:06:48 +0800 Subject: [PATCH 1/3] [orc] Add type id to orc files --- .../paimon/format/orc/OrcFileFormat.java | 8 +- .../paimon/format/orc/OrcReaderFactory.java | 4 +- .../format/orc/reader/OrcSplitReaderUtil.java | 98 ++++++++++++++----- .../format/orc/writer/RowDataVectorizer.java | 3 +- .../paimon/format/orc/writer/Vectorizer.java | 4 +- .../format/orc/OrcSplitReaderUtilTest.java | 4 +- .../format/orc/OrcWriterFactoryTest.java | 3 +- 7 files changed, 86 insertions(+), 38 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index c564b69409c5..d2cce8b98cf5 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -123,7 +123,7 @@ public FormatReaderFactory createReaderFactory( @Override public void validateDataFields(RowType rowType) { DataType refinedType = refineDataType(rowType); - OrcSplitReaderUtil.toOrcType(refinedType); + OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); } /** @@ -141,9 +141,9 @@ public FormatWriterFactory createWriterFactory(RowType type) { DataType refinedType = refineDataType(type); DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]); - TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType); - Vectorizer vectorizer = - new RowDataVectorizer(typeDescription.toString(), orcTypes); + TypeDescription typeDescription = + OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + Vectorizer vectorizer = new RowDataVectorizer(typeDescription, orcTypes); return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 05f3dd7851e8..65500fdb7901 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -56,7 +56,7 @@ import java.util.List; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType; +import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** An ORC reader that produces a stream of {@link ColumnarRow} records. */ @@ -81,7 +81,7 @@ public OrcReaderFactory( final int batchSize, final boolean deletionVectorsEnabled) { this.hadoopConfig = checkNotNull(hadoopConfig); - this.schema = toOrcType(readType); + this.schema = convertToOrcSchema(readType); this.tableType = readType; this.conjunctPredicates = checkNotNull(conjunctPredicates); this.batchSize = batchSize; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java index 882f1c753991..1edf097fb7b9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java @@ -18,8 +18,10 @@ package org.apache.paimon.format.orc.reader; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; @@ -32,23 +34,41 @@ /** Util for orc types. */ public class OrcSplitReaderUtil { - public static TypeDescription toOrcType(DataType type) { + public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id"; + + public static TypeDescription convertToOrcSchema(RowType rowType) { + TypeDescription struct = TypeDescription.createStruct(); + for (DataField dataField : rowType.getFields()) { + TypeDescription child = convertToOrcType(dataField.type(), dataField.id(), 0); + struct.addField(dataField.name(), child); + } + return struct; + } + + public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { type = type.copy(true); switch (type.getTypeRoot()) { case CHAR: - return TypeDescription.createChar().withMaxLength(((CharType) type).getLength()); + return TypeDescription.createChar() + .withMaxLength(((CharType) type).getLength()) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARCHAR: int len = ((VarCharType) type).getLength(); if (len == VarCharType.MAX_LENGTH) { - return TypeDescription.createString(); + return TypeDescription.createString() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } else { - return TypeDescription.createVarchar().withMaxLength(len); + return TypeDescription.createVarchar() + .withMaxLength(len) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } case BOOLEAN: - return TypeDescription.createBoolean(); + return TypeDescription.createBoolean() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARBINARY: if (type.equals(DataTypes.BYTES())) { - return TypeDescription.createBinary(); + return TypeDescription.createBinary() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); } else { throw new UnsupportedOperationException( "Not support other binary type: " + type); @@ -57,41 +77,67 @@ public static TypeDescription toOrcType(DataType type) { DecimalType decimalType = (DecimalType) type; return TypeDescription.createDecimal() .withScale(decimalType.getScale()) - .withPrecision(decimalType.getPrecision()); + .withPrecision(decimalType.getPrecision()) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TINYINT: - return TypeDescription.createByte(); + return TypeDescription.createByte() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case SMALLINT: - return TypeDescription.createShort(); + return TypeDescription.createShort() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case INTEGER: case TIME_WITHOUT_TIME_ZONE: - return TypeDescription.createInt(); + return TypeDescription.createInt() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case BIGINT: - return TypeDescription.createLong(); + return TypeDescription.createLong() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case FLOAT: - return TypeDescription.createFloat(); + return TypeDescription.createFloat() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case DOUBLE: - return TypeDescription.createDouble(); + return TypeDescription.createDouble() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case DATE: - return TypeDescription.createDate(); + return TypeDescription.createDate() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TIMESTAMP_WITHOUT_TIME_ZONE: - return TypeDescription.createTimestamp(); + return TypeDescription.createTimestamp() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return TypeDescription.createTimestampInstant(); + return TypeDescription.createTimestampInstant() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case ARRAY: ArrayType arrayType = (ArrayType) type; - return TypeDescription.createList(toOrcType(arrayType.getElementType())); + + String elementFieldId = + String.valueOf(SpecialFields.getArrayElementFieldId(fieldId, depth + 1)); + TypeDescription elementOrcType = + convertToOrcType(arrayType.getElementType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, elementFieldId); + + return TypeDescription.createList(elementOrcType) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case MAP: MapType mapType = (MapType) type; - return TypeDescription.createMap( - toOrcType(mapType.getKeyType()), toOrcType(mapType.getValueType())); + + String mapKeyFieldId = + String.valueOf(SpecialFields.getMapKeyFieldId(fieldId, depth + 1)); + TypeDescription mapKeyOrcType = + convertToOrcType(mapType.getKeyType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapKeyFieldId); + + String mapValueFieldId = + String.valueOf(SpecialFields.getMapValueFieldId(fieldId, depth + 1)); + TypeDescription mapValueOrcType = + convertToOrcType(mapType.getValueType(), fieldId, depth + 1) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapValueFieldId); + + return TypeDescription.createMap(mapKeyOrcType, mapValueOrcType) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case ROW: - RowType rowType = (RowType) type; - TypeDescription struct = TypeDescription.createStruct(); - for (int i = 0; i < rowType.getFieldCount(); i++) { - struct.addField( - rowType.getFieldNames().get(i), toOrcType(rowType.getTypeAt(i))); - } - return struct; + return convertToOrcSchema((RowType) type) + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); default: throw new UnsupportedOperationException("Unsupported type: " + type); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java index 21443cdf9463..46c936a0263e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; import java.util.Arrays; import java.util.List; @@ -35,7 +36,7 @@ public class RowDataVectorizer extends Vectorizer { private final List fieldWriters; - public RowDataVectorizer(String schema, DataType[] fieldTypes) { + public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) { super(schema); this.fieldWriters = Arrays.stream(fieldTypes) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java index 0f0e6bba74a8..2add46531a61 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java @@ -39,9 +39,9 @@ public abstract class Vectorizer implements Serializable { private final TypeDescription schema; - public Vectorizer(final String schema) { + public Vectorizer(final TypeDescription schema) { checkNotNull(schema); - this.schema = TypeDescription.fromString(schema); + this.schema = schema; } /** diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java index c07838dfa34c..c6f882bf2952 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType; +import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link OrcSplitReaderUtil}. */ @@ -63,6 +63,6 @@ void testDataTypeToOrcType() { } private void test(String expected, DataType type) { - assertThat(toOrcType(type)).hasToString(expected); + assertThat(convertToOrcType(type, -1, -1)).hasToString(expected); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java index 2511d7ed7a9e..52df5afb4efd 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.orc.MemoryManager; import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -47,7 +48,7 @@ void testNotOverrideInMemoryManager(@TempDir java.nio.file.Path tmpDir) throws I OrcWriterFactory factory = new TestOrcWriterFactory( new RowDataVectorizer( - "struct<_col0:string,_col1:int>", + TypeDescription.fromString("struct<_col0:string,_col1:int>"), new DataType[] {DataTypes.STRING(), DataTypes.INT()}), memoryManager); factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4"); From b5a4d4d68169a2a26cd0e218f7ccf92e01061408 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 19 Nov 2024 15:41:59 +0800 Subject: [PATCH 2/3] fix --- .../paimon/format/orc/OrcFileFormat.java | 6 +- .../paimon/format/orc/OrcReaderFactory.java | 4 +- ...cSplitReaderUtil.java => OrcTypeUtil.java} | 71 +++++++++++++++++-- ...aderUtilTest.java => OrcTypeUtilTest.java} | 36 ++++++++-- 4 files changed, 104 insertions(+), 13 deletions(-) rename paimon-format/src/main/java/org/apache/paimon/format/orc/{reader/OrcSplitReaderUtil.java => OrcTypeUtil.java} (70%) rename paimon-format/src/test/java/org/apache/paimon/format/orc/{OrcSplitReaderUtilTest.java => OrcTypeUtilTest.java} (65%) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index d2cce8b98cf5..c3521c6f1a37 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -28,7 +28,6 @@ import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.format.orc.filter.OrcPredicateFunctionVisitor; import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor; -import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; import org.apache.paimon.options.MemorySize; @@ -123,7 +122,7 @@ public FormatReaderFactory createReaderFactory( @Override public void validateDataFields(RowType rowType) { DataType refinedType = refineDataType(rowType); - OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + OrcTypeUtil.convertToOrcSchema((RowType) refinedType); } /** @@ -141,8 +140,7 @@ public FormatWriterFactory createWriterFactory(RowType type) { DataType refinedType = refineDataType(type); DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]); - TypeDescription typeDescription = - OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType); Vectorizer vectorizer = new RowDataVectorizer(typeDescription, orcTypes); return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 65500fdb7901..43851a0d4012 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -55,8 +55,9 @@ import java.io.IOException; import java.util.List; +import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** An ORC reader that produces a stream of {@link ColumnarRow} records. */ @@ -262,6 +263,7 @@ private static RecordReader createRecordReader( boolean deletionVectorsEnabled) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult); + checkStructCompatible(schema, orcReader.getSchema()); try { // get offset and length for the stripes that start in the split Pair offsetAndLength = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java similarity index 70% rename from paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java rename to paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java index 1edf097fb7b9..daa098cbaaed 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.paimon.format.orc.reader; +package org.apache.paimon.format.orc; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.CharType; @@ -29,12 +30,18 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.shade.guava30.com.google.common.base.Objects; + import org.apache.orc.TypeDescription; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Util for orc types. */ -public class OrcSplitReaderUtil { +public class OrcTypeUtil { - public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id"; + public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.id"; public static TypeDescription convertToOrcSchema(RowType rowType) { TypeDescription struct = TypeDescription.createStruct(); @@ -45,7 +52,8 @@ public static TypeDescription convertToOrcSchema(RowType rowType) { return struct; } - public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { + @VisibleForTesting + static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { type = type.copy(true); switch (type.getTypeRoot()) { case CHAR: @@ -142,4 +150,59 @@ public static TypeDescription convertToOrcType(DataType type, int fieldId, int d throw new UnsupportedOperationException("Unsupported type: " + type); } } + + public static void checkStructCompatible( + TypeDescription requiredStruct, TypeDescription orcStruct) { + List requiredFields = requiredStruct.getFieldNames(); + List requiredTypes = requiredStruct.getChildren(); + List orcFields = orcStruct.getFieldNames(); + List orcTypes = orcStruct.getChildren(); + + for (int i = 0; i < requiredFields.size(); i++) { + String field = requiredFields.get(i); + int orcIndex = orcFields.indexOf(field); + checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field); + TypeDescription requiredType = requiredTypes.get(i); + TypeDescription orcType = orcTypes.get(orcIndex); + checkField(field, requiredType, orcType); + } + } + + private static void checkField( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + checkFieldIdAttribute(fieldName, requiredType, orcType); + if (requiredType.getCategory().isPrimitive()) { + return; + } + + // see TypeDescription#getPartialName + switch (requiredType.getCategory()) { + case LIST: + checkField( + "_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + return; + case MAP: + checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + checkField( + "_value", requiredType.getChildren().get(1), orcType.getChildren().get(1)); + return; + case STRUCT: + checkStructCompatible(requiredType, orcType); + return; + default: + throw new UnsupportedOperationException("Unsupported orc type: " + requiredType); + } + } + + private static void checkFieldIdAttribute( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + checkArgument( + Objects.equal(requiredId, orcId), + "Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.", + fieldName, + requiredId, + orcId); + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java similarity index 65% rename from paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java rename to paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java index c6f882bf2952..94cde467a651 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java @@ -18,17 +18,23 @@ package org.apache.paimon.format.orc; -import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.orc.TypeDescription; import org.junit.jupiter.api.Test; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcType; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; -/** Test for {@link OrcSplitReaderUtil}. */ -class OrcSplitReaderUtilTest { +/** Test for {@link OrcTypeUtil}. */ +class OrcTypeUtilTest { @Test void testDataTypeToOrcType() { @@ -65,4 +71,26 @@ void testDataTypeToOrcType() { private void test(String expected, DataType type) { assertThat(convertToOrcType(type, -1, -1)).hasToString(expected); } + + @Test + void testCheckFieldIdAttribute() { + RowType full = + RowType.builder() + .field("a", DataTypes.INT()) + .field( + "b", + RowType.builder(true, new AtomicInteger(5)) + .field("f0", DataTypes.STRING()) + .field("f1", DataTypes.INT()) + .build()) + .field("c", DataTypes.ARRAY(DataTypes.INT())) + .field("d", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .build(); + RowType projected = full.project("c", "b", "d"); + + TypeDescription required = convertToOrcSchema(projected); + TypeDescription orc = convertToOrcSchema(full); + + assertThatNoException().isThrownBy(() -> checkStructCompatible(required, orc)); + } } From f38b896f7b73930696631d9272dddc4940b6c566 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 19 Nov 2024 16:39:09 +0800 Subject: [PATCH 3/3] fix --- .../paimon/format/orc/OrcReaderFactory.java | 2 - .../apache/paimon/format/orc/OrcTypeUtil.java | 61 --------- .../paimon/format/orc/OrcTypeUtilTest.java | 126 ++++++++++++++++-- 3 files changed, 118 insertions(+), 71 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 43851a0d4012..ee0f8a55c034 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -55,7 +55,6 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -263,7 +262,6 @@ private static RecordReader createRecordReader( boolean deletionVectorsEnabled) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult); - checkStructCompatible(schema, orcReader.getSchema()); try { // get offset and length for the stripes that start in the split Pair offsetAndLength = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java index daa098cbaaed..f7d3d626d44f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java @@ -30,14 +30,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; -import org.apache.paimon.shade.guava30.com.google.common.base.Objects; - import org.apache.orc.TypeDescription; -import java.util.List; - -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Util for orc types. */ public class OrcTypeUtil { @@ -150,59 +144,4 @@ static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { throw new UnsupportedOperationException("Unsupported type: " + type); } } - - public static void checkStructCompatible( - TypeDescription requiredStruct, TypeDescription orcStruct) { - List requiredFields = requiredStruct.getFieldNames(); - List requiredTypes = requiredStruct.getChildren(); - List orcFields = orcStruct.getFieldNames(); - List orcTypes = orcStruct.getChildren(); - - for (int i = 0; i < requiredFields.size(); i++) { - String field = requiredFields.get(i); - int orcIndex = orcFields.indexOf(field); - checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field); - TypeDescription requiredType = requiredTypes.get(i); - TypeDescription orcType = orcTypes.get(orcIndex); - checkField(field, requiredType, orcType); - } - } - - private static void checkField( - String fieldName, TypeDescription requiredType, TypeDescription orcType) { - checkFieldIdAttribute(fieldName, requiredType, orcType); - if (requiredType.getCategory().isPrimitive()) { - return; - } - - // see TypeDescription#getPartialName - switch (requiredType.getCategory()) { - case LIST: - checkField( - "_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0)); - return; - case MAP: - checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0)); - checkField( - "_value", requiredType.getChildren().get(1), orcType.getChildren().get(1)); - return; - case STRUCT: - checkStructCompatible(requiredType, orcType); - return; - default: - throw new UnsupportedOperationException("Unsupported orc type: " + requiredType); - } - } - - private static void checkFieldIdAttribute( - String fieldName, TypeDescription requiredType, TypeDescription orcType) { - String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); - String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); - checkArgument( - Objects.equal(requiredId, orcId), - "Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.", - fieldName, - requiredId, - orcId); - } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java index 94cde467a651..5669ac33d443 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java @@ -18,18 +18,35 @@ package org.apache.paimon.format.orc; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.shade.guava30.com.google.common.base.Objects; + +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.Reader; import org.apache.orc.TypeDescription; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; +import static org.apache.paimon.format.orc.OrcFileFormat.refineDataType; +import static org.apache.paimon.format.orc.OrcTypeUtil.PAIMON_ORC_FIELD_ID_KEY; import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcType; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; @@ -73,24 +90,117 @@ private void test(String expected, DataType type) { } @Test - void testCheckFieldIdAttribute() { - RowType full = + void testFieldIdAttribute(@TempDir java.nio.file.Path tempPath) throws IOException { + RowType rowType = RowType.builder() .field("a", DataTypes.INT()) .field( "b", - RowType.builder(true, new AtomicInteger(5)) + RowType.builder(true, new AtomicInteger(10)) .field("f0", DataTypes.STRING()) .field("f1", DataTypes.INT()) .build()) .field("c", DataTypes.ARRAY(DataTypes.INT())) .field("d", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .field( + "e", + DataTypes.ARRAY( + RowType.builder(true, new AtomicInteger(20)) + .field("f0", DataTypes.STRING()) + .field("f1", DataTypes.INT()) + .build())) + .field( + "f", + RowType.builder(true, new AtomicInteger(30)) + .field("f0", DataTypes.ARRAY(DataTypes.INT())) + .build()) .build(); - RowType projected = full.project("c", "b", "d"); - TypeDescription required = convertToOrcSchema(projected); - TypeDescription orc = convertToOrcSchema(full); + // write schema to orc file then get + FileIO fileIO = LocalFileIO.create(); + Path tempFile = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + + OrcFileFormat format = + new OrcFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); + PositionOutputStream out = fileIO.newOutputStream(tempFile, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.close(); + out.close(); + + Reader orcReader = + OrcReaderFactory.createReader(new Configuration(), fileIO, tempFile, null); + TypeDescription orcSchema = orcReader.getSchema(); + + RowType refined = (RowType) refineDataType(rowType); + + assertThatNoException() + .isThrownBy(() -> checkStruct(convertToOrcSchema(refined), orcSchema)); + + assertThatNoException() + .isThrownBy( + () -> + checkStruct( + convertToOrcSchema(refined.project("c", "b", "d")), + orcSchema)); + + assertThatNoException() + .isThrownBy( + () -> + checkStruct( + convertToOrcSchema(refined.project("a", "e", "f")), + orcSchema)); + } + + private void checkStruct(TypeDescription requiredStruct, TypeDescription orcStruct) { + List requiredFields = requiredStruct.getFieldNames(); + List requiredTypes = requiredStruct.getChildren(); + List orcFields = orcStruct.getFieldNames(); + List orcTypes = orcStruct.getChildren(); + + for (int i = 0; i < requiredFields.size(); i++) { + String field = requiredFields.get(i); + int orcIndex = orcFields.indexOf(field); + checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field); + TypeDescription requiredType = requiredTypes.get(i); + TypeDescription orcType = orcTypes.get(orcIndex); + checkField(field, requiredType, orcType); + } + } + + private void checkField( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + checkFieldIdAttribute(fieldName, requiredType, orcType); + if (requiredType.getCategory().isPrimitive()) { + return; + } + + switch (requiredType.getCategory()) { + case LIST: + checkField( + "_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + return; + case MAP: + checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + checkField( + "_value", requiredType.getChildren().get(1), orcType.getChildren().get(1)); + return; + case STRUCT: + checkStruct(requiredType, orcType); + return; + default: + throw new UnsupportedOperationException("Unsupported orc type: " + requiredType); + } + } - assertThatNoException().isThrownBy(() -> checkStructCompatible(required, orc)); + private void checkFieldIdAttribute( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + checkArgument( + Objects.equal(requiredId, orcId), + "Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.", + fieldName, + requiredId, + orcId); } }