Skip to content

Commit

Permalink
[Arrow] Introduce VectorSchemaRootConverter to get vsr from data spli…
Browse files Browse the repository at this point in the history
…ts using ArrowBatchConverter
  • Loading branch information
yuzelin committed Dec 21, 2024
1 parent e596e27 commit f6a76a0
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow.converter;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.RowType;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
* Use Paimon API to read records from {@link DataSplit}s and convert to {@link VectorSchemaRoot}.
*/
public class VectorSchemaRootConverter {

private final RecordReader<InternalRow> recordReader;

// for users to override the #init()
protected final RowType readType;
protected BufferAllocator allocator;
protected final boolean vsrCaseSensitive;
protected ArrowVectorizedBatchConverter batchConverter;
protected ArrowPerRowBatchConverter perRowConverter;

private final boolean schemaContainsConstructedType;

private ArrowBatchConverter currentConverter;

private long readBytes = 0;

private boolean inited = false;

public VectorSchemaRootConverter(
ReadBuilder readBuilder,
RowType rowType,
@Nullable int[] projection,
@Nullable Predicate predicate,
List<DataSplit> splits,
boolean vsrCaseSensitive) {
this.readType = projection == null ? rowType : rowType.project(projection);

readBuilder.withProjection(projection);
if (predicate != null) {
readBuilder.withFilter(predicate);
}
TableRead read = readBuilder.newRead();
List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
for (DataSplit split : splits) {
readerSuppliers.add(() -> read.createReader(split));
}
try {
this.recordReader = ConcatRecordReader.create(readerSuppliers);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

this.vsrCaseSensitive = vsrCaseSensitive;
this.schemaContainsConstructedType =
rowType.getFieldTypes().stream()
.anyMatch(dataType -> dataType.is(DataTypeFamily.CONSTRUCTED));
}

public void init() {
allocator = new RootAllocator();
VectorSchemaRoot vsr =
ArrowUtils.createVectorSchemaRoot(readType, allocator, vsrCaseSensitive);
ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, readType);
batchConverter = new ArrowVectorizedBatchConverter(vsr, fieldWriters);
perRowConverter = new ArrowPerRowBatchConverter(vsr, fieldWriters);
}

/**
* Get {@link VectorSchemaRoot} of which row count is at most {@code maxBatchRows}. Return null
* if there is no more data.
*/
@Nullable
public VectorSchemaRoot next(int maxBatchRows) {
if (!inited) {
init();
inited = true;
}

if (currentConverter == null) {
resetCurrentConverter();
if (currentConverter == null) {
// no more data
return null;
}
}

VectorSchemaRoot vsr = currentConverter.next(maxBatchRows);
if (vsr == null) {
resetCurrentConverter();
if (currentConverter == null) {
// no more data
return null;
}
vsr = currentConverter.next(maxBatchRows);
}
if (vsr != null) {
readBytes += vsr.getFieldVectors().stream().mapToLong(ValueVector::getBufferSize).sum();
}
return vsr;
}

private void resetCurrentConverter() {
RecordReader.RecordIterator<InternalRow> iterator;
try {
iterator = recordReader.readBatch();
} catch (IOException e) {
throw new RuntimeException(e);
}
if (iterator == null) {
// no more data
currentConverter = null;
return;
}

// TODO: delete this after #3883 is fixed completely.
boolean vectorizedAndCompactly =
((FileRecordIterator<InternalRow>) iterator).vectorizedAndCompactly();
boolean useVectorizedSafely = vectorizedAndCompactly || !schemaContainsConstructedType;

if (isVectorizedWithDv(iterator) && useVectorizedSafely) {
batchConverter.reset((ApplyDeletionFileRecordIterator) iterator);
currentConverter = batchConverter;
} else if (iterator instanceof VectorizedRecordIterator && useVectorizedSafely) {
batchConverter.reset((VectorizedRecordIterator) iterator);
currentConverter = batchConverter;
} else {
perRowConverter.reset(iterator);
currentConverter = perRowConverter;
}
}

public void close() throws IOException {
recordReader.close();
batchConverter.close();
perRowConverter.close();
allocator.close();
}

@VisibleForTesting
static boolean isVectorizedWithDv(RecordReader.RecordIterator<InternalRow> iterator) {
if (iterator instanceof ApplyDeletionFileRecordIterator) {
ApplyDeletionFileRecordIterator deletionIterator =
(ApplyDeletionFileRecordIterator) iterator;
FileRecordIterator<InternalRow> innerIterator = deletionIterator.iterator();
return innerIterator instanceof VectorizedRecordIterator;
}
return false;
}

public long getReadBytes() {
return readBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -888,7 +887,7 @@ private RecordReader.RecordIterator<InternalRow> getApplyDeletionFileRecordItera
.withProjection(projection)
.createReader(table.newReadBuilder().newScan().plan())
.readBatch();
assertThat(isVectorizedWithDv(iterator)).isTrue();
assertThat(VectorSchemaRootConverter.isVectorizedWithDv(iterator)).isTrue();
return iterator;
}

Expand Down Expand Up @@ -924,16 +923,6 @@ private ArrowBatchConverter createArrowWriter(
}
}

private boolean isVectorizedWithDv(RecordReader.RecordIterator<InternalRow> iterator) {
if (iterator instanceof ApplyDeletionFileRecordIterator) {
ApplyDeletionFileRecordIterator deletionIterator =
(ApplyDeletionFileRecordIterator) iterator;
FileRecordIterator<InternalRow> innerIterator = deletionIterator.iterator();
return innerIterator instanceof VectorizedRecordIterator;
}
return false;
}

private Object[] randomRowValues(boolean[] nullable) {
Object[] values = new Object[18];
// The orc char reader will trim the string. See TreeReaderFactory.CharTreeReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
private int nextPos;
private long nextFilePos;

public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
private final boolean vectorizedAndCompactly;

public ColumnarRowIterator(
Path filePath,
ColumnarRow row,
@Nullable Runnable recycler,
boolean vectorizedAndCompactly) {
super(recycler);
this.filePath = filePath;
this.row = row;
this.recycler = recycler;
this.vectorizedAndCompactly = vectorizedAndCompactly;
}

public void reset(long nextFilePos) {
Expand Down Expand Up @@ -79,9 +86,15 @@ public Path filePath() {
return this.filePath;
}

@Override
public boolean vectorizedAndCompactly() {
return vectorizedAndCompactly;
}

public ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
new ColumnarRowIterator(
filePath, row.copy(vectors), recycler, vectorizedAndCompactly);
newIterator.reset(nextFilePos);
return newIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Path filePath() {
return iterator.filePath();
}

@Override
public boolean vectorizedAndCompactly() {
return iterator.vectorizedAndCompactly();
}

@Nullable
@Override
public InternalRow next() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
/** @return the file path */
Path filePath();

/**
* Return true only if this FileRecordIterator contains a batch and the batch's nested vectors
* are compactly (See <a href="https://github.com/apache/paimon/pull/3883">Reverted #3883</a>).
* If a FileRecordIterator contains a batch and the batch's nested vectors are not compactly,
* it's not safe to use VectorizedColumnBatch directly. Currently, we should use {@link #next()}
* to handle it row by row.
*
* <p>TODO: delete this after #3883 is fixed completely.
*/
boolean vectorizedAndCompactly();

@Override
default <R> FileRecordIterator<R> transform(Function<T, R> function) {
FileRecordIterator<T> thisIterator = this;
Expand All @@ -56,6 +67,11 @@ public Path filePath() {
return thisIterator.filePath();
}

@Override
public boolean vectorizedAndCompactly() {
return thisIterator.vectorizedAndCompactly();
}

@Nullable
@Override
public R next() throws IOException {
Expand Down Expand Up @@ -87,6 +103,11 @@ public Path filePath() {
return thisIterator.filePath();
}

@Override
public boolean vectorizedAndCompactly() {
return thisIterator.vectorizedAndCompactly();
}

@Nullable
@Override
public T next() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@ public final class IteratorResultIterator extends RecyclableIterator<InternalRow
private final Path filePath;
private long nextFilePos;

private final boolean vectorizedAndCompactly;

public IteratorResultIterator(
final IteratorWithException<InternalRow, IOException> records,
final @Nullable Runnable recycler,
final Path filePath,
long pos) {
long pos,
boolean vectorizedAndCompactly) {
super(recycler);
this.records = records;
this.filePath = filePath;
this.nextFilePos = pos;
this.vectorizedAndCompactly = vectorizedAndCompactly;
}

@Nullable
Expand All @@ -66,4 +70,9 @@ public long returnedPosition() {
public Path filePath() {
return filePath;
}

@Override
public boolean vectorizedAndCompactly() {
return vectorizedAndCompactly;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public Path filePath() {
return iterator.filePath();
}

@Override
public boolean vectorizedAndCompactly() {
return iterator.vectorizedAndCompactly();
}

@Nullable
@Override
public InternalRow next() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public IteratorResultIterator readBatch() throws IOException {
IteratorWithException<InternalRow, IOException> iterator =
new AvroBlockIterator(reader.getBlockCount(), reader);
return new IteratorResultIterator(
iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition);
iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition, false);
}

private boolean readNextBlock() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected OrcReaderBatch(
this.paimonColumnBatch = paimonColumnBatch;
this.result =
new ColumnarRowIterator(
filePath, new ColumnarRow(paimonColumnBatch), this::recycle);
filePath, new ColumnarRow(paimonColumnBatch), this::recycle, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ protected ParquetReaderBatch(
this.recycler = recycler;
this.result =
new ColumnarRowIterator(
filePath, new ColumnarRow(columnarBatch), this::recycle);
filePath, new ColumnarRow(columnarBatch), this::recycle, false);
}

public void recycle() {
Expand Down

0 comments on commit f6a76a0

Please sign in to comment.