[parquet] Parquet reader should not return VectorizedRowIterator for nested schema #4749

Merged: 1 commit, Dec 21, 2024
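
Summary of the change: when the Parquet schema contains array, map, or row columns, the reader now returns a plain ColumnarRowIterator; only flat schemas get a VectorizedRowIterator (which implements VectorizedRecordIterator). A rough consumer-side sketch of what this means for callers; the class name, the processing placeholders, and the surrounding wiring are illustrative assumptions, only the instanceof dispatch reflects this PR:

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.VectorizedRecordIterator;

import java.io.IOException;

class ReadDispatchSketch {

    // Hypothetical caller: dispatch on the iterator type the reader hands back.
    static void consume(RecordReader<InternalRow> reader) throws IOException {
        RecordReader.RecordIterator<InternalRow> it = reader.readBatch();
        if (it == null) {
            return; // no more batches
        }
        if (it instanceof VectorizedRecordIterator) {
            // Flat schema: the whole batch can be consumed column-wise.
            VectorizedColumnBatch batch = ((VectorizedRecordIterator) it).batch();
            // ... columnar processing of batch.columns ...
        } else {
            // Nested schema (array/map/row): fall back to row-by-row access.
            InternalRow row;
            while ((row = it.next()) != null) {
                // ... per-row processing ...
            }
        }
        it.releaseBatch();
    }
}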
ColumnarRowIterator.java

@@ -23,7 +23,6 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.VectorizedRecordIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -34,15 +33,15 @@
  * {@link ColumnarRow#setRowId}.
  */
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
-        implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
+        implements FileRecordIterator<InternalRow> {
 
-    private final Path filePath;
-    private final ColumnarRow row;
-    private final Runnable recycler;
+    protected final Path filePath;
+    protected final ColumnarRow row;
+    protected final Runnable recycler;
 
-    private int num;
-    private int nextPos;
-    private long nextFilePos;
+    protected int num;
+    protected int nextPos;
+    protected long nextFilePos;
 
     public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
         super(recycler);
@@ -79,7 +78,7 @@ public Path filePath() {
         return this.filePath;
     }
 
-    public ColumnarRowIterator copy(ColumnVector[] vectors) {
+    protected ColumnarRowIterator copy(ColumnVector[] vectors) {
         ColumnarRowIterator newIterator =
                 new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
         newIterator.reset(nextFilePos);
@@ -101,9 +100,4 @@ public ColumnarRowIterator mapping(
         }
         return this;
     }
-
-    @Override
-    public VectorizedColumnBatch batch() {
-        return row.batch();
-    }
 }
VectorizedColumnBatch.java

@@ -38,12 +38,6 @@ public class VectorizedColumnBatch implements Serializable {
 
     private static final long serialVersionUID = 8180323238728166155L;
 
-    /**
-     * This number is carefully chosen to minimize overhead and typically allows one
-     * VectorizedColumnBatch to fit in cache.
-     */
-    public static final int DEFAULT_SIZE = 2048;
-
     private int numRows;
 
     public final ColumnVector[] columns;
VectorizedRowIterator.java (new file)

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+
+import javax.annotation.Nullable;
+
+/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
+public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator {
+
+    public VectorizedRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
+        super(filePath, row, recycler);
+    }
+
+    @Override
+    public VectorizedColumnBatch batch() {
+        return row.batch();
+    }
+
+    @Override
+    protected VectorizedRowIterator copy(ColumnVector[] vectors) {
+        VectorizedRowIterator newIterator =
+                new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
+        newIterator.reset(nextFilePos);
+        return newIterator;
+    }
+}
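
Since VectorizedRowIterator still extends ColumnarRowIterator, row-by-row callers are untouched; batch-level access is only offered where it is safe. A minimal sketch of batch consumption, assuming the iterator came from a flat schema whose first column is an INT column; the helper class, the column-0 assumption, and the getNumRows() accessor are assumptions, not part of this PR:

import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;

class BatchSumSketch {

    // Sums column 0 of one batch column-wise instead of row by row.
    static long sumFirstIntColumn(VectorizedRowIterator iterator) {
        VectorizedColumnBatch batch = iterator.batch();
        IntColumnVector col0 = (IntColumnVector) batch.columns[0]; // assumes an INT first column
        long sum = 0;
        for (int i = 0; i < batch.getNumRows(); i++) {
            if (!col0.isNullAt(i)) {
                sum += col0.getInt(i);
            }
        }
        iterator.releaseBatch(); // hand the batch back to the pool
        return sum;
    }
}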
OrcReaderFactory.java (OrcReaderBatch)

@@ -23,6 +23,7 @@
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
 import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -158,7 +159,7 @@ private static class OrcReaderBatch {
         private final Pool.Recycler<OrcReaderBatch> recycler;
 
         private final VectorizedColumnBatch paimonColumnBatch;
-        private final ColumnarRowIterator result;
+        private final VectorizedRowIterator result;
 
         protected OrcReaderBatch(
                 final Path filePath,
@@ -169,7 +170,7 @@ protected OrcReaderBatch(
             this.recycler = checkNotNull(recycler);
             this.paimonColumnBatch = paimonColumnBatch;
             this.result =
-                    new ColumnarRowIterator(
+                    new VectorizedRowIterator(
                             filePath, new ColumnarRow(paimonColumnBatch), this::recycle);
         }
 
ParquetReaderFactory.java (ParquetReaderBatch)

@@ -19,10 +19,14 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.data.columnar.MapColumnVector;
+import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
 import org.apache.paimon.data.columnar.heap.ElementCountable;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -525,7 +529,8 @@ private ParquetReaderBatch createReaderBatch(
     private static class ParquetReaderBatch {
 
         private final WritableColumnVector[] writableVectors;
-        protected final VectorizedColumnBatch columnarBatch;
+        private final boolean containsNestedColumn;
+        private final VectorizedColumnBatch columnarBatch;
         private final Pool.Recycler<ParquetReaderBatch> recycler;
 
         private final ColumnarRowIterator result;
@@ -536,11 +541,30 @@ protected ParquetReaderBatch(
                 VectorizedColumnBatch columnarBatch,
                 Pool.Recycler<ParquetReaderBatch> recycler) {
             this.writableVectors = writableVectors;
+            this.containsNestedColumn =
+                    Arrays.stream(writableVectors)
+                            .anyMatch(
+                                    vector ->
+                                            vector instanceof MapColumnVector
+                                                    || vector instanceof RowColumnVector
+                                                    || vector instanceof ArrayColumnVector);
             this.columnarBatch = columnarBatch;
             this.recycler = recycler;
+
+            /*
+             * See <a href="https://github.com/apache/paimon/pull/3883">Reverted #3883</a>.
+             * If a FileRecordIterator contains a batch whose nested vectors are not stored
+             * compactly, it is not safe to use the VectorizedColumnBatch directly. For now, we
+             * use {@link #next()} to handle such batches row by row.
+             *
+             * <p>TODO: delete this after #3883 is fixed completely.
+             */
             this.result =
-                    new ColumnarRowIterator(
-                            filePath, new ColumnarRow(columnarBatch), this::recycle);
+                    containsNestedColumn
+                            ? new ColumnarRowIterator(
+                                    filePath, new ColumnarRow(columnarBatch), this::recycle)
+                            : new VectorizedRowIterator(
+                                    filePath, new ColumnarRow(columnarBatch), this::recycle);
         }
 
         public void recycle() {
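
For readers skimming the ternary above: the vectorized path is only taken when none of the top-level writable vectors are nested. The same check, restated as a hypothetical standalone predicate (not part of the PR) for clarity:

import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;

import java.util.Arrays;

final class NestedColumnCheck {

    // True if any top-level column is an array, map, or row vector,
    // i.e. the batch must be consumed row by row.
    static boolean containsNestedColumn(WritableColumnVector[] vectors) {
        return Arrays.stream(vectors)
                .anyMatch(
                        vector ->
                                vector instanceof MapColumnVector
                                        || vector instanceof RowColumnVector
                                        || vector instanceof ArrayColumnVector);
    }
}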
ParquetColumnVectorTest.java

@@ -29,8 +29,8 @@
 import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.BytesColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.IntColumnVector;
-import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
@@ -64,6 +64,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Validate the {@link ColumnVector}s read by Parquet format. */
@@ -78,6 +79,25 @@ public class ParquetColumnVectorTest {
                             ? "null"
                             : new String(((BytesColumnVector) cv).getBytes(i).getBytes());
 
+    @Test
+    public void testNormalStrings() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("s1", DataTypes.STRING())
+                        .field("s2", DataTypes.STRING())
+                        .field("s3", DataTypes.STRING())
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            rows.add(GenericRow.of(fromString(i + ""), fromString(i + ""), fromString(i + "")));
+        }
+
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isInstanceOf(VectorizedRecordIterator.class);
+    }
+
     @Test
     public void testArrayString() throws IOException {
         RowType rowType =
@@ -107,8 +127,8 @@ public void testArrayString() throws IOException {
             rows.add(GenericRow.of(array));
         }
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -175,8 +195,8 @@ public void testArrayArrayString() throws IOException {
             rows.add(GenericRow.of(new GenericArray(innerArrays)));
         }
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -224,13 +244,13 @@ public void testMapString() throws IOException {
             expectedData.add(currentStringArray);
             Map<Integer, BinaryString> map = new HashMap<>();
             for (int idx = 0; idx < currentSize; idx++) {
-                map.put(idx, BinaryString.fromString(currentStringArray.get(idx)));
+                map.put(idx, fromString(currentStringArray.get(idx)));
             }
             rows.add(GenericRow.of(new GenericMap(map)));
         }
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -310,8 +330,9 @@ public void testMapArrayString() throws IOException {
             rows.add(GenericRow.of(new GenericMap(map)));
         }
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
+
         InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -420,8 +441,8 @@ public void testRow() throws IOException {
             rows.add(GenericRow.of(GenericRow.of(i, array)));
         }
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -487,7 +508,7 @@ public void testArrayRowArray() throws IOException {
         List<InternalRow> rows = new ArrayList<>(4);
         List<BinaryString> f0 = new ArrayList<>(3);
         for (int i = 0; i < 3; i++) {
-            f0.add(BinaryString.fromString(randomString()));
+            f0.add(fromString(randomString()));
         }
 
         GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new Object[] {0, null}));
@@ -504,8 +525,8 @@
         GenericArray array3 = new GenericArray(new GenericRow[] {});
         rows.add(GenericRow.of(array3));
 
-        VectorizedRecordIterator iterator = createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
 
         // validate row by row
         InternalRow row0 = iterator.next();
@@ -625,8 +646,9 @@ public void testHighlyNestedSchema() throws IOException {
         InternalRow row3 =
                 GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] {null}), null));
 
-        VectorizedRecordIterator iterator =
-                createVectorizedRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3));
+        ColumnarRowIterator iterator =
+                createRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3));
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
 
         // validate column vector
         // VectorizedColumnBatch batch = iterator.batch();
@@ -704,8 +726,8 @@ public void testHighlyNestedSchema() throws IOException {
         iterator.releaseBatch();
     }
 
-    private VectorizedRecordIterator createVectorizedRecordIterator(
-            RowType rowType, List<InternalRow> rows) throws IOException {
+    private ColumnarRowIterator createRecordIterator(RowType rowType, List<InternalRow> rows)
+            throws IOException {
         Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
         LocalFileIO fileIO = LocalFileIO.create();
 
@@ -725,7 +747,7 @@ private VectorizedRecordIterator createVectorizedRecordIterator(
                         new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)));
 
         RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
-        return (VectorizedRecordIterator) iterator;
+        return (ColumnarRowIterator) iterator;
     }
 
     @Nullable