From fb8a8ff6da18c54d70cf33c4b6cd4f8a6838f78a Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sat, 3 Aug 2024 00:34:01 +0800 Subject: [PATCH] [WIP][parquet] Child vector of complex type should arrange elements compactly (like Orc) --- .../reader/NestedPrimitiveColumnReader.java | 116 +++++++++++++++--- .../parquet/ParquetColumnVectorTest.java | 46 ++++++- .../format/parquet/ParquetReadWriteTest.java | 2 +- 3 files changed, 139 insertions(+), 25 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index f48e3c9db4c61..4e5826ec0f573 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -109,7 +109,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader [5, 4, 5, 3, 2, 1, 0] + * + * + * + * When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector for + * OrcArrayColumnVector and OrcMapColumnVector don't contain empty and null set value. Stay + * consistent here. + */ private boolean readValue() throws IOException { int left = readPageIfNeed(); if (left > 0) { @@ -200,12 +215,14 @@ private boolean readValue() throws IOException { if (definitionLevel == maxDefLevel) { if (isCurrentPageDictionaryEncoded) { int dictionaryId = dataColumn.readValueDictionaryId(); - lastValue = dictionaryDecodeValue(dataType, dictionaryId); + lastValue.setValue(dictionaryDecodeValue(dataType, dictionaryId)); } else { - lastValue = readPrimitiveTypedRow(dataType); + lastValue.setValue(readPrimitiveTypedRow(dataType)); } + } else if (definitionLevel == maxDefLevel - 1) { + lastValue.setValue(null); } else { - lastValue = null; + lastValue.skip(); } return true; } else { @@ -353,26 +370,43 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { HeapBytesVector heapBytesVector = new HeapBytesVector(total); for (int i = 0; i < valueList.size(); i++) { byte[] src = ((List) valueList).get(i); - heapBytesVector.appendBytes(i, src, 0, src.length); + if (src == null) { + heapBytesVector.setNullAt(i); + } else { + heapBytesVector.appendBytes(i, src, 0, src.length); + } } return heapBytesVector; case BOOLEAN: HeapBooleanVector heapBooleanVector = new HeapBooleanVector(total); for (int i = 0; i < valueList.size(); i++) { - heapBooleanVector.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + heapBooleanVector.setNullAt(i); + } else { + heapBooleanVector.vector[i] = ((List) valueList).get(i); + } } return heapBooleanVector; case TINYINT: HeapByteVector heapByteVector = new HeapByteVector(total); for (int i = 0; i < valueList.size(); i++) { - heapByteVector.vector[i] = (byte) ((List) valueList).get(i).intValue(); + if (valueList.get(i) == null) { + heapByteVector.setNullAt(i); + } else { + heapByteVector.vector[i] = + (byte) ((List) valueList).get(i).intValue(); + } } return heapByteVector; case SMALLINT: HeapShortVector heapShortVector = new HeapShortVector(total); for (int i = 0; i < valueList.size(); i++) { - heapShortVector.vector[i] = - (short) ((List) valueList).get(i).intValue(); + if (valueList.get(i) == null) { + heapShortVector.setNullAt(i); + } else { + heapShortVector.vector[i] = + (short) ((List) valueList).get(i).intValue(); + } } return heapShortVector; case INTEGER: @@ -380,32 +414,52 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { case TIME_WITHOUT_TIME_ZONE: HeapIntVector heapIntVector = new HeapIntVector(total); for (int i = 0; i < valueList.size(); i++) { - heapIntVector.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + heapIntVector.setNullAt(i); + } else { + heapIntVector.vector[i] = ((List) valueList).get(i); + } } return heapIntVector; case FLOAT: HeapFloatVector heapFloatVector = new HeapFloatVector(total); for (int i = 0; i < valueList.size(); i++) { - heapFloatVector.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + heapFloatVector.setNullAt(i); + } else { + heapFloatVector.vector[i] = ((List) valueList).get(i); + } } return heapFloatVector; case BIGINT: HeapLongVector heapLongVector = new HeapLongVector(total); for (int i = 0; i < valueList.size(); i++) { - heapLongVector.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + heapLongVector.setNullAt(i); + } else { + heapLongVector.vector[i] = ((List) valueList).get(i); + } } return heapLongVector; case DOUBLE: HeapDoubleVector heapDoubleVector = new HeapDoubleVector(total); for (int i = 0; i < valueList.size(); i++) { - heapDoubleVector.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + heapDoubleVector.setNullAt(i); + } else { + heapDoubleVector.vector[i] = ((List) valueList).get(i); + } } return heapDoubleVector; case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: HeapTimestampVector heapTimestampVector = new HeapTimestampVector(total); for (int i = 0; i < valueList.size(); i++) { - heapTimestampVector.setTimestamp(i, ((List) valueList).get(i)); + if (valueList.get(i) == null) { + heapTimestampVector.setNullAt(i); + } else { + heapTimestampVector.setTimestamp(i, ((List) valueList).get(i)); + } } return heapTimestampVector; case DECIMAL: @@ -415,13 +469,21 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { case INT32: HeapIntVector phiv = new HeapIntVector(total); for (int i = 0; i < valueList.size(); i++) { - phiv.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + phiv.setNullAt(i); + } else { + phiv.vector[i] = ((List) valueList).get(i); + } } return new ParquetDecimalVector(phiv); case INT64: HeapLongVector phlv = new HeapLongVector(total); for (int i = 0; i < valueList.size(); i++) { - phlv.vector[i] = ((List) valueList).get(i); + if (valueList.get(i) == null) { + phlv.setNullAt(i); + } else { + phlv.vector[i] = ((List) valueList).get(i); + } } return new ParquetDecimalVector(phlv); default: @@ -600,4 +662,18 @@ public int nextInt() { return 0; } } + + private static class LastValueContainer { + protected boolean shouldSkip; + protected Object value; + + protected void setValue(Object value) { + this.value = value; + this.shouldSkip = false; + } + + protected void skip() { + this.shouldSkip = true; + } + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java index 39189dde3a6ae..80a1dc5b155d7 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java @@ -102,7 +102,7 @@ public void testArrayString() throws IOException { // TODO check PER-ROW - VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows); + VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows).batch(); ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0]; // check nullability @@ -181,7 +181,7 @@ public void testArrayArrayString() throws IOException { // TODO check per-row // validate column vector - VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows); + VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows).batch(); ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0]; expectedData.validateOuterArray( @@ -198,7 +198,45 @@ public void testArrayArrayString() throws IOException { (cv, i) -> new String(((BytesColumnVector) cv).getBytes(i).getBytes())); } - private VectorizedColumnBatch createVectorizedColumnBatch( + @Test + public void testArrayArrayInt() throws IOException { + RowType nestedArrayType = + RowType.builder() + .field("array_array_int", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))) + .build(); + + VectorizedRecordIterator iterator = createVectorizedColumnBatch(nestedArrayType, gen()); + + VectorizedColumnBatch batch = iterator.batch(); + ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0]; + + // TODO check per-row + for (int i = 0; i < 10; i++) { + InternalRow row = iterator.next(); + InternalArray array = row.getArray(0); + System.out.println(); + } + } + + private List gen() { + List rows = new ArrayList<>(1); + for (int i = 0; i < 1; i++) { + rows.add( + GenericRow.of( + new GenericArray( + new Object[] { + new GenericArray(new Object[] {0, null}), + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {}), + null + }))); + } + rows.add(GenericRow.of(new GenericArray(new Object[] {}))); + rows.add(GenericRow.of((Object) null)); + return rows; + } + + private VectorizedRecordIterator createVectorizedColumnBatch( RowType rowType, List rows) throws IOException { Path path = new Path(tempDir.toString(), UUID.randomUUID().toString()); LocalFileIO fileIO = LocalFileIO.create(); @@ -220,7 +258,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch( new FormatReaderContext(fileIO, path, fileIO.getFileSize(path))); RecordReader.RecordIterator iterator = reader.readBatch(); - return ((VectorizedRecordIterator) iterator).batch(); + return (VectorizedRecordIterator) iterator; } /** Store generated data of ARRAY[STRING] and provide validated methods. */ diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index a3aa1f85bf040..6dbb0a8266082 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -438,7 +438,7 @@ void testReadRowPositionWithRandomFilter() throws IOException { @ParameterizedTest @CsvSource({"10, paimon", "1000, paimon", "10, origin", "1000, origin"}) public void testNestedRead(int rowGroupSize, String writerType) throws Exception { - List rows = prepareNestedData(1283); + List rows = prepareNestedData(10); Path path; if ("paimon".equals(writerType)) { path = createTempParquetFileByPaimon(folder, rows, rowGroupSize, NESTED_ARRAY_MAP_TYPE);