Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[orc] Add type id to orc files #4523

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.format.orc.filter.OrcPredicateFunctionVisitor;
import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor;
import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil;
import org.apache.paimon.format.orc.writer.RowDataVectorizer;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.options.MemorySize;
Expand Down Expand Up @@ -123,7 +122,7 @@ public FormatReaderFactory createReaderFactory(
@Override
public void validateDataFields(RowType rowType) {
DataType refinedType = refineDataType(rowType);
OrcSplitReaderUtil.toOrcType(refinedType);
OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
}

/**
Expand All @@ -141,9 +140,8 @@ public FormatWriterFactory createWriterFactory(RowType type) {
DataType refinedType = refineDataType(type);
DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);

TypeDescription typeDescription = OrcSplitReaderUtil.toOrcType(refinedType);
Vectorizer<InternalRow> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);
TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
Vectorizer<InternalRow> vectorizer = new RowDataVectorizer(typeDescription, orcTypes);

return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import java.io.IOException;
import java.util.List;

import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
Expand All @@ -81,7 +81,7 @@ public OrcReaderFactory(
final int batchSize,
final boolean deletionVectorsEnabled) {
this.hadoopConfig = checkNotNull(hadoopConfig);
this.schema = toOrcType(readType);
this.schema = convertToOrcSchema(readType);
this.tableType = readType;
this.conjunctPredicates = checkNotNull(conjunctPredicates);
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.orc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import org.apache.orc.TypeDescription;

/** Util for orc types. */
/** Util for orc types. */
public class OrcTypeUtil {

    /** ORC attribute key under which the Paimon field id is stored on every schema node. */
    public static final String PAIMON_ORC_FIELD_ID_KEY = "paimon.id";

    /**
     * Converts a Paimon {@link RowType} into an ORC struct {@link TypeDescription}, tagging each
     * nested node with its Paimon field id under {@link #PAIMON_ORC_FIELD_ID_KEY}.
     *
     * @param rowType the Paimon row type to convert
     * @return an ORC struct schema mirroring {@code rowType}
     */
    public static TypeDescription convertToOrcSchema(RowType rowType) {
        TypeDescription struct = TypeDescription.createStruct();
        for (DataField dataField : rowType.getFields()) {
            struct.addField(
                    dataField.name(), convertToOrcType(dataField.type(), dataField.id(), 0));
        }
        return struct;
    }

    /**
     * Converts one Paimon {@link DataType} into an ORC {@link TypeDescription} and attaches the
     * given Paimon field id as an ORC attribute.
     *
     * <p>The switch only builds the bare ORC type; the field-id attribute is attached exactly once
     * at the end so the tagging logic is not repeated in every branch. For ARRAY and MAP, the
     * recursively converted children are re-tagged with their synthetic element/key/value ids
     * (derived via {@link SpecialFields}), overriding the parent id set by the recursive call.
     *
     * @param type the Paimon type to convert (nullability is normalized away; ORC types carry no
     *     nullability)
     * @param fieldId the Paimon field id to tag the resulting ORC node with
     * @param depth nesting depth, used to derive synthetic ids for array/map sub-fields
     * @return the equivalent ORC type, tagged with {@code fieldId}
     * @throws UnsupportedOperationException if the type has no ORC equivalent
     */
    @VisibleForTesting
    static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
        // Normalize to the nullable copy so equality checks (e.g. against DataTypes.BYTES())
        // ignore nullability.
        type = type.copy(true);
        final TypeDescription orcType;
        switch (type.getTypeRoot()) {
            case CHAR:
                orcType = TypeDescription.createChar().withMaxLength(((CharType) type).getLength());
                break;
            case VARCHAR:
                int len = ((VarCharType) type).getLength();
                // VARCHAR with unbounded length maps to ORC's plain string type.
                orcType =
                        len == VarCharType.MAX_LENGTH
                                ? TypeDescription.createString()
                                : TypeDescription.createVarchar().withMaxLength(len);
                break;
            case BOOLEAN:
                orcType = TypeDescription.createBoolean();
                break;
            case VARBINARY:
                // Only the unbounded BYTES type is supported; bounded VARBINARY has no ORC mapping.
                if (type.equals(DataTypes.BYTES())) {
                    orcType = TypeDescription.createBinary();
                } else {
                    throw new UnsupportedOperationException(
                            "Not support other binary type: " + type);
                }
                break;
            case DECIMAL:
                DecimalType decimalType = (DecimalType) type;
                orcType =
                        TypeDescription.createDecimal()
                                .withScale(decimalType.getScale())
                                .withPrecision(decimalType.getPrecision());
                break;
            case TINYINT:
                orcType = TypeDescription.createByte();
                break;
            case SMALLINT:
                orcType = TypeDescription.createShort();
                break;
            case INTEGER:
            case TIME_WITHOUT_TIME_ZONE:
                // TIME is stored as milliseconds-of-day, which fits an ORC int.
                orcType = TypeDescription.createInt();
                break;
            case BIGINT:
                orcType = TypeDescription.createLong();
                break;
            case FLOAT:
                orcType = TypeDescription.createFloat();
                break;
            case DOUBLE:
                orcType = TypeDescription.createDouble();
                break;
            case DATE:
                orcType = TypeDescription.createDate();
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                orcType = TypeDescription.createTimestamp();
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                orcType = TypeDescription.createTimestampInstant();
                break;
            case ARRAY:
                ArrayType arrayType = (ArrayType) type;

                // The element has no Paimon field of its own; derive a synthetic id and override
                // the parent id the recursive call attached.
                String elementFieldId =
                        String.valueOf(SpecialFields.getArrayElementFieldId(fieldId, depth + 1));
                TypeDescription elementOrcType =
                        convertToOrcType(arrayType.getElementType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, elementFieldId);

                orcType = TypeDescription.createList(elementOrcType);
                break;
            case MAP:
                MapType mapType = (MapType) type;

                // Same as ARRAY: key and value get synthetic ids derived from the map's field id.
                String mapKeyFieldId =
                        String.valueOf(SpecialFields.getMapKeyFieldId(fieldId, depth + 1));
                TypeDescription mapKeyOrcType =
                        convertToOrcType(mapType.getKeyType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapKeyFieldId);

                String mapValueFieldId =
                        String.valueOf(SpecialFields.getMapValueFieldId(fieldId, depth + 1));
                TypeDescription mapValueOrcType =
                        convertToOrcType(mapType.getValueType(), fieldId, depth + 1)
                                .setAttribute(PAIMON_ORC_FIELD_ID_KEY, mapValueFieldId);

                orcType = TypeDescription.createMap(mapKeyOrcType, mapValueOrcType);
                break;
            case ROW:
                orcType = convertToOrcSchema((RowType) type);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
        // Single tagging point: every returned node carries its Paimon field id.
        return orcType.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

import java.util.Arrays;
import java.util.List;
Expand All @@ -35,7 +36,7 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {

private final List<FieldWriter> fieldWriters;

public RowDataVectorizer(String schema, DataType[] fieldTypes) {
public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) {
super(schema);
this.fieldWriters =
Arrays.stream(fieldTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public abstract class Vectorizer<T> implements Serializable {

private final TypeDescription schema;

public Vectorizer(final String schema) {
public Vectorizer(final TypeDescription schema) {
checkNotNull(schema);
this.schema = TypeDescription.fromString(schema);
this.schema = schema;
}

/**
Expand Down

This file was deleted.

Loading
Loading