Commit 7a1bca3: fix
yuzelin committed Aug 6, 2024 (1 parent: 606abd7)
Showing 4 changed files with 53 additions and 64 deletions.
==== Changed file 1 of 4 ====
@@ -86,11 +86,15 @@ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetFi

@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
- readData(field, readNumber, vector, false);
+ readData(field, readNumber, vector, false, false);
}

private Pair<LevelDelegation, WritableColumnVector> readData(
- ParquetField field, int readNumber, ColumnVector vector, boolean inside)
+ ParquetField field,
+ int readNumber,
+ ColumnVector vector,
+ boolean inside,
+ boolean parentIsRowType)
throws IOException {
if (field.getType() instanceof RowType) {
return readRow((ParquetGroupField) field, readNumber, vector, inside);
@@ -99,7 +103,8 @@ private Pair<LevelDelegation, WritableColumnVector> readData(
} else if (field.getType() instanceof ArrayType) {
return readArray((ParquetGroupField) field, readNumber, vector, inside);
} else {
- return readPrimitive((ParquetPrimitiveField) field, readNumber, vector);
+ return readPrimitive(
+         (ParquetPrimitiveField) field, readNumber, vector, parentIsRowType);
}
}

@@ -114,7 +119,7 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
new WritableColumnVector[childrenVectors.length];
for (int i = 0; i < children.size(); i++) {
Pair<LevelDelegation, WritableColumnVector> tuple =
- readData(children.get(i), readNumber, childrenVectors[i], true);
+ readData(children.get(i), readNumber, childrenVectors[i], true, true);
levelDelegation = tuple.getLeft();
finalChildrenVectors[i] = tuple.getRight();
}
@@ -139,7 +144,7 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
}

if (rowPosition.getIsNull() != null) {
- setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
+ setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
}
return Pair.of(levelDelegation, heapRowVector);
}
@@ -155,9 +160,10 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
"Maps must have two type parameters, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> keyTuple =
- readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true);
+ readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true, false);
Pair<LevelDelegation, WritableColumnVector> valueTuple =
- readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true);
+ readData(
+         children.get(1), readNumber, mapVector.getValueColumnVector(), true, false);

LevelDelegation levelDelegation = keyTuple.getLeft();

@@ -181,7 +187,7 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
}

if (collectionPosition.getIsNull() != null) {
- setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
+ setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
}

mapVector.setLengths(collectionPosition.getLength());
@@ -201,7 +207,7 @@ private Pair<LevelDelegation, WritableColumnVector> readArray(
"Arrays must have a single type parameter, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> tuple =
- readData(children.get(0), readNumber, arrayVector.getChild(), true);
+ readData(children.get(0), readNumber, arrayVector.getChild(), true, false);

LevelDelegation levelDelegation = tuple.getLeft();
CollectionPosition collectionPosition =
@@ -219,15 +225,19 @@
}

if (collectionPosition.getIsNull() != null) {
- setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
+ setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
}
arrayVector.setLengths(collectionPosition.getLength());
arrayVector.setOffsets(collectionPosition.getOffsets());
return Pair.of(levelDelegation, arrayVector);
}

private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
- ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException {
+ ParquetPrimitiveField field,
+ int readNumber,
+ ColumnVector vector,
+ boolean parentIsRowType)
+         throws IOException {
ColumnDescriptor descriptor = field.getDescriptor();
NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
if (reader == null) {
@@ -237,15 +247,16 @@ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
pages.getPageReader(descriptor),
isUtcTimestamp,
descriptor.getPrimitiveType(),
- field.getType());
+ field.getType(),
+ parentIsRowType);
columnReaders.put(descriptor, reader);
}
WritableColumnVector writableColumnVector =
reader.readAndNewVector(readNumber, (WritableColumnVector) vector);
return Pair.of(reader.getLevelDelegation(), writableColumnVector);
}

- private static void setFieldNullFalg(boolean[] nullFlags, AbstractHeapVector vector) {
+ private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) {
for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) {
if (nullFlags[index]) {
vector.setNullAt(index);
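For orientation, a minimal sketch of the dispatch rule this file now implements. The Field and Kind types below are hypothetical stand-ins, not the Paimon classes; the point is only that readRow is the sole caller passing parentIsRowType = true, while readMap, readArray, and the top-level readToVector pass false.

import java.util.List;

public class ParentFlagSketch {
    enum Kind { ROW, MAP, ARRAY, PRIMITIVE }

    record Field(String name, Kind kind, List<Field> children) {}

    // Only ROW parents propagate true; MAP/ARRAY parents and the root pass false.
    static void readData(Field field, boolean parentIsRowType) {
        switch (field.kind()) {
            case ROW -> field.children().forEach(c -> readData(c, true));
            case MAP, ARRAY -> field.children().forEach(c -> readData(c, false));
            case PRIMITIVE ->
                    System.out.println(field.name() + ": parentIsRowType=" + parentIsRowType);
        }
    }

    public static void main(String[] args) {
        // ROW<f0 INT, f1 ARRAY<INT>>: f0 prints true, the array element prints false.
        Field root = new Field("root", Kind.ROW, List.of(
                new Field("f0", Kind.PRIMITIVE, List.of()),
                new Field("f1", Kind.ARRAY, List.of(
                        new Field("element", Kind.PRIMITIVE, List.of())))));
        readData(root, false);
    }
}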
==== Changed file 2 of 4 ====
@@ -108,6 +108,8 @@ public static CollectionPosition calculateCollectionOffsets(
i < definitionLevels.length;
i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) {
if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
+ valueCount++;
+
boolean isNull =
isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel);
if (isNull) {
@@ -142,7 +144,6 @@
collectionDefinitionLevel));
}
offsets.add(offset);
- valueCount++;
}
// else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
// collection is not defined, just ignore it
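A toy illustration of why the increment moved: counted at the top of the branch, valueCount now covers every collection whose definition level marks it as defined, whether it has elements, is empty, or is null. This is a simplification, not the real calculateCollectionOffsets loop (which advances via getNextCollectionStartIndex rather than one entry at a time), and the level values below are assumptions for an optional ARRAY<INT> column.

public class ValueCountSketch {
    public static void main(String[] args) {
        // Assumed levels for an optional ARRAY<INT> with collectionDefinitionLevel = 2:
        //   3 -> array with a non-null element, 2 -> empty array,
        //   1 -> null array, 0 -> an ancestor is null (collection undefined).
        int collectionDefinitionLevel = 2;
        int[] definitionLevels = {3, 1, 2, 0, 3};

        int valueCount = 0;
        for (int level : definitionLevels) {
            if (level >= collectionDefinitionLevel - 1) {
                valueCount++; // counted before any null/empty special-casing
            }
            // else: the collection itself is undefined, just ignore it
        }
        System.out.println(valueCount); // 4 (two non-empty, one empty, one null)
    }
}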
==== Changed file 3 of 4 ====
@@ -73,12 +73,13 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
private final ColumnDescriptor descriptor;
private final Type type;
private final DataType dataType;
+ private final boolean parentIsRowType;
/** The dictionary, if this column has dictionary encoding. */
private final ParquetDataColumnReader dictionary;
/** Maximum definition level for this column. */
private final int maxDefLevel;

- private boolean isUtcTimestamp;
+ private final boolean isUtcTimestamp;

/** Total number of values read. */
private long valuesRead;
@@ -109,21 +110,23 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV

private boolean isFirstRow = true;

- private LastValueContainer lastValue = new LastValueContainer();
+ private final LastValueContainer lastValue = new LastValueContainer();

public NestedPrimitiveColumnReader(
ColumnDescriptor descriptor,
PageReader pageReader,
boolean isUtcTimestamp,
Type parquetType,
- DataType dataType)
+ DataType dataType,
+ boolean parentIsRowType)
throws IOException {
this.descriptor = descriptor;
this.type = parquetType;
this.pageReader = pageReader;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.isUtcTimestamp = isUtcTimestamp;
this.dataType = dataType;
+ this.parentIsRowType = parentIsRowType;

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
@@ -190,7 +193,7 @@ public LevelDelegation getLevelDelegation() {
}

/**
- * Example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0]
+ * An ARRAY[ARRAY[INT]] example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0]
*
* <ul>
* <li>definitionLevel == maxDefLevel => not null value
@@ -205,6 +208,9 @@ public LevelDelegation getLevelDelegation() {
* <p>When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector
* for OrcArrayColumnVector and OrcMapColumnVector don't contain empty and null set value. Stay
* consistent here.
+ *
+ * <p>But notice that children of RowColumnVector still get a null value when the entire
+ * outer row is null, so when {@code parentIsRowType} is true the null value is still stored.
*/
private boolean readValue() throws IOException {
int left = readPageIfNeed();
@@ -222,7 +228,11 @@ private boolean readValue() throws IOException {
} else if (definitionLevel == maxDefLevel - 1) {
lastValue.setValue(null);
} else {
- lastValue.skip();
+ if (parentIsRowType) {
+     lastValue.setValue(null);
+ } else {
+     lastValue.skip();
+ }
}
return true;
} else {
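Restating the behavioral core of this change as a hedged sketch (a stand-in enum, not the real reader, which works over Parquet pages and a LastValueContainer): at definitionLevel <= maxDefLevel - 2, values used to be skipped unconditionally; now a primitive whose parent is a ROW stores an explicit null instead, keeping its positions aligned with the enclosing row vector.

public class ReadValueRule {
    enum Action { STORE_VALUE, STORE_NULL, SKIP }

    static Action classify(int definitionLevel, int maxDefLevel, boolean parentIsRowType) {
        if (definitionLevel == maxDefLevel) {
            return Action.STORE_VALUE; // value is present and non-null
        }
        if (definitionLevel == maxDefLevel - 1) {
            return Action.STORE_NULL; // the value itself is null
        }
        // definitionLevel <= maxDefLevel - 2: some ancestor is null or empty.
        // ROW children keep the slot as null so row and child vectors stay aligned;
        // ARRAY/MAP children drop it, consistent with the ORC-side column vectors.
        return parentIsRowType ? Action.STORE_NULL : Action.SKIP;
    }

    public static void main(String[] args) {
        System.out.println(classify(0, 2, true));  // STORE_NULL
        System.out.println(classify(0, 2, false)); // SKIP
    }
}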
==== Changed file 4 of 4 ====
@@ -34,6 +34,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
+ import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
@@ -454,9 +455,7 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception
format.createReader(
new FormatReaderContext(
new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
- List<InternalRow> results = new ArrayList<>(1283);
- reader.forEachRemaining(results::add);
- compareNestedRow(rows, results);
+ compareNestedRow(rows, new RecordReaderIterator<>(reader));
}

@Test
@@ -525,41 +524,9 @@ private int testReadingFile(List<Integer> expected, Path path) throws IOExceptio
}
Integer v = expected.get(cnt.get());
if (v == null) {
- assertThat(row.isNullAt(0)).isTrue();
- assertThat(row.isNullAt(1)).isTrue();
- assertThat(row.isNullAt(2)).isTrue();
- assertThat(row.isNullAt(3)).isTrue();
- assertThat(row.isNullAt(4)).isTrue();
- assertThat(row.isNullAt(5)).isTrue();
- assertThat(row.isNullAt(6)).isTrue();
- assertThat(row.isNullAt(7)).isTrue();
- assertThat(row.isNullAt(8)).isTrue();
- assertThat(row.isNullAt(9)).isTrue();
- assertThat(row.isNullAt(10)).isTrue();
- assertThat(row.isNullAt(11)).isTrue();
- assertThat(row.isNullAt(12)).isTrue();
- assertThat(row.isNullAt(13)).isTrue();
- assertThat(row.isNullAt(14)).isTrue();
- assertThat(row.isNullAt(15)).isTrue();
- assertThat(row.isNullAt(16)).isTrue();
- assertThat(row.isNullAt(17)).isTrue();
- assertThat(row.isNullAt(18)).isTrue();
- assertThat(row.isNullAt(19)).isTrue();
- assertThat(row.isNullAt(20)).isTrue();
- assertThat(row.isNullAt(21)).isTrue();
- assertThat(row.isNullAt(22)).isTrue();
- assertThat(row.isNullAt(23)).isTrue();
- assertThat(row.isNullAt(24)).isTrue();
- assertThat(row.isNullAt(25)).isTrue();
- assertThat(row.isNullAt(26)).isTrue();
- assertThat(row.isNullAt(27)).isTrue();
- assertThat(row.isNullAt(28)).isTrue();
- assertThat(row.isNullAt(29)).isTrue();
- assertThat(row.isNullAt(30)).isTrue();
- assertThat(row.isNullAt(31)).isTrue();
- assertThat(row.isNullAt(32)).isTrue();
- assertThat(row.isNullAt(33)).isTrue();
- assertThat(row.isNullAt(34)).isTrue();
+ for (int i = 0; i < 35; i++) {
+     assertThat(row.isNullAt(i)).isTrue();
+ }
} else {
assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
@@ -826,7 +793,6 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
row1.add(0, i);
Group row2 = rowList.addGroup(0);
row2.add(0, i + 1);
- f4.addGroup(0);

// add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>, `f1` INT>>
Group f5 = row.addGroup("f5");
Expand All @@ -835,7 +801,6 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
Group insideArray = insideRow.addGroup(0);
createParquetDoubleNestedArray(insideArray, i);
insideRow.add(1, i);
- arrayRow.addGroup(0);
f5.add(1, i);
writer.write(row);
}
@@ -873,12 +838,12 @@ private void createParquetMapGroup(Group map, String key, String value) {
}
}

- private void compareNestedRow(List<InternalRow> rows, List<InternalRow> results) {
-     Assertions.assertEquals(rows.size(), results.size());
-     for (InternalRow result : results) {
-         int index = result.getInt(0);
-         InternalRow origin = rows.get(index);
+ private void compareNestedRow(
+         List<InternalRow> rows, RecordReaderIterator<InternalRow> iterator) throws Exception {
+     for (InternalRow origin : rows) {
+         assertThat(iterator.hasNext()).isTrue();
+         InternalRow result = iterator.next();

Assertions.assertEquals(origin.getInt(0), result.getInt(0));

// int[]
@@ -967,6 +932,8 @@ private void compareNestedRow(List<InternalRow> rows, List<InternalRow> results)
result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1));
}
+ assertThat(iterator.hasNext()).isFalse();
+ iterator.close();
}
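The rewritten compareNestedRow follows a compare-while-streaming pattern; here is a generic sketch with a plain java.util.Iterator standing in for Paimon's RecordReaderIterator. One assumption the new test makes that the old index-based lookup did not: the reader must return rows in the same order as the expected list.

import java.util.Iterator;
import java.util.List;

public class StreamCompareSketch {
    static <T> void compareStreaming(List<T> expected, Iterator<T> actual) {
        for (T origin : expected) {
            if (!actual.hasNext()) {
                throw new AssertionError("reader ended early");
            }
            T result = actual.next();
            if (!origin.equals(result)) {
                throw new AssertionError("mismatch: " + origin + " vs " + result);
            }
        }
        // The reader must be fully drained, mirroring assertThat(iterator.hasNext()).isFalse().
        if (actual.hasNext()) {
            throw new AssertionError("reader produced extra rows");
        }
    }

    public static void main(String[] args) {
        compareStreaming(List.of(1, 2, 3), List.of(1, 2, 3).iterator());
    }
}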

private void fillWithMap(Map<String, String> map, InternalMap internalMap, int index) {
