From e0554db8f7d4ff0f40f1d4719be2ce9d2f07f84b Mon Sep 17 00:00:00 2001 From: "wenchao.wu" Date: Tue, 19 Nov 2024 14:17:58 +0800 Subject: [PATCH 1/2] [core] fix parquet can not read empty row with first column is array. --- .../columnar/heap/AbstractHeapVector.java | 18 ++++++ .../data/columnar/heap/HeapBooleanVector.java | 4 ++ .../data/columnar/heap/HeapByteVector.java | 4 ++ .../data/columnar/heap/HeapBytesVector.java | 4 ++ .../data/columnar/heap/HeapDoubleVector.java | 4 ++ .../data/columnar/heap/HeapFloatVector.java | 4 ++ .../data/columnar/heap/HeapIntVector.java | 4 ++ .../data/columnar/heap/HeapLongVector.java | 4 ++ .../data/columnar/heap/HeapShortVector.java | 4 ++ .../columnar/heap/HeapTimestampVector.java | 8 ++- .../parquet/reader/NestedColumnReader.java | 4 +- .../parquet/reader/NestedPositionUtil.java | 9 ++- .../reader/NestedPrimitiveColumnReader.java | 55 ++++++++++++++++++- .../format/parquet/ParquetReadWriteTest.java | 17 +++++- 14 files changed, 134 insertions(+), 9 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java index f0e82eac4fb1..fee204015d4d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.columnar.writable.AbstractWritableVector; import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.utils.Preconditions; import java.nio.ByteOrder; import java.util.Arrays; @@ -54,6 +55,23 @@ public AbstractHeapVector(int len) { this.len = len; } + // This will be called only when inner vectors don't have data. + public AbstractHeapVector(int len, boolean[] isNull) { + Preconditions.checkArgument( + len == isNull.length, "len should be equal to isNull's length."); + + for (boolean element : isNull) { + if (!element) { + throw new UnsupportedOperationException( + "This constructor can only be called when the vector is all null."); + } + } + + this.len = len; + this.isNull = isNull; + this.noNulls = false; + } + /** * Resets the column to default state. - fills the isNull array with false. - sets noNulls to * true. diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java index 3c88699cffe4..4ee27091f7f2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java @@ -34,6 +34,10 @@ public HeapBooleanVector(int len) { vector = new boolean[len]; } + public HeapBooleanVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public HeapIntVector reserveDictionaryIds(int capacity) { throw new RuntimeException("HeapBooleanVector has no dictionary."); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java index f5463dcc055b..5c96b771baa3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java @@ -39,6 +39,10 @@ public HeapByteVector(int len) { vector = new byte[len]; } + public HeapByteVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public byte getByte(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java index 8ccbbf3ba6b9..2559b2d586e5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java @@ -66,6 +66,10 @@ public HeapBytesVector(int size) { length = new int[size]; } + public HeapBytesVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public void reset() { super.reset(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java index 71d4b8775735..7b49d3b0cdfc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java @@ -45,6 +45,10 @@ public HeapDoubleVector(int len) { vector = new double[len]; } + public HeapDoubleVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public double getDouble(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java index b1b6a0e0e31d..83645e7bd5a2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java @@ -44,6 +44,10 @@ public HeapFloatVector(int len) { vector = new float[len]; } + public HeapFloatVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public float getFloat(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java index 15192e876bbd..afe89c28eb30 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java @@ -39,6 +39,10 @@ public HeapIntVector(int len) { vector = new int[len]; } + public HeapIntVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public int getInt(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java index 62c5b8108905..491fe3b9f045 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java @@ -39,6 +39,10 @@ public HeapLongVector(int len) { vector = new long[len]; } + public HeapLongVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public long getLong(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java index 472df6b271c1..d6fdc9b554f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java @@ -39,6 +39,10 @@ public HeapShortVector(int len) { vector = new short[len]; } + public HeapShortVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public short getShort(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java index 4730b7a89705..7265031852b4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java @@ -28,8 +28,8 @@ public class HeapTimestampVector extends AbstractHeapVector implements WritableT private static final long serialVersionUID = 1L; - private final long[] milliseconds; - private final int[] nanoOfMilliseconds; + private long[] milliseconds; + private int[] nanoOfMilliseconds; public HeapTimestampVector(int len) { super(len); @@ -37,6 +37,10 @@ public HeapTimestampVector(int len) { this.nanoOfMilliseconds = new int[len]; } + public HeapTimestampVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public Timestamp getTimestamp(int i, int precision) { if (dictionary == null) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 8f20be275447..5641f8e073ae 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -141,7 +141,9 @@ private Pair readRow( boolean hasNull = false; for (int i = 0; i < len; i++) { for (WritableColumnVector child : finalChildrenVectors) { - isNull[i] = isNull[i] && child.isNullAt(i); + if (((ElementCountable) child).getLen() - 1 >= i) { + isNull[i] = isNull[i] && child.isNullAt(i); + } } if (isNull[i]) { hasNull = true; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index b43169a40b2c..473ef332c296 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -96,9 +96,14 @@ public static CollectionPosition calculateCollectionOffsets( offsets.add(offset); valueCount++; + } else { + // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the + // collection is not defined, no need to increase offset. + nullCollectionFlags.add(true); + nullValuesCount++; + emptyCollectionFlags.add(false); + valueCount++; } - // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the - // collection is not defined, just ignore it } long[] offsetsArray = offsets.toArray(); long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index f0a82a6d711e..dd115698ecdc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -57,6 +57,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; @@ -168,7 +169,10 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto int valueIndex = collectDataFromParquetPage(readNumber, valueList); - return fillColumnVector(valueIndex, valueList); + if (!valueList.isEmpty()) { + return fillColumnVector(valueIndex, valueList); + } + return fillColumnVectorWithNone(valueIndex); } private int collectDataFromParquetPage(int total, List valueList) throws IOException { @@ -199,8 +203,8 @@ private int collectDataFromParquetPage(int total, List valueList) throws if (!lastValue.shouldSkip && !needFilterSkip) { valueList.add(lastValue.value); - valueIndex++; } + valueIndex++; } while (readValue() && (repetitionLevel != 0)); if (pageRowId == readState.rowId) { @@ -549,6 +553,53 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { } } + private WritableColumnVector fillColumnVectorWithNone(int total) { + boolean[] isNull = new boolean[total]; + Arrays.fill(isNull, true); + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + return new HeapBytesVector(total, isNull); + case BOOLEAN: + return new HeapBooleanVector(total, isNull); + case TINYINT: + return new HeapByteVector(total, isNull); + case SMALLINT: + return new HeapShortVector(total, isNull); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return new HeapIntVector(total, isNull); + case FLOAT: + return new HeapFloatVector(total, isNull); + case BIGINT: + return new HeapLongVector(total, isNull); + case DOUBLE: + return new HeapDoubleVector(total, isNull); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new HeapTimestampVector(total, isNull); + case DECIMAL: + PrimitiveType.PrimitiveTypeName primitiveTypeName = + descriptor.getPrimitiveType().getPrimitiveTypeName(); + switch (primitiveTypeName) { + case INT32: + HeapIntVector phiv = new HeapIntVector(total, isNull); + return new ParquetDecimalVector(phiv, total); + case INT64: + HeapLongVector phlv = new HeapLongVector(total, isNull); + return new ParquetDecimalVector(phlv, total); + default: + HeapBytesVector phbv = new HeapBytesVector(total, isNull); + return new ParquetDecimalVector(phbv, total); + } + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + private static HeapBytesVector getHeapBytesVector(int total, List valueList) { HeapBytesVector phbv = new HeapBytesVector(total); for (int i = 0; i < valueList.size(); i++) { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index ffe4d6008296..d7d16450a50f 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; @@ -176,7 +177,11 @@ public class ParquetReadWriteTest { new ArrayType(true, new IntType()))) .field("c", new IntType()) .build()), - new IntType())); + new IntType()), + RowType.of( + new ArrayType(RowType.of(new VarCharType(255))), + RowType.of(new IntType()), + new VarCharType(255))); @TempDir public File folder; @@ -822,7 +827,8 @@ null, new GenericMap(mp1), new GenericMap(mp2) }), i) }), - i))); + i), + null)); } return rows; } @@ -834,11 +840,14 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou MessageType schema = ParquetSchemaConverter.convertToParquetMessageType( "paimon-parquet", NESTED_ARRAY_MAP_TYPE); + String[] candidates = new String[] {"snappy", "zstd", "gzip"}; + String compress = candidates[new Random().nextInt(3)]; try (ParquetWriter writer = ExampleParquetWriter.builder( HadoopOutputFile.fromPath( new org.apache.hadoop.fs.Path(path.toString()), conf)) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(CompressionCodecName.fromConf(compress)) .withConf(new Configuration()) .withType(schema) .build()) { @@ -1011,6 +1020,10 @@ private void compareNestedRow( origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1), result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1)); Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1)); + Assertions.assertTrue(result.isNullAt(6)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(0)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(1)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(2)); } assertThat(iterator.hasNext()).isFalse(); iterator.close(); From cefb904ac77993ae3ba0be917aa2685b9ce54823 Mon Sep 17 00:00:00 2001 From: "wenchao.wu" Date: Thu, 19 Dec 2024 18:55:26 +0800 Subject: [PATCH 2/2] [core] fix test case and make row child type's length which is array is the same as other child vector. --- .../parquet/reader/NestedColumnReader.java | 4 +--- .../parquet/reader/NestedPositionUtil.java | 10 ++++---- .../reader/NestedPrimitiveColumnReader.java | 23 ++++++++++++++++--- .../format/parquet/ParquetReadWriteTest.java | 4 ---- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 5641f8e073ae..8f20be275447 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -141,9 +141,7 @@ private Pair readRow( boolean hasNull = false; for (int i = 0; i < len; i++) { for (WritableColumnVector child : finalChildrenVectors) { - if (((ElementCountable) child).getLen() - 1 >= i) { - isNull[i] = isNull[i] && child.isNullAt(i); - } + isNull[i] = isNull[i] && child.isNullAt(i); } if (isNull[i]) { hasNull = true; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index 473ef332c296..2d73ac07a7bd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -99,10 +99,12 @@ public static CollectionPosition calculateCollectionOffsets( } else { // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the // collection is not defined, no need to increase offset. - nullCollectionFlags.add(true); - nullValuesCount++; - emptyCollectionFlags.add(false); - valueCount++; + if (readRowField) { + nullCollectionFlags.add(true); + nullValuesCount++; + emptyCollectionFlags.add(false); + valueCount++; + } } } long[] offsetsArray = offsets.toArray(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index dd115698ecdc..0aaa41d5b6c9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -107,6 +107,11 @@ public class NestedPrimitiveColumnReader implements ColumnReader valueList) throws boolean needFilterSkip = pageRowId < rangeStart; do { - if (!lastValue.shouldSkip && !needFilterSkip) { valueList.add(lastValue.value); + valueIndex++; + } else if (readRowField) { + valueIndex++; } - valueIndex++; + readState.valuesToReadInPage = readState.valuesToReadInPage - 1; } while (readValue() && (repetitionLevel != 0)); if (pageRowId == readState.rowId) { @@ -216,6 +223,12 @@ private int collectDataFromParquetPage(int total, List valueList) throws } } + // When the values to read in page > 0 and row to read in batch == 0, it means the + // repetition level contains the next value's, so need to set the cutLevel flag to true. + if (readState.valuesToReadInPage > 0 && readState.rowsToReadInBatch == 0) { + cutLevel = true; + } + return valueIndex; } @@ -226,6 +239,11 @@ public LevelDelegation getLevelDelegation() { definitionLevelList.clear(); repetitionLevelList.add(repetitionLevel); definitionLevelList.add(definitionLevel); + if (cutLevel) { + repetition = Arrays.copyOf(repetition, repetition.length - 1); + definition = Arrays.copyOf(definition, definition.length - 1); + cutLevel = false; + } return new LevelDelegation(repetition, definition); } @@ -289,7 +307,6 @@ private void readAndSaveRepetitionAndDefinitionLevels() { // get the values of repetition and definitionLevel repetitionLevel = repetitionLevelColumn.nextInt(); definitionLevel = definitionLevelColumn.nextInt(); - readState.valuesToReadInPage = readState.valuesToReadInPage - 1; repetitionLevelList.add(repetitionLevel); definitionLevelList.add(definitionLevel); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index d7d16450a50f..e73e52f9d992 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -61,7 +61,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; @@ -840,14 +839,11 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou MessageType schema = ParquetSchemaConverter.convertToParquetMessageType( "paimon-parquet", NESTED_ARRAY_MAP_TYPE); - String[] candidates = new String[] {"snappy", "zstd", "gzip"}; - String compress = candidates[new Random().nextInt(3)]; try (ParquetWriter writer = ExampleParquetWriter.builder( HadoopOutputFile.fromPath( new org.apache.hadoop.fs.Path(path.toString()), conf)) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .withCompressionCodec(CompressionCodecName.fromConf(compress)) .withConf(new Configuration()) .withType(schema) .build()) {