[WIP] [Arrow] Introduce VectorSchemaRootConverter to get vsr from data splits using ArrowBatchConverter #4748

Closed · wants to merge 2 commits
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow.converter;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.RowType;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Reads records from {@link DataSplit}s with the Paimon API and converts them to {@link
 * VectorSchemaRoot}s.
 */
public class VectorSchemaRootConverter {

    private final RecordReader<InternalRow> recordReader;

    // protected so that users overriding #init() can access them
    protected final RowType readType;
    protected BufferAllocator allocator;
    protected final boolean vsrCaseSensitive;
    protected ArrowVectorizedBatchConverter batchConverter;
    protected ArrowPerRowBatchConverter perRowConverter;

    private final boolean schemaContainsConstructedType;

    private ArrowBatchConverter currentConverter;

    private long readBytes = 0;

    private boolean inited = false;

    public VectorSchemaRootConverter(
            ReadBuilder readBuilder,
            RowType rowType,
            @Nullable int[] projection,
            @Nullable Predicate predicate,
            List<DataSplit> splits,
            boolean vsrCaseSensitive) {
        this.readType = projection == null ? rowType : rowType.project(projection);

        readBuilder.withProjection(projection);
        if (predicate != null) {
            readBuilder.withFilter(predicate);
        }
        TableRead read = readBuilder.newRead();
        List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
        for (DataSplit split : splits) {
            readerSuppliers.add(() -> read.createReader(split));
        }
        try {
            this.recordReader = ConcatRecordReader.create(readerSuppliers);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        this.vsrCaseSensitive = vsrCaseSensitive;
        this.schemaContainsConstructedType =
                rowType.getFieldTypes().stream()
                        .anyMatch(dataType -> dataType.is(DataTypeFamily.CONSTRUCTED));
    }

    public void init() {
        allocator = new RootAllocator();
        VectorSchemaRoot vsr =
                ArrowUtils.createVectorSchemaRoot(readType, allocator, vsrCaseSensitive);
        ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, readType);
        batchConverter = new ArrowVectorizedBatchConverter(vsr, fieldWriters);
        perRowConverter = new ArrowPerRowBatchConverter(vsr, fieldWriters);
    }

    /**
     * Gets a {@link VectorSchemaRoot} whose row count is at most {@code maxBatchRows}. Returns
     * null if there is no more data.
     */
    @Nullable
    public VectorSchemaRoot next(int maxBatchRows) {
        if (!inited) {
            init();
            inited = true;
        }

        if (currentConverter == null) {
            resetCurrentConverter();
            if (currentConverter == null) {
                // no more data
                return null;
            }
        }

        VectorSchemaRoot vsr = currentConverter.next(maxBatchRows);
        if (vsr == null) {
            // the current converter is drained; advance to the next batch
            resetCurrentConverter();
            if (currentConverter == null) {
                // no more data
                return null;
            }
            vsr = currentConverter.next(maxBatchRows);
        }
        if (vsr != null) {
            readBytes +=
                    vsr.getFieldVectors().stream().mapToLong(ValueVector::getBufferSize).sum();
        }
        return vsr;
    }

    private void resetCurrentConverter() {
        RecordReader.RecordIterator<InternalRow> iterator;
        try {
            iterator = recordReader.readBatch();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (iterator == null) {
            // no more data
            currentConverter = null;
            return;
        }

        // TODO: delete this after #3883 is fixed completely.
        // Vectorized conversion is only safe when the batch's nested vectors are compact,
        // or when the read schema contains no constructed (nested) types at all.
        boolean vectorizedAndCompactly =
                ((FileRecordIterator<InternalRow>) iterator).vectorizedAndCompactly();
        boolean useVectorizedSafely = vectorizedAndCompactly || !schemaContainsConstructedType;

        if (isVectorizedWithDv(iterator) && useVectorizedSafely) {
            // vectorized batch with deletion vectors applied
            batchConverter.reset((ApplyDeletionFileRecordIterator) iterator);
            currentConverter = batchConverter;
        } else if (iterator instanceof VectorizedRecordIterator && useVectorizedSafely) {
            // plain vectorized batch
            batchConverter.reset((VectorizedRecordIterator) iterator);
            currentConverter = batchConverter;
        } else {
            // fall back to row-by-row conversion
            perRowConverter.reset(iterator);
            currentConverter = perRowConverter;
        }
    }

    public void close() throws IOException {
        recordReader.close();
        batchConverter.close();
        perRowConverter.close();
        allocator.close();
    }

    @VisibleForTesting
    static boolean isVectorizedWithDv(RecordReader.RecordIterator<InternalRow> iterator) {
        if (iterator instanceof ApplyDeletionFileRecordIterator) {
            ApplyDeletionFileRecordIterator deletionIterator =
                    (ApplyDeletionFileRecordIterator) iterator;
            FileRecordIterator<InternalRow> innerIterator = deletionIterator.iterator();
            return innerIterator instanceof VectorizedRecordIterator;
        }
        return false;
    }

    public long getReadBytes() {
        return readBytes;
    }
}
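
A minimal usage sketch of the new converter follows. This is illustrative only: table (an existing org.apache.paimon.table.Table), the batch size of 4096, and the process consumer are assumptions, not part of this PR.

// Hypothetical usage sketch: read all splits of an existing table and drain
// them into Arrow batches.
ReadBuilder readBuilder = table.newReadBuilder();
List<DataSplit> splits =
        readBuilder.newScan().plan().splits().stream()
                .map(split -> (DataSplit) split)
                .collect(Collectors.toList());
VectorSchemaRootConverter converter =
        new VectorSchemaRootConverter(
                readBuilder, table.rowType(), null, null, splits, true);
try {
    VectorSchemaRoot vsr;
    while ((vsr = converter.next(4096)) != null) {
        // The VectorSchemaRoot is created once in #init() and reused across
        // calls: consume each batch before requesting the next one, and do
        // not close it from the caller side.
        process(vsr); // hypothetical consumer
    }
} finally {
    converter.close(); // closes the reader, both converters and the allocator
}

Note that next(int) calls init() lazily on the first invocation, so constructing the converter stays cheap until data is actually requested.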

@@ -35,7 +35,6 @@
 import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.VectorizedRecordIterator;
 import org.apache.paimon.schema.Schema;
@@ -888,7 +887,7 @@ private RecordReader.RecordIterator<InternalRow> getApplyDeletionFileRecordItera
                         .withProjection(projection)
                         .createReader(table.newReadBuilder().newScan().plan())
                         .readBatch();
-        assertThat(isVectorizedWithDv(iterator)).isTrue();
+        assertThat(VectorSchemaRootConverter.isVectorizedWithDv(iterator)).isTrue();
         return iterator;
     }

@@ -924,16 +923,6 @@ private ArrowBatchConverter createArrowWriter(
         }
     }
 
-    private boolean isVectorizedWithDv(RecordReader.RecordIterator<InternalRow> iterator) {
-        if (iterator instanceof ApplyDeletionFileRecordIterator) {
-            ApplyDeletionFileRecordIterator deletionIterator =
-                    (ApplyDeletionFileRecordIterator) iterator;
-            FileRecordIterator<InternalRow> innerIterator = deletionIterator.iterator();
-            return innerIterator instanceof VectorizedRecordIterator;
-        }
-        return false;
-    }
-
     private Object[] randomRowValues(boolean[] nullable) {
         Object[] values = new Object[18];
         // The orc char reader will trim the string. See TreeReaderFactory.CharTreeReader

@@ -44,11 +44,22 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
     private int nextPos;
     private long nextFilePos;
 
+    private final boolean vectorizedAndCompactly;
+
     public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
+        this(filePath, row, recycler, true);
+    }
+
+    public ColumnarRowIterator(
+            Path filePath,
+            ColumnarRow row,
+            @Nullable Runnable recycler,
+            boolean vectorizedAndCompactly) {
         super(recycler);
         this.filePath = filePath;
         this.row = row;
         this.recycler = recycler;
+        this.vectorizedAndCompactly = vectorizedAndCompactly;
     }
 
     public void reset(long nextFilePos) {
@@ -79,9 +90,15 @@ public Path filePath() {
         return this.filePath;
     }
 
+    @Override
+    public boolean vectorizedAndCompactly() {
+        return vectorizedAndCompactly;
+    }
+
     public ColumnarRowIterator copy(ColumnVector[] vectors) {
         ColumnarRowIterator newIterator =
-                new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
+                new ColumnarRowIterator(
+                        filePath, row.copy(vectors), recycler, vectorizedAndCompactly);
         newIterator.reset(nextFilePos);
         return newIterator;
     }
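
To illustrate the constructor change, a hedged sketch; filePath, row and recycler stand for whatever the calling reader already holds.

// Hypothetical snippets, not from this PR. A reader whose nested vectors are
// filled compactly keeps the 3-arg constructor, which now defaults the new
// flag to true:
ColumnarRowIterator compact = new ColumnarRowIterator(filePath, row, recycler);

// A reader that may produce non-compact nested vectors (e.g. the Parquet
// reader changed at the end of this PR) opts out explicitly, which steers
// VectorSchemaRootConverter to per-row conversion for constructed types:
ColumnarRowIterator nonCompact =
        new ColumnarRowIterator(filePath, row, recycler, false);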

@@ -53,6 +53,11 @@ public Path filePath() {
         return iterator.filePath();
     }
 
+    @Override
+    public boolean vectorizedAndCompactly() {
+        return iterator.vectorizedAndCompactly();
+    }
+
     @Nullable
     @Override
     public InternalRow next() throws IOException {

@@ -42,6 +42,17 @@ public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
     /** @return the file path */
     Path filePath();
 
+    /**
+     * Returns true only if this FileRecordIterator is backed by a batch whose nested vectors are
+     * stored compactly (see <a href="https://github.com/apache/paimon/pull/3883">Reverted
+     * #3883</a>). If the nested vectors are not compact, it is not safe to use the
+     * VectorizedColumnBatch directly; currently, callers should use {@link #next()} to handle
+     * rows one by one instead.
+     *
+     * <p>TODO: delete this after #3883 is fixed completely.
+     */
+    boolean vectorizedAndCompactly();
+
     @Override
     default <R> FileRecordIterator<R> transform(Function<T, R> function) {
         FileRecordIterator<T> thisIterator = this;
@@ -56,6 +67,11 @@ public Path filePath() {
                 return thisIterator.filePath();
             }
 
+            @Override
+            public boolean vectorizedAndCompactly() {
+                return thisIterator.vectorizedAndCompactly();
+            }
+
             @Nullable
             @Override
             public R next() throws IOException {
@@ -87,6 +103,11 @@ public Path filePath() {
                 return thisIterator.filePath();
             }
 
+            @Override
+            public boolean vectorizedAndCompactly() {
+                return thisIterator.vectorizedAndCompactly();
+            }
+
             @Nullable
             @Override
             public T next() throws IOException {
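
For implementers of the new interface method, a minimal sketch; EmptyFileRecordIterator is hypothetical and exists only to show the contract.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;

import javax.annotation.Nullable;

// Hypothetical, minimal FileRecordIterator (not part of this PR). It is not
// backed by a VectorizedColumnBatch, so it reports false and the Arrow
// converter takes the row-by-row path.
public class EmptyFileRecordIterator implements FileRecordIterator<InternalRow> {

    private final Path filePath;

    public EmptyFileRecordIterator(Path filePath) {
        this.filePath = filePath;
    }

    @Override
    public long returnedPosition() {
        return -1; // no row has been returned yet
    }

    @Override
    public Path filePath() {
        return filePath;
    }

    @Override
    public boolean vectorizedAndCompactly() {
        return false; // no compact vectorized batch backs this iterator
    }

    @Nullable
    @Override
    public InternalRow next() {
        return null; // empty: signals end of this batch immediately
    }

    @Override
    public void releaseBatch() {}
}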

@@ -66,4 +66,9 @@ public long returnedPosition() {
     public Path filePath() {
         return filePath;
     }
+
+    @Override
+    public boolean vectorizedAndCompactly() {
+        return false;
+    }
 }

@@ -56,6 +56,11 @@ public Path filePath() {
         return iterator.filePath();
     }
 
+    @Override
+    public boolean vectorizedAndCompactly() {
+        return iterator.vectorizedAndCompactly();
+    }
+
     @Nullable
     @Override
     public InternalRow next() throws IOException {

@@ -540,7 +540,7 @@ protected ParquetReaderBatch(
             this.recycler = recycler;
             this.result =
                     new ColumnarRowIterator(
-                            filePath, new ColumnarRow(columnarBatch), this::recycle);
+                            filePath, new ColumnarRow(columnarBatch), this::recycle, false);
         }
 
         public void recycle() {