From 1fd81f2ae37b6ccdc4392412b0590ff8488d4498 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Sat, 21 Dec 2024 21:49:22 +0800 Subject: [PATCH] [parquet] parquet reader should not retrun VectorizedRowIterator for nested schema (#4749) --- .../data/columnar/ColumnarRowIterator.java | 22 +++---- .../data/columnar/VectorizedColumnBatch.java | 6 -- .../data/columnar/VectorizedRowIterator.java | 45 ++++++++++++++ .../paimon/format/orc/OrcReaderFactory.java | 5 +- .../format/parquet/ParquetReaderFactory.java | 30 ++++++++- .../parquet/ParquetColumnVectorTest.java | 62 +++++++++++++------ 6 files changed, 125 insertions(+), 45 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 874c22134864..faecd1ccd88c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -23,7 +23,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.VectorizedRecordIterator; import org.apache.paimon.utils.RecyclableIterator; import org.apache.paimon.utils.VectorMappingUtils; @@ -34,15 +33,15 @@ * {@link ColumnarRow#setRowId}. 
*/ public class ColumnarRowIterator extends RecyclableIterator - implements FileRecordIterator, VectorizedRecordIterator { + implements FileRecordIterator { - private final Path filePath; - private final ColumnarRow row; - private final Runnable recycler; + protected final Path filePath; + protected final ColumnarRow row; + protected final Runnable recycler; - private int num; - private int nextPos; - private long nextFilePos; + protected int num; + protected int nextPos; + protected long nextFilePos; public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) { super(recycler); @@ -79,7 +78,7 @@ public Path filePath() { return this.filePath; } - public ColumnarRowIterator copy(ColumnVector[] vectors) { + protected ColumnarRowIterator copy(ColumnVector[] vectors) { ColumnarRowIterator newIterator = new ColumnarRowIterator(filePath, row.copy(vectors), recycler); newIterator.reset(nextFilePos); @@ -101,9 +100,4 @@ public ColumnarRowIterator mapping( } return this; } - - @Override - public VectorizedColumnBatch batch() { - return row.batch(); - } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java index dab5356435fb..4cf5f4c7c26b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java @@ -38,12 +38,6 @@ public class VectorizedColumnBatch implements Serializable { private static final long serialVersionUID = 8180323238728166155L; - /** - * This number is carefully chosen to minimize overhead and typically allows one - * VectorizedColumnBatch to fit in cache. 
- */ - public static final int DEFAULT_SIZE = 2048; - private int numRows; public final ColumnVector[] columns; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java new file mode 100644 index 000000000000..889da334c594 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.VectorizedRecordIterator; + +import javax.annotation.Nullable; + +/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. 
*/ +public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator { + + public VectorizedRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) { + super(filePath, row, recycler); + } + + @Override + public VectorizedColumnBatch batch() { + return row.batch(); + } + + @Override + protected VectorizedRowIterator copy(ColumnVector[] vectors) { + VectorizedRowIterator newIterator = + new VectorizedRowIterator(filePath, row.copy(vectors), recycler); + newIterator.reset(nextFilePos); + return newIterator; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index db17357bfd70..6683b357fd57 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.data.columnar.VectorizedRowIterator; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.format.FormatReaderFactory; @@ -158,7 +159,7 @@ private static class OrcReaderBatch { private final Pool.Recycler recycler; private final VectorizedColumnBatch paimonColumnBatch; - private final ColumnarRowIterator result; + private final VectorizedRowIterator result; protected OrcReaderBatch( final Path filePath, @@ -169,7 +170,7 @@ protected OrcReaderBatch( this.recycler = checkNotNull(recycler); this.paimonColumnBatch = paimonColumnBatch; this.result = - new ColumnarRowIterator( + new VectorizedRowIterator( filePath, new ColumnarRow(paimonColumnBatch), this::recycle); } diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 910f3031e09f..9fef7563718c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -19,10 +19,14 @@ package org.apache.paimon.format.parquet; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.columnar.ArrayColumnVector; import org.apache.paimon.data.columnar.ColumnVector; import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; +import org.apache.paimon.data.columnar.MapColumnVector; +import org.apache.paimon.data.columnar.RowColumnVector; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.data.columnar.VectorizedRowIterator; import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.format.FormatReaderFactory; @@ -525,7 +529,8 @@ private ParquetReaderBatch createReaderBatch( private static class ParquetReaderBatch { private final WritableColumnVector[] writableVectors; - protected final VectorizedColumnBatch columnarBatch; + private final boolean containsNestedColumn; + private final VectorizedColumnBatch columnarBatch; private final Pool.Recycler recycler; private final ColumnarRowIterator result; @@ -536,11 +541,30 @@ protected ParquetReaderBatch( VectorizedColumnBatch columnarBatch, Pool.Recycler recycler) { this.writableVectors = writableVectors; + this.containsNestedColumn = + Arrays.stream(writableVectors) + .anyMatch( + vector -> + vector instanceof MapColumnVector + || vector instanceof RowColumnVector + || vector instanceof ArrayColumnVector); this.columnarBatch = columnarBatch; this.recycler = recycler; + + /* + * See Reverted 
#3883. + * If a FileRecordIterator contains a batch whose nested vectors are not compact, + * it is not safe to use the VectorizedColumnBatch directly. For now, we should use {@link #next()} + * to handle it row by row. + * + *

TODO: delete this after #3883 is fixed completely. + */ this.result = - new ColumnarRowIterator( - filePath, new ColumnarRow(columnarBatch), this::recycle); + containsNestedColumn + ? new ColumnarRowIterator( + filePath, new ColumnarRow(columnarBatch), this::recycle) + : new VectorizedRowIterator( + filePath, new ColumnarRow(columnarBatch), this::recycle); } public void recycle() { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java index 0d862c3963ff..873a3f036d74 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java @@ -29,8 +29,8 @@ import org.apache.paimon.data.columnar.ArrayColumnVector; import org.apache.paimon.data.columnar.BytesColumnVector; import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.IntColumnVector; -import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder; @@ -64,6 +64,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.paimon.data.BinaryString.fromString; import static org.assertj.core.api.Assertions.assertThat; /** Validate the {@link ColumnVector}s read by Parquet format. */ @@ -78,6 +79,25 @@ public class ParquetColumnVectorTest { ? 
"null" : new String(((BytesColumnVector) cv).getBytes(i).getBytes()); + @Test + public void testNormalStrings() throws IOException { + RowType rowType = + RowType.builder() + .field("s1", DataTypes.STRING()) + .field("s2", DataTypes.STRING()) + .field("s3", DataTypes.STRING()) + .build(); + + int numRows = RND.nextInt(5) + 5; + List rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + rows.add(GenericRow.of(fromString(i + ""), fromString(i + ""), fromString(i + ""))); + } + + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isInstanceOf(VectorizedRecordIterator.class); + } + @Test public void testArrayString() throws IOException { RowType rowType = @@ -107,8 +127,8 @@ public void testArrayString() throws IOException { rows.add(GenericRow.of(array)); } - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING()); // validate row by row @@ -175,8 +195,8 @@ public void testArrayArrayString() throws IOException { rows.add(GenericRow.of(new GenericArray(innerArrays))); } - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING()); // validate row by row @@ -224,13 +244,13 @@ public void testMapString() throws IOException { expectedData.add(currentStringArray); Map map = new HashMap<>(); for (int idx = 0; idx < currentSize; idx++) { - map.put(idx, BinaryString.fromString(currentStringArray.get(idx))); + 
map.put(idx, fromString(currentStringArray.get(idx))); } rows.add(GenericRow.of(new GenericMap(map))); } - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING()); // validate row by row @@ -310,8 +330,9 @@ public void testMapArrayString() throws IOException { rows.add(GenericRow.of(new GenericMap(map))); } - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); + InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING()); // validate row by row @@ -420,8 +441,8 @@ public void testRow() throws IOException { rows.add(GenericRow.of(GenericRow.of(i, array))); } - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING()); // validate row by row @@ -487,7 +508,7 @@ public void testArrayRowArray() throws IOException { List rows = new ArrayList<>(4); List f0 = new ArrayList<>(3); for (int i = 0; i < 3; i++) { - f0.add(BinaryString.fromString(randomString())); + f0.add(fromString(randomString())); } GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new Object[] {0, null})); @@ -504,8 +525,8 @@ public void testArrayRowArray() throws IOException { GenericArray array3 = new GenericArray(new GenericRow[] {}); 
rows.add(GenericRow.of(array3)); - VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows); - VectorizedColumnBatch batch = iterator.batch(); + ColumnarRowIterator iterator = createRecordIterator(rowType, rows); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); // validate row by row InternalRow row0 = iterator.next(); @@ -625,8 +646,9 @@ public void testHighlyNestedSchema() throws IOException { InternalRow row3 = GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] {null}), null)); - VectorizedRecordIterator iterator = - createVectorizedRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3)); + ColumnarRowIterator iterator = + createRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3)); + assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class); // validate column vector // VectorizedColumnBatch batch = iterator.batch(); @@ -704,8 +726,8 @@ public void testHighlyNestedSchema() throws IOException { iterator.releaseBatch(); } - private VectorizedRecordIterator createVectorizedRecordIterator( - RowType rowType, List rows) throws IOException { + private ColumnarRowIterator createRecordIterator(RowType rowType, List rows) + throws IOException { Path path = new Path(tempDir.toString(), UUID.randomUUID().toString()); LocalFileIO fileIO = LocalFileIO.create(); @@ -725,7 +747,7 @@ private VectorizedRecordIterator createVectorizedRecordIterator( new FormatReaderContext(fileIO, path, fileIO.getFileSize(path))); RecordReader.RecordIterator iterator = reader.readBatch(); - return (VectorizedRecordIterator) iterator; + return (ColumnarRowIterator) iterator; } @Nullable