Skip to content

Commit

Permalink
fix: validate Paimon field-id attributes against ORC file schema (rename OrcSplitReaderUtil to OrcTypeUtil)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Nov 19, 2024
1 parent f206d40 commit c33df0d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.format.orc.filter.OrcPredicateFunctionVisitor;
import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor;
import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil;
import org.apache.paimon.format.orc.writer.RowDataVectorizer;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.options.MemorySize;
Expand Down Expand Up @@ -123,7 +122,7 @@ public FormatReaderFactory createReaderFactory(
@Override
public void validateDataFields(RowType rowType) {
DataType refinedType = refineDataType(rowType);
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
}

/**
Expand All @@ -141,8 +140,7 @@ public FormatWriterFactory createWriterFactory(RowType type) {
DataType refinedType = refineDataType(type);
DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);

TypeDescription typeDescription =
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
Vectorizer<InternalRow> vectorizer = new RowDataVectorizer(typeDescription, orcTypes);

return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@
import java.io.IOException;
import java.util.List;

import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
Expand Down Expand Up @@ -262,6 +263,7 @@ private static RecordReader createRecordReader(
boolean deletionVectorsEnabled)
throws IOException {
org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult);
checkStructCompatible(schema, orcReader.getSchema());
try {
// get offset and length for the stripes that start in the split
Pair<Long, Long> offsetAndLength =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.paimon.format.orc.reader;
package org.apache.paimon.format.orc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
Expand All @@ -29,12 +30,17 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import com.google.common.base.Objects;
import org.apache.orc.TypeDescription;

import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util for orc types. */
public class OrcSplitReaderUtil {
public class OrcTypeUtil {

public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id";
public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.id";

public static TypeDescription convertToOrcSchema(RowType rowType) {
TypeDescription struct = TypeDescription.createStruct();
Expand All @@ -45,7 +51,8 @@ public static TypeDescription convertToOrcSchema(RowType rowType) {
return struct;
}

public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
@VisibleForTesting
static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
type = type.copy(true);
switch (type.getTypeRoot()) {
case CHAR:
Expand Down Expand Up @@ -142,4 +149,59 @@ public static TypeDescription convertToOrcType(DataType type, int fieldId, int d
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}

public static void checkStructCompatible(
TypeDescription requiredStruct, TypeDescription orcStruct) {
List<String> requiredFields = requiredStruct.getFieldNames();
List<TypeDescription> requiredTypes = requiredStruct.getChildren();
List<String> orcFields = orcStruct.getFieldNames();
List<TypeDescription> orcTypes = orcStruct.getChildren();

for (int i = 0; i < requiredFields.size(); i++) {
String field = requiredFields.get(i);
int orcIndex = orcFields.indexOf(field);
checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field);
TypeDescription requiredType = requiredTypes.get(i);
TypeDescription orcType = orcTypes.get(orcIndex);
checkField(field, requiredType, orcType);
}
}

private static void checkField(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
checkFieldIdAttribute(fieldName, requiredType, orcType);
if (requiredType.getCategory().isPrimitive()) {
return;
}

// see TypeDescription#getPartialName
switch (requiredType.getCategory()) {
case LIST:
checkField(
"_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0));
return;
case MAP:
checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0));
checkField(
"_value", requiredType.getChildren().get(1), orcType.getChildren().get(1));
return;
case STRUCT:
checkStructCompatible(requiredType, orcType);
return;
default:
throw new UnsupportedOperationException("Unsupported orc type: " + requiredType);
}
}

private static void checkFieldIdAttribute(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
checkArgument(
Objects.equal(requiredId, orcId),
"Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.",
fieldName,
requiredId,
orcId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@

package org.apache.paimon.format.orc;

import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.orc.TypeDescription;
import org.junit.jupiter.api.Test;

import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;

/** Test for {@link OrcSplitReaderUtil}. */
class OrcSplitReaderUtilTest {
/** Test for {@link OrcTypeUtil}. */
class OrcTypeUtilTest {

@Test
void testDataTypeToOrcType() {
Expand Down Expand Up @@ -65,4 +71,26 @@ void testDataTypeToOrcType() {
private void test(String expected, DataType type) {
assertThat(convertToOrcType(type, -1, -1)).hasToString(expected);
}

@Test
void testCheckFieldIdAttribute() {
RowType full =
RowType.builder()
.field("a", DataTypes.INT())
.field(
"b",
RowType.builder(true, new AtomicInteger(5))
.field("f0", DataTypes.STRING())
.field("f1", DataTypes.INT())
.build())
.field("c", DataTypes.ARRAY(DataTypes.INT()))
.field("d", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
.build();
RowType projected = full.project("c", "b", "d");

TypeDescription required = convertToOrcSchema(projected);
TypeDescription orc = convertToOrcSchema(full);

assertThatNoException().isThrownBy(() -> checkStructCompatible(required, orc));
}
}

0 comments on commit c33df0d

Please sign in to comment.