Skip to content

Commit

Permalink
[orc] Add type id to orc files
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Nov 19, 2024
1 parent 848cb59 commit f206d40
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public FormatReaderFactory createReaderFactory(
@Override
public void validateDataFields(RowType rowType) {
DataType refinedType = refineDataType(rowType);
OrcSplitReaderUtil.toOrcType(refinedType);
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
}

/**
Expand All @@ -141,9 +141,9 @@ public FormatWriterFactory createWriterFactory(RowType type) {
DataType refinedType = refineDataType(type);
DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);

TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType);
Vectorizer<InternalRow> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);
TypeDescription typeDescription =
OrcSplitReaderUtil.convertToOrcSchema((RowType) refinedType);
Vectorizer<InternalRow> vectorizer = new RowDataVectorizer(typeDescription, orcTypes);

return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.List;

import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcSchema;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
Expand All @@ -81,7 +81,7 @@ public OrcReaderFactory(
final int batchSize,
final boolean deletionVectorsEnabled) {
this.hadoopConfig = checkNotNull(hadoopConfig);
this.schema = toOrcType(readType);
this.schema = convertToOrcSchema(readType);
this.tableType = readType;
this.conjunctPredicates = checkNotNull(conjunctPredicates);
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.paimon.format.orc.reader;

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
Expand All @@ -32,23 +34,41 @@
/** Util for orc types. */
public class OrcSplitReaderUtil {

public static TypeDescription toOrcType(DataType type) {
public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.field.id";

public static TypeDescription convertToOrcSchema(RowType rowType) {
TypeDescription struct = TypeDescription.createStruct();
for (DataField dataField : rowType.getFields()) {
TypeDescription child = convertToOrcType(dataField.type(), dataField.id(), 0);
struct.addField(dataField.name(), child);
}
return struct;
}

public static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
type = type.copy(true);
switch (type.getTypeRoot()) {
case CHAR:
return TypeDescription.createChar().withMaxLength(((CharType) type).getLength());
return TypeDescription.createChar()
.withMaxLength(((CharType) type).getLength())
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case VARCHAR:
int len = ((VarCharType) type).getLength();
if (len == VarCharType.MAX_LENGTH) {
return TypeDescription.createString();
return TypeDescription.createString()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
} else {
return TypeDescription.createVarchar().withMaxLength(len);
return TypeDescription.createVarchar()
.withMaxLength(len)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
}
case BOOLEAN:
return TypeDescription.createBoolean();
return TypeDescription.createBoolean()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case VARBINARY:
if (type.equals(DataTypes.BYTES())) {
return TypeDescription.createBinary();
return TypeDescription.createBinary()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
} else {
throw new UnsupportedOperationException(
"Not support other binary type: " + type);
Expand All @@ -57,41 +77,67 @@ public static TypeDescription toOrcType(DataType type) {
DecimalType decimalType = (DecimalType) type;
return TypeDescription.createDecimal()
.withScale(decimalType.getScale())
.withPrecision(decimalType.getPrecision());
.withPrecision(decimalType.getPrecision())
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TINYINT:
return TypeDescription.createByte();
return TypeDescription.createByte()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case SMALLINT:
return TypeDescription.createShort();
return TypeDescription.createShort()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case INTEGER:
case TIME_WITHOUT_TIME_ZONE:
return TypeDescription.createInt();
return TypeDescription.createInt()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case BIGINT:
return TypeDescription.createLong();
return TypeDescription.createLong()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case FLOAT:
return TypeDescription.createFloat();
return TypeDescription.createFloat()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case DOUBLE:
return TypeDescription.createDouble();
return TypeDescription.createDouble()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case DATE:
return TypeDescription.createDate();
return TypeDescription.createDate()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TypeDescription.createTimestamp();
return TypeDescription.createTimestamp()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TypeDescription.createTimestampInstant();
return TypeDescription.createTimestampInstant()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case ARRAY:
ArrayType arrayType = (ArrayType) type;
return TypeDescription.createList(toOrcType(arrayType.getElementType()));

String elementFieldId =
String.valueOf(SpecialFields.getArrayElementFieldId(fieldId, depth + 1));
TypeDescription elementOrcType =
convertToOrcType(arrayType.getElementType(), fieldId, depth + 1)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, elementFieldId);

return TypeDescription.createList(elementOrcType)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case MAP:
MapType mapType = (MapType) type;
return TypeDescription.createMap(
toOrcType(mapType.getKeyType()), toOrcType(mapType.getValueType()));

String mapKeyFieldId =
String.valueOf(SpecialFields.getMapKeyFieldId(fieldId, depth + 1));
TypeDescription mapKeyOrcType =
convertToOrcType(mapType.getKeyType(), fieldId, depth + 1)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapKeyFieldId);

String mapValueFieldId =
String.valueOf(SpecialFields.getMapValueFieldId(fieldId, depth + 1));
TypeDescription mapValueOrcType =
convertToOrcType(mapType.getValueType(), fieldId, depth + 1)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapValueFieldId);

return TypeDescription.createMap(mapKeyOrcType, mapValueOrcType)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case ROW:
RowType rowType = (RowType) type;
TypeDescription struct = TypeDescription.createStruct();
for (int i = 0; i < rowType.getFieldCount(); i++) {
struct.addField(
rowType.getFieldNames().get(i), toOrcType(rowType.getTypeAt(i)));
}
return struct;
return convertToOrcSchema((RowType) type)
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

import java.util.Arrays;
import java.util.List;
Expand All @@ -35,7 +36,7 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {

private final List<FieldWriter> fieldWriters;

public RowDataVectorizer(String schema, DataType[] fieldTypes) {
public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) {
super(schema);
this.fieldWriters =
Arrays.stream(fieldTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public abstract class Vectorizer<T> implements Serializable {

private final TypeDescription schema;

public Vectorizer(final String schema) {
public Vectorizer(final TypeDescription schema) {
checkNotNull(schema);
this.schema = TypeDescription.fromString(schema);
this.schema = schema;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.junit.jupiter.api.Test;

import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.convertToOrcType;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link OrcSplitReaderUtil}. */
Expand Down Expand Up @@ -63,6 +63,6 @@ void testDataTypeToOrcType() {
}

private void test(String expected, DataType type) {
assertThat(toOrcType(type)).hasToString(expected);
assertThat(convertToOrcType(type, -1, -1)).hasToString(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.orc.MemoryManager;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -47,7 +48,7 @@ void testNotOverrideInMemoryManager(@TempDir java.nio.file.Path tmpDir) throws I
OrcWriterFactory factory =
new TestOrcWriterFactory(
new RowDataVectorizer(
"struct<_col0:string,_col1:int>",
TypeDescription.fromString("struct<_col0:string,_col1:int>"),
new DataType[] {DataTypes.STRING(), DataTypes.INT()}),
memoryManager);
factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");
Expand Down

0 comments on commit f206d40

Please sign in to comment.