Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Aug 5, 2024
1 parent aa32102 commit 4a44158
Show file tree
Hide file tree
Showing 4 changed files with 882 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.position.CollectionPosition;
import org.apache.paimon.format.parquet.position.LevelDelegation;
import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
Expand All @@ -41,6 +40,7 @@
import org.apache.parquet.column.page.PageReadStore;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -86,71 +86,87 @@ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetFi

@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
readData(field, readNumber, vector, false, false);
readData(field, readNumber, vector, false, false, false);
}

private Pair<LevelDelegation, WritableColumnVector> readData(
ParquetField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean parentIsRowType)
boolean readRowField,
boolean readMapKey)
throws IOException {
if (field.getType() instanceof RowType) {
return readRow((ParquetGroupField) field, readNumber, vector, inside);
} else if (field.getType() instanceof MapType || field.getType() instanceof MultisetType) {
return readMap((ParquetGroupField) field, readNumber, vector, inside);
return readMap((ParquetGroupField) field, readNumber, vector, inside, readRowField);
} else if (field.getType() instanceof ArrayType) {
return readArray((ParquetGroupField) field, readNumber, vector, inside);
return readArray((ParquetGroupField) field, readNumber, vector, inside, readRowField);
} else {
return readPrimitive(
(ParquetPrimitiveField) field, readNumber, vector, parentIsRowType);
(ParquetPrimitiveField) field, readNumber, vector, readRowField, readMapKey);
}
}

private Pair<LevelDelegation, WritableColumnVector> readRow(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
throws IOException {
HeapRowVector heapRowVector = (HeapRowVector) vector;
LevelDelegation levelDelegation = null;
LevelDelegation longest = null;
List<ParquetField> children = field.getChildren();
WritableColumnVector[] childrenVectors = heapRowVector.getFields();
WritableColumnVector[] finalChildrenVectors =
new WritableColumnVector[childrenVectors.length];
for (int i = 0; i < children.size(); i++) {
Pair<LevelDelegation, WritableColumnVector> tuple =
readData(children.get(i), readNumber, childrenVectors[i], true, true);
levelDelegation = tuple.getLeft();
readData(children.get(i), readNumber, childrenVectors[i], true, true, false);
LevelDelegation current = tuple.getLeft();
if (longest == null) {
longest = current;
} else if (current.getDefinitionLevel().length > longest.getDefinitionLevel().length) {
longest = current;
}
finalChildrenVectors[i] = tuple.getRight();
}
if (levelDelegation == null) {
if (longest == null) {
throw new RuntimeException(
String.format("Row field does not have any children: %s.", field));
}

RowPosition rowPosition =
NestedPositionUtil.calculateRowOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
boolean[] isNull = new boolean[len];
Arrays.fill(isNull, true);
boolean hasNull = false;
for (int i = 0; i < len; i++) {
for (WritableColumnVector child : finalChildrenVectors) {
isNull[i] = isNull[i] && child.isNullAt(i);
}
if (isNull[i]) {
hasNull = true;
}
}

// If row was inside the structure, then we need to renew the vector to reset the
// capacity.
if (inside) {
heapRowVector =
new HeapRowVector(rowPosition.getPositionsCount(), finalChildrenVectors);
heapRowVector = new HeapRowVector(len, finalChildrenVectors);
} else {
heapRowVector.setFields(finalChildrenVectors);
}

if (rowPosition.getIsNull() != null) {
setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
if (hasNull) {
setFieldNullFlag(isNull, heapRowVector);
}
return Pair.of(levelDelegation, heapRowVector);
return Pair.of(longest, heapRowVector);
}

private Pair<LevelDelegation, WritableColumnVector> readMap(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
ParquetGroupField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean readRowField)
throws IOException {
HeapMapVector mapVector = (HeapMapVector) vector;
mapVector.reset();
Expand All @@ -160,18 +176,30 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
"Maps must have two type parameters, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> keyTuple =
readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true, false);
readData(
children.get(0),
readNumber,
mapVector.getKeyColumnVector(),
true,
false,
true);
Pair<LevelDelegation, WritableColumnVector> valueTuple =
readData(
children.get(1), readNumber, mapVector.getValueColumnVector(), true, false);
children.get(1),
readNumber,
mapVector.getValueColumnVector(),
true,
false,
false);

LevelDelegation levelDelegation = keyTuple.getLeft();

CollectionPosition collectionPosition =
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
levelDelegation.getRepetitionLevel(),
readRowField);

// If map was inside the structure, then we need to renew the vector to reset the
// capacity.
Expand All @@ -197,7 +225,11 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
}

private Pair<LevelDelegation, WritableColumnVector> readArray(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
ParquetGroupField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean readRowField)
throws IOException {
HeapArrayVector arrayVector = (HeapArrayVector) vector;
arrayVector.reset();
Expand All @@ -207,14 +239,15 @@ private Pair<LevelDelegation, WritableColumnVector> readArray(
"Arrays must have a single type parameter, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> tuple =
readData(children.get(0), readNumber, arrayVector.getChild(), true, false);
readData(children.get(0), readNumber, arrayVector.getChild(), true, false, false);

LevelDelegation levelDelegation = tuple.getLeft();
CollectionPosition collectionPosition =
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
levelDelegation.getRepetitionLevel(),
readRowField);

// If array was inside the structure, then we need to renew the vector to reset the
// capacity.
Expand All @@ -236,7 +269,8 @@ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
ParquetPrimitiveField field,
int readNumber,
ColumnVector vector,
boolean parentIsRowType)
boolean readRowField,
boolean readMapKey)
throws IOException {
ColumnDescriptor descriptor = field.getDescriptor();
NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
Expand All @@ -248,7 +282,8 @@ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
isUtcTimestamp,
descriptor.getPrimitiveType(),
field.getType(),
parentIsRowType);
readRowField,
readMapKey);
columnReaders.put(descriptor, reader);
}
WritableColumnVector writableColumnVector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.format.parquet.reader;

import org.apache.paimon.format.parquet.position.CollectionPosition;
import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.utils.BooleanArrayList;
import org.apache.paimon.utils.LongArrayList;
Expand All @@ -29,50 +28,6 @@
/** Utils to calculate nested type position. */
public class NestedPositionUtil {

/**
* Calculate row offsets according to column's max repetition level, definition level, value's
* repetition level and definition level. Each row has three situation:
* <li>Row is not defined,because it's optional parent fields is null, this is decided by its
* parent's repetition level
* <li>Row is null
* <li>Row is defined and not empty.
*
* @param field field that contains the row column message include max repetition level and
* definition level.
* @param fieldRepetitionLevels int array with each value's repetition level.
* @param fieldDefinitionLevels int array with each value's definition level.
* @return {@link RowPosition} contains collections row count and isNull array.
*/
public static RowPosition calculateRowOffsets(
ParquetField field, int[] fieldDefinitionLevels, int[] fieldRepetitionLevels) {
int rowDefinitionLevel = field.getDefinitionLevel();
int rowRepetitionLevel = field.getRepetitionLevel();
int nullValuesCount = 0;
BooleanArrayList nullRowFlags = new BooleanArrayList(0);
for (int i = 0; i < fieldDefinitionLevels.length; i++) {
if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
throw new IllegalStateException(
format(
"In parquet's row type field repetition level should not larger than row's repetition level. "
+ "Row repetition level is %s, row field repetition level is %s.",
rowRepetitionLevel, fieldRepetitionLevels[i]));
}

if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
// current row is defined and not empty
nullRowFlags.add(false);
} else {
// current row is null
nullRowFlags.add(true);
nullValuesCount++;
}
}
if (nullValuesCount == 0) {
return new RowPosition(null, fieldDefinitionLevels.length);
}
return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
}

/**
* Calculate the collection's offsets according to column's max repetition level, definition
* level, value's repetition level and definition level. Each collection (Array or Map) has four
Expand All @@ -92,7 +47,10 @@ public static RowPosition calculateRowOffsets(
* array.
*/
public static CollectionPosition calculateCollectionOffsets(
ParquetField field, int[] definitionLevels, int[] repetitionLevels) {
ParquetField field,
int[] definitionLevels,
int[] repetitionLevels,
boolean readRowField) {
int collectionDefinitionLevel = field.getDefinitionLevel();
int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
int offset = 0;
Expand All @@ -110,7 +68,8 @@ public static CollectionPosition calculateCollectionOffsets(
// empty
// definitionLevels[i] == collectionDefinitionLevel => Collection is defined but
// empty
// definitionLevels[i] == maxDefinitionLevel - 1 => Collection is defined but null
// definitionLevels[i] == collectionDefinitionLevel - 1 => Collection is defined but
// null
if (definitionLevels[i] > collectionDefinitionLevel) {
nullCollectionFlags.add(false);
emptyCollectionFlags.add(false);
Expand All @@ -127,6 +86,14 @@ public static CollectionPosition calculateCollectionOffsets(
// must be set at each index for calculating lengths later
emptyCollectionFlags.add(false);
}
offsets.add(offset);
valueCount++;
} else if (definitionLevels[i] == collectionDefinitionLevel - 2 && readRowField) {
// row field should store null value
nullCollectionFlags.add(true);
nullValuesCount++;
emptyCollectionFlags.add(false);

offsets.add(offset);
valueCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
private final ColumnDescriptor descriptor;
private final Type type;
private final DataType dataType;
private final boolean parentIsRowType;
private final boolean readRowField;
private final boolean readMapKey;
/** The dictionary, if this column has dictionary encoding. */
private final ParquetDataColumnReader dictionary;
/** Maximum definition level for this column. */
Expand Down Expand Up @@ -118,15 +119,17 @@ public NestedPrimitiveColumnReader(
boolean isUtcTimestamp,
Type parquetType,
DataType dataType,
boolean parentIsRowType)
boolean readRowField,
boolean readMapKey)
throws IOException {
this.descriptor = descriptor;
this.type = parquetType;
this.pageReader = pageReader;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.isUtcTimestamp = isUtcTimestamp;
this.dataType = dataType;
this.parentIsRowType = parentIsRowType;
this.readRowField = readRowField;
this.readMapKey = readMapKey;

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
Expand Down Expand Up @@ -206,11 +209,12 @@ public LevelDelegation getLevelDelegation() {
* </ul>
*
* <p>When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector
* for OrcArrayColumnVector and OrcMapColumnVector don't contain empty and null set value. Stay
* consistent here.
* for OrcArrayColumnVector don't contain empty and null set value. Stay consistent here.
*
* <p>But notice that children of RowColumnVector still get null value when entire outer row is
* null, so when {@code parentIsRowType} is true the null value is still stored.
* <p>For MAP, the value vector is the same as ARRAY. But the key vector isn't nullable, so just
* read value when definitionLevel == maxDefLevel.
*
* <p>For ROW, RowColumnVector still get null value when definitionLevel == maxDefLevel - 2.
*/
private boolean readValue() throws IOException {
int left = readPageIfNeed();
Expand All @@ -225,13 +229,19 @@ private boolean readValue() throws IOException {
} else {
lastValue.setValue(readPrimitiveTypedRow(dataType));
}
} else if (definitionLevel == maxDefLevel - 1) {
lastValue.setValue(null);
} else {
if (parentIsRowType) {
lastValue.setValue(null);
} else {
if (readMapKey) {
lastValue.skip();
} else {
if (definitionLevel == maxDefLevel - 1) {
// null value inner set
lastValue.setValue(null);
} else if (definitionLevel == maxDefLevel - 2 && readRowField) {
lastValue.setValue(null);
} else {
// current set is empty or null
lastValue.skip();
}
}
}
return true;
Expand Down
Loading

0 comments on commit 4a44158

Please sign in to comment.