From cefb904ac77993ae3ba0be917aa2685b9ce54823 Mon Sep 17 00:00:00 2001 From: "wenchao.wu" Date: Thu, 19 Dec 2024 18:55:26 +0800 Subject: [PATCH] [core] fix test case and make row child type's length which is array is the same as other child vector. --- .../parquet/reader/NestedColumnReader.java | 4 +--- .../parquet/reader/NestedPositionUtil.java | 10 ++++---- .../reader/NestedPrimitiveColumnReader.java | 23 ++++++++++++++++--- .../format/parquet/ParquetReadWriteTest.java | 4 ---- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 5641f8e073ae..8f20be275447 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -141,9 +141,7 @@ private Pair readRow( boolean hasNull = false; for (int i = 0; i < len; i++) { for (WritableColumnVector child : finalChildrenVectors) { - if (((ElementCountable) child).getLen() - 1 >= i) { - isNull[i] = isNull[i] && child.isNullAt(i); - } + isNull[i] = isNull[i] && child.isNullAt(i); } if (isNull[i]) { hasNull = true; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index 473ef332c296..2d73ac07a7bd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -99,10 +99,12 @@ public static CollectionPosition calculateCollectionOffsets( } else { // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the // collection is not defined, no need to increase offset. - nullCollectionFlags.add(true); - nullValuesCount++; - emptyCollectionFlags.add(false); - valueCount++; + if (readRowField) { + nullCollectionFlags.add(true); + nullValuesCount++; + emptyCollectionFlags.add(false); + valueCount++; + } } } long[] offsetsArray = offsets.toArray(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index dd115698ecdc..0aaa41d5b6c9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -107,6 +107,11 @@ public class NestedPrimitiveColumnReader implements ColumnReader valueList) throws boolean needFilterSkip = pageRowId < rangeStart; do { - if (!lastValue.shouldSkip && !needFilterSkip) { valueList.add(lastValue.value); + valueIndex++; + } else if (readRowField) { + valueIndex++; } - valueIndex++; + readState.valuesToReadInPage = readState.valuesToReadInPage - 1; } while (readValue() && (repetitionLevel != 0)); if (pageRowId == readState.rowId) { @@ -216,6 +223,12 @@ private int collectDataFromParquetPage(int total, List valueList) throws } } + // When the values to read in page > 0 and row to read in batch == 0, it means the + // repetition level contains the next value's, so need to set the cutLevel flag to true. + if (readState.valuesToReadInPage > 0 && readState.rowsToReadInBatch == 0) { + cutLevel = true; + } + return valueIndex; } @@ -226,6 +239,11 @@ public LevelDelegation getLevelDelegation() { definitionLevelList.clear(); repetitionLevelList.add(repetitionLevel); definitionLevelList.add(definitionLevel); + if (cutLevel) { + repetition = Arrays.copyOf(repetition, repetition.length - 1); + definition = Arrays.copyOf(definition, definition.length - 1); + cutLevel = false; + } return new LevelDelegation(repetition, definition); } @@ -289,7 +307,6 @@ private void readAndSaveRepetitionAndDefinitionLevels() { // get the values of repetition and definitionLevel repetitionLevel = repetitionLevelColumn.nextInt(); definitionLevel = definitionLevelColumn.nextInt(); - readState.valuesToReadInPage = readState.valuesToReadInPage - 1; repetitionLevelList.add(repetitionLevel); definitionLevelList.add(definitionLevel); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index d7d16450a50f..e73e52f9d992 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -61,7 +61,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; @@ -840,14 +839,11 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou MessageType schema = ParquetSchemaConverter.convertToParquetMessageType( "paimon-parquet", NESTED_ARRAY_MAP_TYPE); - String[] candidates = new String[] {"snappy", "zstd", "gzip"}; - String compress = candidates[new Random().nextInt(3)]; try (ParquetWriter writer = ExampleParquetWriter.builder( HadoopOutputFile.fromPath( new org.apache.hadoop.fs.Path(path.toString()), conf)) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .withCompressionCodec(CompressionCodecName.fromConf(compress)) .withConf(new Configuration()) .withType(schema) .build()) {