[orc] Add type id to orc files (#4523)
yuzelin authored Nov 19, 2024
1 parent ebef050 commit 06fbb5e
Showing 9 changed files with 364 additions and 178 deletions.
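
A minimal sketch of the tagging mechanism this commit introduces (the class name FieldIdTagSketch and the field id value are illustrative; only setAttribute/getAttributeValue and the "paimon.id" key come from the diff below):

import org.apache.orc.TypeDescription;

public class FieldIdTagSketch {
    public static void main(String[] args) {
        // Tag an ORC int type with a (hypothetical) Paimon field id of 3.
        TypeDescription orcInt =
                TypeDescription.createInt().setAttribute("paimon.id", "3");

        // The id can be read back from the type later, e.g. by a reader
        // that wants to match ORC columns to Paimon fields by id.
        System.out.println(orcInt.getAttributeValue("paimon.id")); // 3
    }
}
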
OrcFileFormat.java

@@ -28,7 +28,6 @@
 import org.apache.paimon.format.orc.filter.OrcFilters;
 import org.apache.paimon.format.orc.filter.OrcPredicateFunctionVisitor;
 import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor;
-import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil;
 import org.apache.paimon.format.orc.writer.RowDataVectorizer;
 import org.apache.paimon.format.orc.writer.Vectorizer;
 import org.apache.paimon.options.MemorySize;
@@ -123,7 +122,7 @@ public FormatReaderFactory createReaderFactory(
 @Override
 public void validateDataFields(RowType rowType) {
     DataType refinedType = refineDataType(rowType);
-    OrcSplitReaderUtil.toOrcType(refinedType);
+    OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
 }

/**
@@ -141,9 +140,8 @@ public FormatWriterFactory createWriterFactory(RowType type) {
     DataType refinedType = refineDataType(type);
     DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);

-    TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType);
-    Vectorizer<InternalRow> vectorizer =
-            new RowDataVectorizer(typeDescription.toString(), orcTypes);
+    TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
+    Vectorizer<InternalRow> vectorizer = new RowDataVectorizer(typeDescription, orcTypes);

     return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
 }
OrcReaderFactory.java

@@ -55,8 +55,8 @@
 import java.io.IOException;
 import java.util.List;

+import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
 import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
-import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
@@ -81,7 +81,7 @@ public OrcReaderFactory(
         final int batchSize,
         final boolean deletionVectorsEnabled) {
     this.hadoopConfig = checkNotNull(hadoopConfig);
-    this.schema = toOrcType(readType);
+    this.schema = convertToOrcSchema(readType);
     this.tableType = readType;
     this.conjunctPredicates = checkNotNull(conjunctPredicates);
     this.batchSize = batchSize;
OrcTypeUtil.java (new file)

@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.orc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import org.apache.orc.TypeDescription;

/** Utils for ORC types. */
public class OrcTypeUtil {

    public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.id";

    public static TypeDescription convertToOrcSchema(RowType rowType) {
        TypeDescription struct = TypeDescription.createStruct();
        for (DataField dataField : rowType.getFields()) {
            TypeDescription child = convertToOrcType(dataField.type(), dataField.id(), 0);
            struct.addField(dataField.name(), child);
        }
        return struct;
    }

    /** Converts a Paimon {@link DataType} to an ORC type, tagging it with the Paimon field id. */
    @VisibleForTesting
    static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
        type = type.copy(true);
        switch (type.getTypeRoot()) {
            case CHAR:
                return TypeDescription.createChar()
                        .withMaxLength(((CharType) type).getLength())
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case VARCHAR:
                int len = ((VarCharType) type).getLength();
                if (len == VarCharType.MAX_LENGTH) {
                    return TypeDescription.createString()
                            .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
                } else {
                    return TypeDescription.createVarchar()
                            .withMaxLength(len)
                            .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
                }
            case BOOLEAN:
                return TypeDescription.createBoolean()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case VARBINARY:
                if (type.equals(DataTypes.BYTES())) {
                    return TypeDescription.createBinary()
                            .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported binary type: " + type);
                }
            case DECIMAL:
                DecimalType decimalType = (DecimalType) type;
                return TypeDescription.createDecimal()
                        .withScale(decimalType.getScale())
                        .withPrecision(decimalType.getPrecision())
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case TINYINT:
                return TypeDescription.createByte()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case SMALLINT:
                return TypeDescription.createShort()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case INTEGER:
            case TIME_WITHOUT_TIME_ZONE:
                return TypeDescription.createInt()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case BIGINT:
                return TypeDescription.createLong()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case FLOAT:
                return TypeDescription.createFloat()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case DOUBLE:
                return TypeDescription.createDouble()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case DATE:
                return TypeDescription.createDate()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return TypeDescription.createTimestamp()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return TypeDescription.createTimestampInstant()
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case ARRAY:
                ArrayType arrayType = (ArrayType) type;

                // Nested elements get derived field ids; see SpecialFields.
                String elementFieldId =
                        String.valueOf(SpecialFields.getArrayElementFieldId(fieldId, depth + 1));
                TypeDescription elementOrcType =
                        convertToOrcType(arrayType.getElementType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, elementFieldId);

                return TypeDescription.createList(elementOrcType)
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case MAP:
                MapType mapType = (MapType) type;

                String mapKeyFieldId =
                        String.valueOf(SpecialFields.getMapKeyFieldId(fieldId, depth + 1));
                TypeDescription mapKeyOrcType =
                        convertToOrcType(mapType.getKeyType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapKeyFieldId);

                String mapValueFieldId =
                        String.valueOf(SpecialFields.getMapValueFieldId(fieldId, depth + 1));
                TypeDescription mapValueOrcType =
                        convertToOrcType(mapType.getValueType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapValueFieldId);

                return TypeDescription.createMap(mapKeyOrcType, mapValueOrcType)
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            case ROW:
                return convertToOrcSchema((RowType) type)
                        .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}
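
A usage sketch of the new utility (the class name OrcTypeUtilSketch and the field names are hypothetical; it assumes the same package as OrcTypeUtil and that RowType.of assigns incremental field ids 0, 1, ... as Paimon does for ad-hoc row types): convertToOrcSchema produces an ORC struct whose children each carry their Paimon field id as the paimon.id attribute.

package org.apache.paimon.format.orc;

import org.apache.orc.TypeDescription;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class OrcTypeUtilSketch {
    public static void main(String[] args) {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                        new String[] {"user_id", "name"});

        TypeDescription schema = OrcTypeUtil.convertToOrcSchema(rowType);

        // Expected: struct<user_id:int,name:string>, with each child
        // carrying its Paimon field id under "paimon.id" (here 0 and 1).
        System.out.println(schema);
        for (TypeDescription child : schema.getChildren()) {
            System.out.println(child.getAttributeValue(OrcTypeUtil.PAIMON_ORC_FIELD_ID_KEY));
        }
    }
}
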

OrcSplitReaderUtil.java (the name is inferred from the removed import and call sites above): This file was deleted.

RowDataVectorizer.java

@@ -23,6 +23,7 @@

 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;

import java.util.Arrays;
import java.util.List;
@@ -35,7 +36,7 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {

     private final List<FieldWriter> fieldWriters;

-    public RowDataVectorizer(String schema, DataType[] fieldTypes) {
+    public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) {
         super(schema);
         this.fieldWriters =
                 Arrays.stream(fieldTypes)
Vectorizer.java

@@ -39,9 +39,9 @@ public abstract class Vectorizer<T> implements Serializable {

     private final TypeDescription schema;

-    public Vectorizer(final String schema) {
+    public Vectorizer(final TypeDescription schema) {
         checkNotNull(schema);
-        this.schema = TypeDescription.fromString(schema);
+        this.schema = schema;
     }

/**
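
Why the Vectorizer constructor now takes a TypeDescription instead of a String: the attributes set by setAttribute live on the TypeDescription object and, as far as ORC's schema-string syntax goes, do not survive a round trip through toString()/fromString(), so the old string-based constructor would have dropped the new paimon.id attributes. A small sketch of that round-trip loss (an assumption about ORC's string form, not taken from the commit):

import org.apache.orc.TypeDescription;

public class AttributeRoundTripSketch {
    public static void main(String[] args) {
        TypeDescription tagged =
                TypeDescription.createInt().setAttribute("paimon.id", "7");

        // Rebuild the type from its string form, as the old constructor did.
        TypeDescription reparsed = TypeDescription.fromString(tagged.toString());

        System.out.println(tagged.getAttributeValue("paimon.id"));   // 7
        System.out.println(reparsed.getAttributeValue("paimon.id")); // presumably null
    }
}
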

This file was deleted.

(The remaining 2 changed files are not shown.)
