Skip to content

Commit

Permalink
[parquet] parquet reader should not retrun VectorizedRowIterator for …
Browse files Browse the repository at this point in the history
…nested schema (apache#4749)
  • Loading branch information
JingsongLi authored Dec 21, 2024
1 parent b50040a commit 1fd81f2
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;

Expand All @@ -34,15 +33,15 @@
* {@link ColumnarRow#setRowId}.
*/
public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
implements FileRecordIterator<InternalRow> {

private final Path filePath;
private final ColumnarRow row;
private final Runnable recycler;
protected final Path filePath;
protected final ColumnarRow row;
protected final Runnable recycler;

private int num;
private int nextPos;
private long nextFilePos;
protected int num;
protected int nextPos;
protected long nextFilePos;

public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
super(recycler);
Expand Down Expand Up @@ -79,7 +78,7 @@ public Path filePath() {
return this.filePath;
}

public ColumnarRowIterator copy(ColumnVector[] vectors) {
protected ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(nextFilePos);
Expand All @@ -101,9 +100,4 @@ public ColumnarRowIterator mapping(
}
return this;
}

@Override
public VectorizedColumnBatch batch() {
return row.batch();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ public class VectorizedColumnBatch implements Serializable {

private static final long serialVersionUID = 8180323238728166155L;

/**
* This number is carefully chosen to minimize overhead and typically allows one
* VectorizedColumnBatch to fit in cache.
*/
public static final int DEFAULT_SIZE = 2048;

private int numRows;

public final ColumnVector[] columns;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar;

import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.VectorizedRecordIterator;

import javax.annotation.Nullable;

/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator {

public VectorizedRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
super(filePath, row, recycler);
}

@Override
public VectorizedColumnBatch batch() {
return row.batch();
}

@Override
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(nextFilePos);
return newIterator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.format.FormatReaderFactory;
Expand Down Expand Up @@ -158,7 +159,7 @@ private static class OrcReaderBatch {
private final Pool.Recycler<OrcReaderBatch> recycler;

private final VectorizedColumnBatch paimonColumnBatch;
private final ColumnarRowIterator result;
private final VectorizedRowIterator result;

protected OrcReaderBatch(
final Path filePath,
Expand All @@ -169,7 +170,7 @@ protected OrcReaderBatch(
this.recycler = checkNotNull(recycler);
this.paimonColumnBatch = paimonColumnBatch;
this.result =
new ColumnarRowIterator(
new VectorizedRowIterator(
filePath, new ColumnarRow(paimonColumnBatch), this::recycle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package org.apache.paimon.format.parquet;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
Expand Down Expand Up @@ -525,7 +529,8 @@ private ParquetReaderBatch createReaderBatch(
private static class ParquetReaderBatch {

private final WritableColumnVector[] writableVectors;
protected final VectorizedColumnBatch columnarBatch;
private final boolean containsNestedColumn;
private final VectorizedColumnBatch columnarBatch;
private final Pool.Recycler<ParquetReaderBatch> recycler;

private final ColumnarRowIterator result;
Expand All @@ -536,11 +541,30 @@ protected ParquetReaderBatch(
VectorizedColumnBatch columnarBatch,
Pool.Recycler<ParquetReaderBatch> recycler) {
this.writableVectors = writableVectors;
this.containsNestedColumn =
Arrays.stream(writableVectors)
.anyMatch(
vector ->
vector instanceof MapColumnVector
|| vector instanceof RowColumnVector
|| vector instanceof ArrayColumnVector);
this.columnarBatch = columnarBatch;
this.recycler = recycler;

/*
* See <a href="https://github.com/apache/paimon/pull/3883">Reverted #3883</a>.
* If a FileRecordIterator contains a batch and the batch's nested vectors are not compactly,
* it's not safe to use VectorizedColumnBatch directly. Currently, we should use {@link #next()}
* to handle it row by row.
*
* <p>TODO: delete this after #3883 is fixed completely.
*/
this.result =
new ColumnarRowIterator(
filePath, new ColumnarRow(columnarBatch), this::recycle);
containsNestedColumn
? new ColumnarRowIterator(
filePath, new ColumnarRow(columnarBatch), this::recycle)
: new VectorizedRowIterator(
filePath, new ColumnarRow(columnarBatch), this::recycle);
}

public void recycle() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BytesColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
Expand Down Expand Up @@ -64,6 +64,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;

/** Validate the {@link ColumnVector}s read by Parquet format. */
Expand All @@ -78,6 +79,25 @@ public class ParquetColumnVectorTest {
? "null"
: new String(((BytesColumnVector) cv).getBytes(i).getBytes());

@Test
public void testNormalStrings() throws IOException {
RowType rowType =
RowType.builder()
.field("s1", DataTypes.STRING())
.field("s2", DataTypes.STRING())
.field("s3", DataTypes.STRING())
.build();

int numRows = RND.nextInt(5) + 5;
List<InternalRow> rows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
rows.add(GenericRow.of(fromString(i + ""), fromString(i + ""), fromString(i + "")));
}

ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isInstanceOf(VectorizedRecordIterator.class);
}

@Test
public void testArrayString() throws IOException {
RowType rowType =
Expand Down Expand Up @@ -107,8 +127,8 @@ public void testArrayString() throws IOException {
rows.add(GenericRow.of(array));
}

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());

// validate row by row
Expand Down Expand Up @@ -175,8 +195,8 @@ public void testArrayArrayString() throws IOException {
rows.add(GenericRow.of(new GenericArray(innerArrays)));
}

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());

// validate row by row
Expand Down Expand Up @@ -224,13 +244,13 @@ public void testMapString() throws IOException {
expectedData.add(currentStringArray);
Map<Integer, BinaryString> map = new HashMap<>();
for (int idx = 0; idx < currentSize; idx++) {
map.put(idx, BinaryString.fromString(currentStringArray.get(idx)));
map.put(idx, fromString(currentStringArray.get(idx)));
}
rows.add(GenericRow.of(new GenericMap(map)));
}

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());

// validate row by row
Expand Down Expand Up @@ -310,8 +330,9 @@ public void testMapArrayString() throws IOException {
rows.add(GenericRow.of(new GenericMap(map)));
}

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);

InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());

// validate row by row
Expand Down Expand Up @@ -420,8 +441,8 @@ public void testRow() throws IOException {
rows.add(GenericRow.of(GenericRow.of(i, array)));
}

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());

// validate row by row
Expand Down Expand Up @@ -487,7 +508,7 @@ public void testArrayRowArray() throws IOException {
List<InternalRow> rows = new ArrayList<>(4);
List<BinaryString> f0 = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
f0.add(BinaryString.fromString(randomString()));
f0.add(fromString(randomString()));
}

GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new Object[] {0, null}));
Expand All @@ -504,8 +525,8 @@ public void testArrayRowArray() throws IOException {
GenericArray array3 = new GenericArray(new GenericRow[] {});
rows.add(GenericRow.of(array3));

VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
VectorizedColumnBatch batch = iterator.batch();
ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);

// validate row by row
InternalRow row0 = iterator.next();
Expand Down Expand Up @@ -625,8 +646,9 @@ public void testHighlyNestedSchema() throws IOException {
InternalRow row3 =
GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] {null}), null));

VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3));
ColumnarRowIterator iterator =
createRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3));
assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);

// validate column vector
// VectorizedColumnBatch batch = iterator.batch();
Expand Down Expand Up @@ -704,8 +726,8 @@ public void testHighlyNestedSchema() throws IOException {
iterator.releaseBatch();
}

private VectorizedRecordIterator createVectorizedRecordIterator(
RowType rowType, List<InternalRow> rows) throws IOException {
private ColumnarRowIterator createRecordIterator(RowType rowType, List<InternalRow> rows)
throws IOException {
Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
LocalFileIO fileIO = LocalFileIO.create();

Expand All @@ -725,7 +747,7 @@ private VectorizedRecordIterator createVectorizedRecordIterator(
new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)));

RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
return (VectorizedRecordIterator) iterator;
return (ColumnarRowIterator) iterator;
}

@Nullable
Expand Down

0 comments on commit 1fd81f2

Please sign in to comment.