diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index e3900580034b..165527adc688 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -86,11 +86,15 @@ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetFi @Override public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { - readData(field, readNumber, vector, false); + readData(field, readNumber, vector, false, false); } private Pair readData( - ParquetField field, int readNumber, ColumnVector vector, boolean inside) + ParquetField field, + int readNumber, + ColumnVector vector, + boolean inside, + boolean parentIsRowType) throws IOException { if (field.getType() instanceof RowType) { return readRow((ParquetGroupField) field, readNumber, vector, inside); @@ -99,7 +103,8 @@ private Pair readData( } else if (field.getType() instanceof ArrayType) { return readArray((ParquetGroupField) field, readNumber, vector, inside); } else { - return readPrimitive((ParquetPrimitiveField) field, readNumber, vector); + return readPrimitive( + (ParquetPrimitiveField) field, readNumber, vector, parentIsRowType); } } @@ -114,7 +119,7 @@ private Pair readRow( new WritableColumnVector[childrenVectors.length]; for (int i = 0; i < children.size(); i++) { Pair tuple = - readData(children.get(i), readNumber, childrenVectors[i], true); + readData(children.get(i), readNumber, childrenVectors[i], true, true); levelDelegation = tuple.getLeft(); finalChildrenVectors[i] = tuple.getRight(); } @@ -139,7 +144,7 @@ private Pair readRow( } if (rowPosition.getIsNull() != null) { - setFieldNullFalg(rowPosition.getIsNull(), heapRowVector); + setFieldNullFlag(rowPosition.getIsNull(), heapRowVector); } return Pair.of(levelDelegation, heapRowVector); } @@ -155,9 +160,10 @@ private Pair readMap( "Maps must have two type parameters, found %s", children.size()); Pair keyTuple = - readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true); + readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true, false); Pair valueTuple = - readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true); + readData( + children.get(1), readNumber, mapVector.getValueColumnVector(), true, false); LevelDelegation levelDelegation = keyTuple.getLeft(); @@ -181,7 +187,7 @@ private Pair readMap( } if (collectionPosition.getIsNull() != null) { - setFieldNullFalg(collectionPosition.getIsNull(), mapVector); + setFieldNullFlag(collectionPosition.getIsNull(), mapVector); } mapVector.setLengths(collectionPosition.getLength()); @@ -201,7 +207,7 @@ private Pair readArray( "Arrays must have a single type parameter, found %s", children.size()); Pair tuple = - readData(children.get(0), readNumber, arrayVector.getChild(), true); + readData(children.get(0), readNumber, arrayVector.getChild(), true, false); LevelDelegation levelDelegation = tuple.getLeft(); CollectionPosition collectionPosition = @@ -219,7 +225,7 @@ private Pair readArray( } if (collectionPosition.getIsNull() != null) { - setFieldNullFalg(collectionPosition.getIsNull(), arrayVector); + setFieldNullFlag(collectionPosition.getIsNull(), arrayVector); } arrayVector.setLengths(collectionPosition.getLength()); arrayVector.setOffsets(collectionPosition.getOffsets()); @@ -227,7 +233,11 @@ private Pair readArray( } private Pair readPrimitive( - ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException { + ParquetPrimitiveField field, + int readNumber, + ColumnVector vector, + boolean parentIsRowType) + throws IOException { ColumnDescriptor descriptor = field.getDescriptor(); NestedPrimitiveColumnReader reader = columnReaders.get(descriptor); if (reader == null) { @@ -237,7 +247,8 @@ private Pair readPrimitive( pages.getPageReader(descriptor), isUtcTimestamp, descriptor.getPrimitiveType(), - field.getType()); + field.getType(), + parentIsRowType); columnReaders.put(descriptor, reader); } WritableColumnVector writableColumnVector = @@ -245,7 +256,7 @@ private Pair readPrimitive( return Pair.of(reader.getLevelDelegation(), writableColumnVector); } - private static void setFieldNullFalg(boolean[] nullFlags, AbstractHeapVector vector) { + private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) { for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) { if (nullFlags[index]) { vector.setNullAt(index); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index 1c8298eaeb6a..62338278512a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -108,6 +108,8 @@ public static CollectionPosition calculateCollectionOffsets( i < definitionLevels.length; i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) { if (definitionLevels[i] >= collectionDefinitionLevel - 1) { + valueCount++; + boolean isNull = isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel); if (isNull) { @@ -142,7 +144,6 @@ public static CollectionPosition calculateCollectionOffsets( collectionDefinitionLevel)); } offsets.add(offset); - valueCount++; } // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the // collection is not defined, just ignore it diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 0b9fdb1f6a1d..7837eb8148e9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -73,12 +73,13 @@ public class NestedPrimitiveColumnReader implements ColumnReader [5, 4, 5, 3, 2, 1, 0] + * An ARRAY[ARRAY[INT]] Example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0] * *
    *
  • definitionLevel == maxDefLevel => not null value @@ -205,6 +208,9 @@ public LevelDelegation getLevelDelegation() { *

    When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector * for OrcArrayColumnVector and OrcMapColumnVector don't contain empty and null set value. Stay * consistent here. + * + *

    But notice that children of RowColumnVector still get null value when entire outer row is + * null, so when {@code parentIsRowType} is true the null value is still stored. */ private boolean readValue() throws IOException { int left = readPageIfNeed(); @@ -222,7 +228,11 @@ private boolean readValue() throws IOException { } else if (definitionLevel == maxDefLevel - 1) { lastValue.setValue(null); } else { - lastValue.skip(); + if (parentIsRowType) { + lastValue.setValue(null); + } else { + lastValue.skip(); + } } return true; } else { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index a3aa1f85bf04..57fd0235bec6 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BooleanType; @@ -454,9 +455,7 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception format.createReader( new FormatReaderContext( new LocalFileIO(), path, new LocalFileIO().getFileSize(path))); - List results = new ArrayList<>(1283); - reader.forEachRemaining(results::add); - compareNestedRow(rows, results); + compareNestedRow(rows, new RecordReaderIterator<>(reader)); } @Test @@ -525,41 +524,9 @@ private int testReadingFile(List expected, Path path) throws IOExceptio } Integer v = expected.get(cnt.get()); if (v == null) { - assertThat(row.isNullAt(0)).isTrue(); - assertThat(row.isNullAt(1)).isTrue(); - assertThat(row.isNullAt(2)).isTrue(); - assertThat(row.isNullAt(3)).isTrue(); - assertThat(row.isNullAt(4)).isTrue(); - assertThat(row.isNullAt(5)).isTrue(); - assertThat(row.isNullAt(6)).isTrue(); - assertThat(row.isNullAt(7)).isTrue(); - assertThat(row.isNullAt(8)).isTrue(); - assertThat(row.isNullAt(9)).isTrue(); - assertThat(row.isNullAt(10)).isTrue(); - assertThat(row.isNullAt(11)).isTrue(); - assertThat(row.isNullAt(12)).isTrue(); - assertThat(row.isNullAt(13)).isTrue(); - assertThat(row.isNullAt(14)).isTrue(); - assertThat(row.isNullAt(15)).isTrue(); - assertThat(row.isNullAt(16)).isTrue(); - assertThat(row.isNullAt(17)).isTrue(); - assertThat(row.isNullAt(18)).isTrue(); - assertThat(row.isNullAt(19)).isTrue(); - assertThat(row.isNullAt(20)).isTrue(); - assertThat(row.isNullAt(21)).isTrue(); - assertThat(row.isNullAt(22)).isTrue(); - assertThat(row.isNullAt(23)).isTrue(); - assertThat(row.isNullAt(24)).isTrue(); - assertThat(row.isNullAt(25)).isTrue(); - assertThat(row.isNullAt(26)).isTrue(); - assertThat(row.isNullAt(27)).isTrue(); - assertThat(row.isNullAt(28)).isTrue(); - assertThat(row.isNullAt(29)).isTrue(); - assertThat(row.isNullAt(30)).isTrue(); - assertThat(row.isNullAt(31)).isTrue(); - assertThat(row.isNullAt(32)).isTrue(); - assertThat(row.isNullAt(33)).isTrue(); - assertThat(row.isNullAt(34)).isTrue(); + for (int i = 0; i < 35; i++) { + assertThat(row.isNullAt(i)).isTrue(); + } } else { assertThat(row.getString(0)).hasToString("" + v); assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0); @@ -826,7 +793,6 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou row1.add(0, i); Group row2 = rowList.addGroup(0); row2.add(0, i + 1); - f4.addGroup(0); // add ROW<`f0` ARRAY>, `c` INT>>, `f1` INT>> Group f5 = row.addGroup("f5"); @@ -835,7 +801,6 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou Group insideArray = insideRow.addGroup(0); createParquetDoubleNestedArray(insideArray, i); insideRow.add(1, i); - arrayRow.addGroup(0); f5.add(1, i); writer.write(row); } @@ -873,12 +838,12 @@ private void createParquetMapGroup(Group map, String key, String value) { } } - private void compareNestedRow(List rows, List results) { - Assertions.assertEquals(rows.size(), results.size()); + private void compareNestedRow( + List rows, RecordReaderIterator iterator) throws Exception { + for (InternalRow origin : rows) { + assertThat(iterator.hasNext()).isTrue(); + InternalRow result = iterator.next(); - for (InternalRow result : results) { - int index = result.getInt(0); - InternalRow origin = rows.get(index); Assertions.assertEquals(origin.getInt(0), result.getInt(0)); // int[] @@ -967,6 +932,8 @@ private void compareNestedRow(List rows, List results) result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1)); Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1)); } + assertThat(iterator.hasNext()).isFalse(); + iterator.close(); } private void fillWithMap(Map map, InternalMap internalMap, int index) {