Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Jan 11, 2025
1 parent a288809 commit 247ad15
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public FormatWriterFactory createWriterFactory(RowType type) {

@Override
public void validateDataFields(RowType rowType) {
    // The converter throws if any field type cannot be represented as a
    // parquet type, which is exactly the validation we want; the resulting
    // MessageType is discarded.
    ParquetSchemaConverter.convertToParquetMessageType(rowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;

import org.apache.parquet.ParquetReadOptions;
Expand Down Expand Up @@ -70,11 +71,10 @@
import java.util.List;
import java.util.Set;

import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
Expand All @@ -93,7 +93,6 @@ public class ParquetReaderFactory implements FormatReaderFactory {
private final Options conf;

private final RowType projectedType;
private final String[] projectedColumnNames;
private final DataField[] projectedFields;
private final int batchSize;
private final FilterCompat.Filter filter;
Expand All @@ -103,7 +102,6 @@ public ParquetReaderFactory(
Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
this.conf = conf;
this.projectedType = projectedType;
this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
this.batchSize = batchSize;
this.filter = filter;
Expand Down Expand Up @@ -160,24 +158,23 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {

/** Clips `parquetSchema` according to `fieldNames`. */
private MessageType clipParquetSchema(GroupType parquetSchema) {
Type[] types = new Type[projectedColumnNames.length];
for (int i = 0; i < projectedColumnNames.length; ++i) {
String fieldName = projectedColumnNames[i];
Type[] types = new Type[projectedFields.length];
for (int i = 0; i < projectedFields.length; ++i) {
String fieldName = projectedFields[i].name();
if (!parquetSchema.containsField(fieldName)) {
LOG.warn(
"{} does not exist in {}, will fill the field with null.",
fieldName,
parquetSchema);
types[i] =
ParquetSchemaConverter.convertToParquetType(fieldName, projectedFields[i]);
types[i] = ParquetSchemaConverter.convertToParquetType(projectedFields[i]);
unknownFieldsIndices.add(i);
} else {
Type parquetType = parquetSchema.getType(fieldName);
types[i] = clipParquetType(projectedFields[i].type(), parquetType);
}
}

return Types.buildMessage().addFields(types).named("paimon-parquet");
return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA);
}

/** Clips `parquetType` by `readType`. */
Expand All @@ -201,33 +198,29 @@ private Type clipParquetType(DataType readType, Type parquetType) {
case MAP:
MapType mapType = (MapType) readType;
GroupType mapGroup = (GroupType) parquetType;
GroupType keyValue = mapGroup.getType(MAP_REPEATED_NAME).asGroupType();
Pair<Type, Type> keyValueType = parquetMapKeyValueType(mapGroup);
return ConversionPatterns.mapType(
mapGroup.getRepetition(),
mapGroup.getName(),
MAP_REPEATED_NAME,
clipParquetType(mapType.getKeyType(), keyValue.getType(MAP_KEY_NAME)),
keyValue.containsField(MAP_VALUE_NAME)
? clipParquetType(
mapType.getValueType(), keyValue.getType(MAP_VALUE_NAME))
: null);
clipParquetType(mapType.getKeyType(), keyValueType.getLeft()),
clipParquetType(mapType.getValueType(), keyValueType.getRight()));
case ARRAY:
ArrayType arrayType = (ArrayType) readType;
GroupType arrayGroup = (GroupType) parquetType;
GroupType list = arrayGroup.getType(LIST_NAME).asGroupType();
return ConversionPatterns.listOfElements(
arrayGroup.getRepetition(),
arrayGroup.getName(),
clipParquetType(
arrayType.getElementType(), list.getType(LIST_ELEMENT_NAME)));
arrayType.getElementType(), parquetListElementType(arrayGroup)));
default:
return parquetType;
}
}

private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
throws IOException, UnsupportedOperationException {
if (projectedColumnNames.length != requestedSchema.getFieldCount()) {
if (projectedFields.length != requestedSchema.getFieldCount()) {
throw new RuntimeException(
"The quality of field type is incompatible with the request schema!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Pair;

import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
Expand All @@ -39,33 +41,40 @@
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/** Schema converter converts Parquet schema to and from Paimon internal types. */
public class ParquetSchemaConverter {

static final String PAIMON_SCHEMA = "paimon_schema";

static final String MAP_REPEATED_NAME = "key_value";
static final String MAP_KEY_NAME = "key";
static final String MAP_VALUE_NAME = "value";
static final String LIST_NAME = "list";
static final String LIST_REPEATED_NAME = "list";
static final String LIST_ELEMENT_NAME = "element";

public static MessageType convertToParquetMessageType(String name, RowType rowType) {
return new MessageType(name, convertToParquetTypes(rowType));
}

public static Type convertToParquetType(String name, DataField field) {
return convertToParquetType(name, field.type(), field.id(), 0);
/** Convert paimon {@link RowType} to parquet {@link MessageType}. */
public static MessageType convertToParquetMessageType(RowType rowType) {
    // All parquet schemas produced by paimon share the fixed root name.
    Type[] parquetFieldTypes = convertToParquetTypes(rowType);
    return new MessageType(PAIMON_SCHEMA, parquetFieldTypes);
}

/** Converts every top-level paimon field to its parquet counterpart, one-to-one. */
private static Type[] convertToParquetTypes(RowType rowType) {
    return rowType.getFields().stream()
            .map(ParquetSchemaConverter::convertToParquetType)
            .toArray(Type[]::new);
}

/** Convert paimon {@link DataField} to parquet {@link Type}. */
public static Type convertToParquetType(DataField field) {
    // Delegate to the recursive converter, starting at nesting depth 0.
    String fieldName = field.name();
    DataType fieldType = field.type();
    return convertToParquetType(fieldName, fieldType, field.id(), 0);
}

private static Type convertToParquetType(String name, DataType type, int fieldId, int depth) {
Type.Repetition repetition =
type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
Expand Down Expand Up @@ -260,4 +269,144 @@ public static boolean is32BitDecimal(int precision) {
public static boolean is64BitDecimal(int precision) {
    // Decimals with 10..18 digits fit a 64-bit integer representation
    // (smaller precisions use 32-bit, see is32BitDecimal).
    return precision > 9 && precision <= 18;
}

/** Convert parquet {@link MessageType} to paimon {@link RowType}. */
public static RowType convertToPaimonRowType(MessageType messageType) {
    // Each top-level parquet field becomes one paimon row field.
    return new RowType(
            messageType.asGroupType().getFields().stream()
                    .map(ParquetSchemaConverter::convertToPaimonField)
                    .collect(Collectors.toList()));
}

/** Convert parquet {@link Type} to paimon {@link DataField}. */
public static DataField convertToPaimonField(Type parquetType) {
    DataType paimonDataType =
            parquetType.isPrimitive()
                    ? convertPrimitiveToPaimonType(parquetType)
                    : convertGroupToPaimonType(parquetType.asGroupType());

    // Parquet REQUIRED maps to paimon NOT NULL; OPTIONAL stays nullable
    // (the default for converted types).
    if (parquetType.getRepetition().equals(Type.Repetition.REQUIRED)) {
        paimonDataType = paimonDataType.notNull();
    }

    return new DataField(parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
}

/**
 * Maps a parquet primitive type (interpreted through its logical type annotation, if any)
 * to the corresponding paimon {@link DataType}, ignoring nullability.
 *
 * @throws UnsupportedOperationException for primitive type names with no paimon mapping
 */
private static DataType convertPrimitiveToPaimonType(Type parquetType) {
    LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
    switch (parquetType.asPrimitiveType().getPrimitiveTypeName()) {
        case BINARY:
            return logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation
                    ? DataTypes.STRING()
                    : DataTypes.BYTES();
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case FLOAT:
            return DataTypes.FLOAT();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case INT32:
            if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation int32Decimal =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                return new DecimalType(int32Decimal.getPrecision(), int32Decimal.getScale());
            } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
                int bitWidth =
                        ((LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType)
                                .getBitWidth();
                if (bitWidth == 8) {
                    return DataTypes.TINYINT();
                } else if (bitWidth == 16) {
                    return DataTypes.SMALLINT();
                }
                return DataTypes.INT();
            } else if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
                return DataTypes.DATE();
            } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
                return DataTypes.TIME();
            }
            return DataTypes.INT();
        case INT64:
            if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation int64Decimal =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                return new DecimalType(int64Decimal.getPrecision(), int64Decimal.getScale());
            } else if (logicalType
                    instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
                        (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
                // MILLIS -> precision 3; anything else (MICROS, NANOS) -> precision 6.
                int precision =
                        timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
                                ? 3
                                : 6;
                return timestampType.isAdjustedToUTC()
                        ? new LocalZonedTimestampType(precision)
                        : new TimestampType(precision);
            }
            return DataTypes.BIGINT();
        case INT96:
            // Legacy 96-bit timestamps carry nanosecond precision.
            return new TimestampType(9);
        case FIXED_LEN_BYTE_ARRAY:
            // NOTE(review): assumes FIXED_LEN_BYTE_ARRAY always carries a decimal
            // annotation; a missing or different annotation would throw NPE/CCE
            // here — confirm this invariant holds for all supported writers.
            LogicalTypeAnnotation.DecimalLogicalTypeAnnotation fixedDecimal =
                    (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
            return new DecimalType(fixedDecimal.getPrecision(), fixedDecimal.getScale());
        default:
            throw new UnsupportedOperationException("Unsupported type: " + parquetType);
    }
}

/**
 * Maps a parquet group type — annotated LIST, annotated MAP, or a plain struct — to the
 * corresponding paimon {@link DataType}, ignoring nullability.
 */
private static DataType convertGroupToPaimonType(GroupType groupType) {
    LogicalTypeAnnotation logicalType = groupType.getLogicalTypeAnnotation();
    if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
        return new ArrayType(convertToPaimonField(parquetListElementType(groupType)).type());
    } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
        Pair<Type, Type> keyValueType = parquetMapKeyValueType(groupType);
        // Since parquet does not support nullable map keys, when converting back
        // to paimon, set the key type as nullable by default.
        return new MapType(
                convertToPaimonField(keyValueType.getLeft()).type().nullable(),
                convertToPaimonField(keyValueType.getRight()).type());
    }
    // No annotation: a plain nested struct, converted field by field.
    return new RowType(
            groupType.getFields().stream()
                    .map(ParquetSchemaConverter::convertToPaimonField)
                    .collect(Collectors.toList()));
}

/** Extracts the element type from a 3-level parquet LIST group. */
public static Type parquetListElementType(GroupType listType) {
    // Layout: group (LIST) { repeated group list { <element> } }
    GroupType repeatedList = listType.getType(LIST_REPEATED_NAME).asGroupType();
    return repeatedList.getType(LIST_ELEMENT_NAME);
}

/** Extracts the (key, value) types from a standard parquet MAP group. */
public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
    // Layout: group (MAP) { repeated group key_value { key; value; } }
    GroupType repeatedKeyValue = mapType.getType(MAP_REPEATED_NAME).asGroupType();
    Type keyType = repeatedKeyValue.getType(MAP_KEY_NAME);
    Type valueType = repeatedKeyValue.getType(MAP_VALUE_NAME);
    return Pair.of(keyType, valueType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected WriteSupport<InternalRow> getWriteSupport(Configuration conf) {

private class ParquetWriteSupport extends WriteSupport<InternalRow> {

private final MessageType schema = convertToParquetMessageType("paimon_schema", rowType);
private final MessageType schema = convertToParquetMessageType(rowType);

private ParquetRowDataWriter writer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ public void testConvertToParquetTypeWithId() {
.withId(baseId + depthLimit * 2 + 1);
Type expected =
new MessageType(
"table",
ParquetSchemaConverter.PAIMON_SCHEMA,
Types.primitive(INT32, Type.Repetition.OPTIONAL).named("a").withId(0),
ConversionPatterns.listOfElements(
Type.Repetition.OPTIONAL,
Expand All @@ -555,7 +555,7 @@ public void testConvertToParquetTypeWithId() {
.withId(baseId - depthLimit * 2 - 1),
outerMapValueType)
.withId(2));
Type actual = ParquetSchemaConverter.convertToParquetMessageType("table", rowType);
Type actual = ParquetSchemaConverter.convertToParquetMessageType(rowType);
assertThat(actual).isEqualTo(expected);
}

Expand Down Expand Up @@ -906,8 +906,7 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
Configuration conf = new Configuration();
conf.setInt("parquet.block.size", rowGroupSize);
MessageType schema =
ParquetSchemaConverter.convertToParquetMessageType(
"paimon-parquet", NESTED_ARRAY_MAP_TYPE);
ParquetSchemaConverter.convertToParquetMessageType(NESTED_ARRAY_MAP_TYPE);
try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(
HadoopOutputFile.fromPath(
Expand Down
Loading

0 comments on commit 247ad15

Please sign in to comment.