Commit e0ae6c8

[parquet] Fix nested array/map has no id in parquet files (#4513)

tsreaper authored Nov 13, 2024
1 parent bbba017 commit e0ae6c8
Showing 4 changed files with 142 additions and 119 deletions.
SpecialFields.java

@@ -48,9 +48,18 @@
  * directly by id. These ids are not stored in {@link org.apache.paimon.types.DataField}.
  *
  * <ul>
- *   <li>Array element field: ID = 536870911 + <code>(array-field-id)</code>.
- *   <li>Map key field: ID = 536870911 + <code>(array-field-id)</code>.
- *   <li>Map value field: ID = 536870911 - <code>(array-field-id)</code>.
+ *   <li>Array element field: ID = 536870911 + 1024 * <code>(array-field-id)</code> + depth.
+ *   <li>Map key field: ID = 536870911 - 1024 * <code>(array-field-id)</code> - depth.
+ *   <li>Map value field: ID = 536870911 + 1024 * <code>(array-field-id)</code> + depth.
  * </ul>
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>ARRAY(MAP(INT, ARRAY(INT))) type, outer array has field id 10, then map (element of outer
+ *       array) has field id 536870911 + 1024 * 10 + 1, map key (int) has field id 536870911 - 1024
+ *       * 10 - 2, map value (inner array) has field id 536870911 + 1024 * 10 + 2, inner array
+ *       element (int) has field id 536870911 + 1024 * 10 + 3
+ * </ul>
  */
 public class SpecialFields {
@@ -95,16 +104,23 @@ public static boolean isSystemField(String field) {
     // ----------------------------------------------------------------------------------------

     public static final int STRUCTURED_TYPE_FIELD_ID_BASE = Integer.MAX_VALUE / 4;
+    public static final int STRUCTURED_TYPE_FIELD_DEPTH_LIMIT = 1 << 10;

-    public static int getArrayElementFieldId(int arrayFieldId) {
-        return STRUCTURED_TYPE_FIELD_ID_BASE + arrayFieldId;
+    public static int getArrayElementFieldId(int arrayFieldId, int depth) {
+        return STRUCTURED_TYPE_FIELD_ID_BASE
+                + arrayFieldId * STRUCTURED_TYPE_FIELD_DEPTH_LIMIT
+                + depth;
     }

-    public static int getMapKeyFieldId(int mapFieldId) {
-        return STRUCTURED_TYPE_FIELD_ID_BASE + mapFieldId;
+    public static int getMapKeyFieldId(int mapFieldId, int depth) {
+        return STRUCTURED_TYPE_FIELD_ID_BASE
+                - mapFieldId * STRUCTURED_TYPE_FIELD_DEPTH_LIMIT
+                - depth;
     }

-    public static int getMapValueFieldId(int mapFieldId) {
-        return STRUCTURED_TYPE_FIELD_ID_BASE - mapFieldId;
+    public static int getMapValueFieldId(int mapFieldId, int depth) {
+        return STRUCTURED_TYPE_FIELD_ID_BASE
+                + mapFieldId * STRUCTURED_TYPE_FIELD_DEPTH_LIMIT
+                + depth;
     }
 }
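
A quick way to sanity-check the scheme above is to re-run the javadoc example by hand. The following standalone snippet is not part of the commit; it merely re-applies the two constants and the three formulas from this diff, next to the removed single-level formulas, for the ARRAY(MAP(INT, ARRAY(INT))) example with outer field id 10:

public class FieldIdExample {
    // Constants copied from SpecialFields in the hunk above.
    static final int BASE = Integer.MAX_VALUE / 4; // 536870911
    static final int DEPTH_LIMIT = 1 << 10;        // 1024

    public static void main(String[] args) {
        int fieldId = 10; // field id of the outer ARRAY(MAP(INT, ARRAY(INT))) column

        // Removed scheme: ids depended on the field id only, so nesting levels
        // were indistinguishable (per the commit title, nested arrays/maps
        // ended up with no usable id in the parquet files).
        System.out.println("old array element / map key: " + (BASE + fieldId)); // 536870921
        System.out.println("old map value:               " + (BASE - fieldId)); // 536870901

        // New scheme: the depth term gives every nesting level its own id.
        System.out.println("map (element, depth 1):  " + (BASE + DEPTH_LIMIT * fieldId + 1)); // 536881152
        System.out.println("map key (depth 2):       " + (BASE - DEPTH_LIMIT * fieldId - 2)); // 536860669
        System.out.println("map value (depth 2):     " + (BASE + DEPTH_LIMIT * fieldId + 2)); // 536881153
        System.out.println("inner element (depth 3): " + (BASE + DEPTH_LIMIT * fieldId + 3)); // 536881154
    }
}

The printed values match the ids spelled out in the new javadoc example.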
ParquetReaderFactory.java

@@ -87,8 +87,8 @@ public class ParquetReaderFactory implements FormatReaderFactory {
     private final Options conf;

     private final RowType projectedType;
-    private final String[] projectedFields;
-    private final DataType[] projectedTypes;
+    private final String[] projectedColumnNames;
+    private final DataField[] projectedFields;
     private final int batchSize;
     private final FilterCompat.Filter filter;
     private final Set<Integer> unknownFieldsIndices = new HashSet<>();
@@ -97,14 +97,15 @@ public ParquetReaderFactory(
             Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
         this.conf = conf;
         this.projectedType = projectedType;
-        this.projectedFields = projectedType.getFieldNames().toArray(new String[0]);
-        this.projectedTypes = projectedType.getFieldTypes().toArray(new DataType[0]);
+        this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
+        this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
         this.batchSize = batchSize;
         this.filter = filter;
     }

     @Override
-    public ParquetReader createReader(FormatReaderFactory.Context context) throws IOException {
+    public RecordReader<InternalRow> createReader(FormatReaderFactory.Context context)
+            throws IOException {
         ParquetReadOptions.Builder builder =
                 ParquetReadOptions.builder().withRange(0, context.fileSize());
         setReadOptions(builder);
@@ -153,20 +154,20 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {

     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(GroupType parquetSchema) {
-        Type[] types = new Type[projectedFields.length];
-        for (int i = 0; i < projectedFields.length; ++i) {
-            String fieldName = projectedFields[i];
+        Type[] types = new Type[projectedColumnNames.length];
+        for (int i = 0; i < projectedColumnNames.length; ++i) {
+            String fieldName = projectedColumnNames[i];
             if (!parquetSchema.containsField(fieldName)) {
                 LOG.warn(
                         "{} does not exist in {}, will fill the field with null.",
                         fieldName,
                         parquetSchema);
                 types[i] =
-                        ParquetSchemaConverter.convertToParquetType(fieldName, projectedTypes[i]);
+                        ParquetSchemaConverter.convertToParquetType(fieldName, projectedFields[i]);
                 unknownFieldsIndices.add(i);
             } else {
                 Type parquetType = parquetSchema.getType(fieldName);
-                types[i] = clipParquetType(projectedTypes[i], parquetType);
+                types[i] = clipParquetType(projectedFields[i].type(), parquetType);
             }
         }

@@ -220,7 +221,7 @@ private Type clipParquetType(DataType readType, Type parquetType) {

     private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
             throws IOException, UnsupportedOperationException {
-        if (projectedFields.length != requestedSchema.getFieldCount()) {
+        if (projectedColumnNames.length != requestedSchema.getFieldCount()) {
             throw new RuntimeException(
                     "The quality of field type is incompatible with the request schema!");
         }
@@ -268,13 +269,13 @@ private ParquetReaderBatch createReaderBatch(
     }

     private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
-        WritableColumnVector[] columns = new WritableColumnVector[projectedTypes.length];
+        WritableColumnVector[] columns = new WritableColumnVector[projectedFields.length];
         List<Type> types = requestedSchema.getFields();
-        for (int i = 0; i < projectedTypes.length; i++) {
+        for (int i = 0; i < projectedFields.length; i++) {
             columns[i] =
                     createWritableColumnVector(
                             batchSize,
-                            projectedTypes[i],
+                            projectedFields[i].type(),
                             types.get(i),
                             requestedSchema.getColumns(),
                             0);
@@ -290,7 +291,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
             WritableColumnVector[] writableVectors) {
         ColumnVector[] vectors = new ColumnVector[writableVectors.length];
         for (int i = 0; i < writableVectors.length; i++) {
-            switch (projectedTypes[i].getTypeRoot()) {
+            switch (projectedFields[i].type().getTypeRoot()) {
                 case DECIMAL:
                     vectors[i] = new ParquetDecimalVector(writableVectors[i]);
                     break;
@@ -416,7 +417,7 @@ private void readNextRowGroup() throws IOException {
             if (!unknownFieldsIndices.contains(i)) {
                 columnReaders[i] =
                         createColumnReader(
-                                projectedTypes[i],
+                                projectedFields[i].type(),
                                 types.get(i),
                                 requestedSchema.getColumns(),
                                 rowGroup,
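
The reader-side change is mechanical: projectedTypes (a DataType[]) becomes projectedFields (a DataField[]), presumably so the Paimon field ids carried by DataField stay available when Parquet types are synthesized for missing columns. One property of the id encoding worth noting: the 1 << 10 depth limit also bounds how large real field ids can get before the synthetic ids overflow. The following is a back-of-the-envelope check (plain arithmetic on the constants above, not limits enforced anywhere in this commit):

public class FieldIdBounds {
    public static void main(String[] args) {
        int base = Integer.MAX_VALUE / 4; // 536870911
        int depthLimit = 1 << 10;         // 1024

        // Depth must stay below 1024, otherwise two (fieldId, depth) pairs
        // collide: 1024 * f + 1024 == 1024 * (f + 1) + 0.
        System.out.println("max depth per field: " + (depthLimit - 1)); // 1023

        // Largest top-level field id before base + 1024 * id + depth can
        // exceed Integer.MAX_VALUE:
        int maxFieldId = (Integer.MAX_VALUE - base - (depthLimit - 1)) / depthLimit;
        System.out.println("max field id before int overflow: " + maxFieldId); // 1572863
    }
}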