From b5a4d4d68169a2a26cd0e218f7ccf92e01061408 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 19 Nov 2024 15:41:59 +0800 Subject: [PATCH] fix --- .../paimon/format/orc/OrcFileFormat.java | 6 +- .../paimon/format/orc/OrcReaderFactory.java | 4 +- ...cSplitReaderUtil.java => OrcTypeUtil.java} | 71 +++++++++++++++++-- ...aderUtilTest.java => OrcTypeUtilTest.java} | 36 ++++++++-- 4 files changed, 104 insertions(+), 13 deletions(-) rename paimon-format/src/main/java/org/apache/paimon/format/orc/{reader/OrcSplitReaderUtil.java => OrcTypeUtil.java} (70%) rename paimon-format/src/test/java/org/apache/paimon/format/orc/{OrcSplitReaderUtilTest.java => OrcTypeUtilTest.java} (65%) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index d2cce8b98cf5..c3521c6f1a37 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -28,7 +28,6 @@ import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.format.orc.filter.OrcPredicateFunctionVisitor; import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor; -import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; import org.apache.paimon.options.MemorySize; @@ -123,7 +122,7 @@ public FormatReaderFactory createReaderFactory( @Override public void validateDataFields(RowType rowType) { DataType refinedType = refineDataType(rowType); - OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + OrcTypeUtil.convertToOrcSchema((RowType) refinedType); } /** @@ -141,8 +140,7 @@ public FormatWriterFactory createWriterFactory(RowType type) { DataType refinedType = refineDataType(type); DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]); - TypeDescription typeDescription = - OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType); + TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType); Vectorizer vectorizer = new RowDataVectorizer(typeDescription, orcTypes); return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 65500fdb7901..43851a0d4012 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -55,8 +55,9 @@ import java.io.IOException; import java.util.List; +import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** An ORC reader that produces a stream of {@link ColumnarRow} records. */ @@ -262,6 +263,7 @@ private static RecordReader createRecordReader( boolean deletionVectorsEnabled) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult); + checkStructCompatible(schema, orcReader.getSchema()); try { // get offset and length for the stripes that start in the split Pair offsetAndLength = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java similarity index 70% rename from paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java rename to paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java index 1edf097fb7b9..daa098cbaaed 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.paimon.format.orc.reader; +package org.apache.paimon.format.orc; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.CharType; @@ -29,12 +30,18 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.shade.guava30.com.google.common.base.Objects; + import org.apache.orc.TypeDescription; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Util for orc types. */ -public class OrcSplitReaderUtil { +public class OrcTypeUtil { - public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id"; + public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.id"; public static TypeDescription convertToOrcSchema(RowType rowType) { TypeDescription struct = TypeDescription.createStruct(); @@ -45,7 +52,8 @@ public static TypeDescription convertToOrcSchema(RowType rowType) { return struct; } - public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { + @VisibleForTesting + static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { type = type.copy(true); switch (type.getTypeRoot()) { case CHAR: @@ -142,4 +150,59 @@ public static TypeDescription convertToOrcType(DataType type, int fieldId, int d throw new UnsupportedOperationException("Unsupported type: " + type); } } + + public static void checkStructCompatible( + TypeDescription requiredStruct, TypeDescription orcStruct) { + List requiredFields = requiredStruct.getFieldNames(); + List requiredTypes = requiredStruct.getChildren(); + List orcFields = orcStruct.getFieldNames(); + List orcTypes = orcStruct.getChildren(); + + for (int i = 0; i < requiredFields.size(); i++) { + String field = requiredFields.get(i); + int orcIndex = orcFields.indexOf(field); + checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field); + TypeDescription requiredType = requiredTypes.get(i); + TypeDescription orcType = orcTypes.get(orcIndex); + checkField(field, requiredType, orcType); + } + } + + private static void checkField( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + checkFieldIdAttribute(fieldName, requiredType, orcType); + if (requiredType.getCategory().isPrimitive()) { + return; + } + + // see TypeDescription#getPartialName + switch (requiredType.getCategory()) { + case LIST: + checkField( + "_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + return; + case MAP: + checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0)); + checkField( + "_value", requiredType.getChildren().get(1), orcType.getChildren().get(1)); + return; + case STRUCT: + checkStructCompatible(requiredType, orcType); + return; + default: + throw new UnsupportedOperationException("Unsupported orc type: " + requiredType); + } + } + + private static void checkFieldIdAttribute( + String fieldName, TypeDescription requiredType, TypeDescription orcType) { + String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY); + checkArgument( + Objects.equal(requiredId, orcId), + "Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.", + fieldName, + requiredId, + orcId); + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java similarity index 65% rename from paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java rename to paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java index c6f882bf2952..94cde467a651 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcSplitReaderUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java @@ -18,17 +18,23 @@ package org.apache.paimon.format.orc; -import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.orc.TypeDescription; import org.junit.jupiter.api.Test; -import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; +import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcType; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; -/** Test for {@link OrcSplitReaderUtil}. */ -class OrcSplitReaderUtilTest { +/** Test for {@link OrcTypeUtil}. */ +class OrcTypeUtilTest { @Test void testDataTypeToOrcType() { @@ -65,4 +71,26 @@ void testDataTypeToOrcType() { private void test(String expected, DataType type) { assertThat(convertToOrcType(type, -1, -1)).hasToString(expected); } + + @Test + void testCheckFieldIdAttribute() { + RowType full = + RowType.builder() + .field("a", DataTypes.INT()) + .field( + "b", + RowType.builder(true, new AtomicInteger(5)) + .field("f0", DataTypes.STRING()) + .field("f1", DataTypes.INT()) + .build()) + .field("c", DataTypes.ARRAY(DataTypes.INT())) + .field("d", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .build(); + RowType projected = full.project("c", "b", "d"); + + TypeDescription required = convertToOrcSchema(projected); + TypeDescription orc = convertToOrcSchema(full); + + assertThatNoException().isThrownBy(() -> checkStructCompatible(required, orc)); + } }