diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java index f0e82eac4fb1..fee204015d4d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.columnar.writable.AbstractWritableVector; import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.utils.Preconditions; import java.nio.ByteOrder; import java.util.Arrays; @@ -54,6 +55,23 @@ public AbstractHeapVector(int len) { this.len = len; } + // This will be called only when inner vectors don't have data. + public AbstractHeapVector(int len, boolean[] isNull) { + Preconditions.checkArgument( + len == isNull.length, "len should be equal to isNull's length."); + + for (boolean element : isNull) { + if (!element) { + throw new UnsupportedOperationException( + "This constructor can only be called when the vector is all null."); + } + } + + this.len = len; + this.isNull = isNull; + this.noNulls = false; + } + /** * Resets the column to default state. - fills the isNull array with false. - sets noNulls to * true. diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java index 3c88699cffe4..4ee27091f7f2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java @@ -34,6 +34,10 @@ public HeapBooleanVector(int len) { vector = new boolean[len]; } + public HeapBooleanVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public HeapIntVector reserveDictionaryIds(int capacity) { throw new RuntimeException("HeapBooleanVector has no dictionary."); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java index f5463dcc055b..5c96b771baa3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java @@ -39,6 +39,10 @@ public HeapByteVector(int len) { vector = new byte[len]; } + public HeapByteVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public byte getByte(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java index 8ccbbf3ba6b9..2559b2d586e5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java @@ -66,6 +66,10 @@ public HeapBytesVector(int size) { length = new int[size]; } + public HeapBytesVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public void reset() { super.reset(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java index 71d4b8775735..7b49d3b0cdfc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java @@ -45,6 +45,10 @@ public HeapDoubleVector(int len) { vector = new double[len]; } + public HeapDoubleVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public double getDouble(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java index b1b6a0e0e31d..83645e7bd5a2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java @@ -44,6 +44,10 @@ public HeapFloatVector(int len) { vector = new float[len]; } + public HeapFloatVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public float getFloat(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java index 15192e876bbd..afe89c28eb30 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapIntVector.java @@ -39,6 +39,10 @@ public HeapIntVector(int len) { vector = new int[len]; } + public HeapIntVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public int getInt(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java index 62c5b8108905..491fe3b9f045 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java @@ -39,6 +39,10 @@ public HeapLongVector(int len) { vector = new long[len]; } + public HeapLongVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public long getLong(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java index 472df6b271c1..d6fdc9b554f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java @@ -39,6 +39,10 @@ public HeapShortVector(int len) { vector = new short[len]; } + public HeapShortVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public short getShort(int i) { if (dictionary == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java index 4730b7a89705..7265031852b4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java @@ -28,8 +28,8 @@ public class HeapTimestampVector extends AbstractHeapVector implements WritableT private static final long serialVersionUID = 1L; - private final long[] milliseconds; - private final int[] nanoOfMilliseconds; + private long[] milliseconds; + private int[] nanoOfMilliseconds; public HeapTimestampVector(int len) { super(len); @@ -37,6 +37,10 @@ public HeapTimestampVector(int len) { this.nanoOfMilliseconds = new int[len]; } + public HeapTimestampVector(int len, boolean[] isNull) { + super(len, isNull); + } + @Override public Timestamp getTimestamp(int i, int precision) { if (dictionary == null) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 8f20be275447..5641f8e073ae 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -141,7 +141,9 @@ private Pair readRow( boolean hasNull = false; for (int i = 0; i < len; i++) { for (WritableColumnVector child : finalChildrenVectors) { - isNull[i] = isNull[i] && child.isNullAt(i); + if (((ElementCountable) child).getLen() - 1 >= i) { + isNull[i] = isNull[i] && child.isNullAt(i); + } } if (isNull[i]) { hasNull = true; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index b43169a40b2c..473ef332c296 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -96,9 +96,14 @@ public static CollectionPosition calculateCollectionOffsets( offsets.add(offset); valueCount++; + } else { + // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the + // collection is not defined, no need to increase offset. + nullCollectionFlags.add(true); + nullValuesCount++; + emptyCollectionFlags.add(false); + valueCount++; } - // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the - // collection is not defined, just ignore it } long[] offsetsArray = offsets.toArray(); long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index f0a82a6d711e..dd115698ecdc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -57,6 +57,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; @@ -168,7 +169,10 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto int valueIndex = collectDataFromParquetPage(readNumber, valueList); - return fillColumnVector(valueIndex, valueList); + if (!valueList.isEmpty()) { + return fillColumnVector(valueIndex, valueList); + } + return fillColumnVectorWithNone(valueIndex); } private int collectDataFromParquetPage(int total, List valueList) throws IOException { @@ -199,8 +203,8 @@ private int collectDataFromParquetPage(int total, List valueList) throws if (!lastValue.shouldSkip && !needFilterSkip) { valueList.add(lastValue.value); - valueIndex++; } + valueIndex++; } while (readValue() && (repetitionLevel != 0)); if (pageRowId == readState.rowId) { @@ -549,6 +553,53 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { } } + private WritableColumnVector fillColumnVectorWithNone(int total) { + boolean[] isNull = new boolean[total]; + Arrays.fill(isNull, true); + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + return new HeapBytesVector(total, isNull); + case BOOLEAN: + return new HeapBooleanVector(total, isNull); + case TINYINT: + return new HeapByteVector(total, isNull); + case SMALLINT: + return new HeapShortVector(total, isNull); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return new HeapIntVector(total, isNull); + case FLOAT: + return new HeapFloatVector(total, isNull); + case BIGINT: + return new HeapLongVector(total, isNull); + case DOUBLE: + return new HeapDoubleVector(total, isNull); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new HeapTimestampVector(total, isNull); + case DECIMAL: + PrimitiveType.PrimitiveTypeName primitiveTypeName = + descriptor.getPrimitiveType().getPrimitiveTypeName(); + switch (primitiveTypeName) { + case INT32: + HeapIntVector phiv = new HeapIntVector(total, isNull); + return new ParquetDecimalVector(phiv, total); + case INT64: + HeapLongVector phlv = new HeapLongVector(total, isNull); + return new ParquetDecimalVector(phlv, total); + default: + HeapBytesVector phbv = new HeapBytesVector(total, isNull); + return new ParquetDecimalVector(phbv, total); + } + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + private static HeapBytesVector getHeapBytesVector(int total, List valueList) { HeapBytesVector phbv = new HeapBytesVector(total); for (int i = 0; i < valueList.size(); i++) { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index ffe4d6008296..d7d16450a50f 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; @@ -176,7 +177,11 @@ public class ParquetReadWriteTest { new ArrayType(true, new IntType()))) .field("c", new IntType()) .build()), - new IntType())); + new IntType()), + RowType.of( + new ArrayType(RowType.of(new VarCharType(255))), + RowType.of(new IntType()), + new VarCharType(255))); @TempDir public File folder; @@ -822,7 +827,8 @@ null, new GenericMap(mp1), new GenericMap(mp2) }), i) }), - i))); + i), + null)); } return rows; } @@ -834,11 +840,14 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou MessageType schema = ParquetSchemaConverter.convertToParquetMessageType( "paimon-parquet", NESTED_ARRAY_MAP_TYPE); + String[] candidates = new String[] {"snappy", "zstd", "gzip"}; + String compress = candidates[new Random().nextInt(3)]; try (ParquetWriter writer = ExampleParquetWriter.builder( HadoopOutputFile.fromPath( new org.apache.hadoop.fs.Path(path.toString()), conf)) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(CompressionCodecName.fromConf(compress)) .withConf(new Configuration()) .withType(schema) .build()) { @@ -1011,6 +1020,10 @@ private void compareNestedRow( origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1), result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1)); Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1)); + Assertions.assertTrue(result.isNullAt(6)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(0)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(1)); + Assertions.assertTrue(result.getRow(6, 2).isNullAt(2)); } assertThat(iterator.hasNext()).isFalse(); iterator.close();