Skip to content

Commit

Permalink
[parquet] Add type id to orc files
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Nov 13, 2024
1 parent 19119e3 commit 8f04099
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public FormatReaderFactory createReaderFactory(
@Override
public void validateDataFields(RowType rowType) {
DataType refinedType = refineDataType(rowType);
OrcSplitReaderUtil.toOrcType(refinedType);
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
}

/**
Expand All @@ -141,7 +141,8 @@ public FormatWriterFactory createWriterFactory(RowType type) {
DataType refinedType = refineDataType(type);
DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);

TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType);
TypeDescription typeDescription =
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
Vectorizer<InternalRow> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.List;

import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
Expand All @@ -81,7 +81,7 @@ public OrcReaderFactory(
final int batchSize,
final boolean deletionVectorsEnabled) {
this.hadoopConfig = checkNotNull(hadoopConfig);
this.schema = toOrcType(readType);
this.schema = convertToOrcSchema(readType);
this.tableType = readType;
this.conjunctPredicates = checkNotNull(conjunctPredicates);
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.paimon.format.orc.reader;

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
Expand All @@ -32,23 +34,41 @@
/** Util for orc types. */
public class OrcSplitReaderUtil {

public static TypeDescription toOrcType(DataType type) {
public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id";

public static TypeDescription convertToOrcSchema(RowType rowType) {
TypeDescription struct = TypeDescription.createStruct();
for (DataField dataField : rowType.getFields()) {
TypeDescription child = convertToOrcType(dataField.type(), dataField.id(), 0);
struct.addField(dataField.name(), child);
}
return struct;
}

public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
type = type.copy(true);
switch (type.getTypeRoot()) {
case CHAR:
return TypeDescription.createChar().withMaxLength(((CharType) type).getLength());
return TypeDescription.createChar()
.withMaxLength(((CharType) type).getLength())
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case VARCHAR:
int len = ((VarCharType) type).getLength();
if (len == VarCharType.MAX_LENGTH) {
return TypeDescription.createString();
return TypeDescription.createString()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
} else {
return TypeDescription.createVarchar().withMaxLength(len);
return TypeDescription.createVarchar()
.withMaxLength(len)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
}
case BOOLEAN:
return TypeDescription.createBoolean();
return TypeDescription.createBoolean()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case VARBINARY:
if (type.equals(DataTypes.BYTES())) {
return TypeDescription.createBinary();
return TypeDescription.createBinary()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
} else {
throw new UnsupportedOperationException(
"Not support other binary type: " + type);
Expand All @@ -57,41 +77,70 @@ public static TypeDescription toOrcType(DataType type) {
DecimalType decimalType = (DecimalType) type;
return TypeDescription.createDecimal()
.withScale(decimalType.getScale())
.withPrecision(decimalType.getPrecision());
.withPrecision(decimalType.getPrecision())
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TINYINT:
return TypeDescription.createByte();
return TypeDescription.createByte()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case SMALLINT:
return TypeDescription.createShort();
return TypeDescription.createShort()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case INTEGER:
case TIME_WITHOUT_TIME_ZONE:
return TypeDescription.createInt();
return TypeDescription.createInt()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case BIGINT:
return TypeDescription.createLong();
return TypeDescription.createLong()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case FLOAT:
return TypeDescription.createFloat();
return TypeDescription.createFloat()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case DOUBLE:
return TypeDescription.createDouble();
return TypeDescription.createDouble()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case DATE:
return TypeDescription.createDate();
return TypeDescription.createDate()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TypeDescription.createTimestamp();
return TypeDescription.createTimestamp()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TypeDescription.createTimestampInstant();
return TypeDescription.createTimestampInstant()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case ARRAY:
ArrayType arrayType = (ArrayType) type;
return TypeDescription.createList(toOrcType(arrayType.getElementType()));
return TypeDescription.createList(
convertToOrcType(
arrayType.getElementType(),
SpecialFields.getArrayElementFieldId(fieldId, depth + 1),
depth + 1))
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));

case MAP:
MapType mapType = (MapType) type;
return TypeDescription.createMap(
toOrcType(mapType.getKeyType()), toOrcType(mapType.getValueType()));
TypeDescription mapKeyOrcType =
convertToOrcType(
mapType.getKeyType(),
SpecialFields.getMapKeyFieldId(fieldId, depth + 1),
depth + 1);
TypeDescription mapValueOrcType =
convertToOrcType(
mapType.getValueType(),
SpecialFields.getMapValueFieldId(fieldId, depth + 1),
depth + 1);
return TypeDescription.createMap(mapKeyOrcType, mapValueOrcType)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case ROW:
RowType rowType = (RowType) type;
TypeDescription struct = TypeDescription.createStruct();
for (int i = 0; i < rowType.getFieldCount(); i++) {
struct.addField(
rowType.getFieldNames().get(i), toOrcType(rowType.getTypeAt(i)));
TypeDescription struct =
TypeDescription.createStruct()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
for (DataField dataField : rowType.getFields()) {
TypeDescription child =
convertToOrcType(dataField.type(), dataField.id(), depth + 1);
struct.addField(dataField.name(), child);
}
return struct;
return struct.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.junit.jupiter.api.Test;

import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link OrcSplitReaderUtil}. */
Expand Down Expand Up @@ -63,6 +63,6 @@ void testDataTypeToOrcType() {
}

private void test(String expected, DataType type) {
assertThat(toOrcType(type)).hasToString(expected);
assertThat(convertToOrcType(type, -1, -1)).hasToString(expected);
}
}

0 comments on commit 8f04099

Please sign in to comment.