diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/VectorSchemaRootConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/VectorSchemaRootConverter.java new file mode 100644 index 000000000000..dc1b6b8e36bd --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/VectorSchemaRootConverter.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow.converter; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.arrow.ArrowUtils; +import org.apache.paimon.arrow.writer.ArrowFieldWriter; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.ReaderSupplier; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.VectorizedRecordIterator; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataTypeFamily; +import org.apache.paimon.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Use Paimon API to read records from {@link DataSplit}s and convert to {@link VectorSchemaRoot}. + */ +public class VectorSchemaRootConverter { + + private final RecordReader recordReader; + + // for users to override the #init() + protected final RowType readType; + protected BufferAllocator allocator; + protected final boolean vsrCaseSensitive; + protected ArrowVectorizedBatchConverter batchConverter; + protected ArrowPerRowBatchConverter perRowConverter; + + private final boolean schemaContainsConstructedType; + + private ArrowBatchConverter currentConverter; + + private long readBytes = 0; + + private boolean inited = false; + + public VectorSchemaRootConverter( + ReadBuilder readBuilder, + RowType rowType, + @Nullable int[] projection, + @Nullable Predicate predicate, + List splits, + boolean vsrCaseSensitive) { + this.readType = projection == null ? rowType : rowType.project(projection); + + readBuilder.withProjection(projection); + if (predicate != null) { + readBuilder.withFilter(predicate); + } + TableRead read = readBuilder.newRead(); + List> readerSuppliers = new ArrayList<>(); + for (DataSplit split : splits) { + readerSuppliers.add(() -> read.createReader(split)); + } + try { + recordReader = ConcatRecordReader.create(readerSuppliers); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + this.vsrCaseSensitive = vsrCaseSensitive; + this.schemaContainsConstructedType = + rowType.getFieldTypes().stream() + .anyMatch(dataType -> dataType.is(DataTypeFamily.CONSTRUCTED)); + } + + public void init() { + allocator = new RootAllocator(); + VectorSchemaRoot vsr = + ArrowUtils.createVectorSchemaRoot(readType, allocator, vsrCaseSensitive); + ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, readType); + batchConverter = new ArrowVectorizedBatchConverter(vsr, fieldWriters); + perRowConverter = new ArrowPerRowBatchConverter(vsr, fieldWriters); + } + + /** + * Get {@link VectorSchemaRoot} of which row count is at most {@code maxBatchRows}. Return null + * if there is no more data. + */ + @Nullable + public VectorSchemaRoot next(int maxBatchRows) { + if (!inited) { + init(); + inited = true; + } + + if (currentConverter == null) { + resetCurrentConverter(); + if (currentConverter == null) { + // no more data + return null; + } + } + + VectorSchemaRoot vsr = currentConverter.next(maxBatchRows); + if (vsr == null) { + resetCurrentConverter(); + if (currentConverter == null) { + // no more data + return null; + } + vsr = currentConverter.next(maxBatchRows); + } + if (vsr != null) { + readBytes += vsr.getFieldVectors().stream().mapToLong(ValueVector::getBufferSize).sum(); + } + return vsr; + } + + private void resetCurrentConverter() { + RecordReader.RecordIterator iterator; + try { + iterator = recordReader.readBatch(); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (iterator == null) { + // no more data + currentConverter = null; + return; + } + + // TODO: delete this after #3883 is fixed completely. + boolean vectorizedAndCompactly = + ((FileRecordIterator) iterator).vectorizedAndCompactly(); + boolean useVectorizedSafely = vectorizedAndCompactly || !schemaContainsConstructedType; + + if (isVectorizedWithDv(iterator) && useVectorizedSafely) { + batchConverter.reset((ApplyDeletionFileRecordIterator) iterator); + currentConverter = batchConverter; + } else if (iterator instanceof VectorizedRecordIterator && useVectorizedSafely) { + batchConverter.reset((VectorizedRecordIterator) iterator); + currentConverter = batchConverter; + } else { + perRowConverter.reset(iterator); + currentConverter = perRowConverter; + } + } + + public void close() throws IOException { + recordReader.close(); + batchConverter.close(); + perRowConverter.close(); + allocator.close(); + } + + @VisibleForTesting + static boolean isVectorizedWithDv(RecordReader.RecordIterator iterator) { + if (iterator instanceof ApplyDeletionFileRecordIterator) { + ApplyDeletionFileRecordIterator deletionIterator = + (ApplyDeletionFileRecordIterator) iterator; + FileRecordIterator innerIterator = deletionIterator.iterator(); + return innerIterator instanceof VectorizedRecordIterator; + } + return false; + } + + public long getReadBytes() { + return readBytes; + } +} diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java index 96470b72eebf..064b6dd53728 100644 --- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java +++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java @@ -35,7 +35,6 @@ import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator; import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.fs.Path; -import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.VectorizedRecordIterator; import org.apache.paimon.schema.Schema; @@ -888,7 +887,7 @@ private RecordReader.RecordIterator getApplyDeletionFileRecordItera .withProjection(projection) .createReader(table.newReadBuilder().newScan().plan()) .readBatch(); - assertThat(isVectorizedWithDv(iterator)).isTrue(); + assertThat(VectorSchemaRootConverter.isVectorizedWithDv(iterator)).isTrue(); return iterator; } @@ -924,16 +923,6 @@ private ArrowBatchConverter createArrowWriter( } } - private boolean isVectorizedWithDv(RecordReader.RecordIterator iterator) { - if (iterator instanceof ApplyDeletionFileRecordIterator) { - ApplyDeletionFileRecordIterator deletionIterator = - (ApplyDeletionFileRecordIterator) iterator; - FileRecordIterator innerIterator = deletionIterator.iterator(); - return innerIterator instanceof VectorizedRecordIterator; - } - return false; - } - private Object[] randomRowValues(boolean[] nullable) { Object[] values = new Object[18]; // The orc char reader will trim the string. See TreeReaderFactory.CharTreeReader diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 874c22134864..0187b63c7c2c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -44,11 +44,18 @@ public class ColumnarRowIterator extends RecyclableIterator private int nextPos; private long nextFilePos; - public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) { + private final boolean vectorizedAndCompactly; + + public ColumnarRowIterator( + Path filePath, + ColumnarRow row, + @Nullable Runnable recycler, + boolean vectorizedAndCompactly) { super(recycler); this.filePath = filePath; this.row = row; this.recycler = recycler; + this.vectorizedAndCompactly = vectorizedAndCompactly; } public void reset(long nextFilePos) { @@ -79,9 +86,15 @@ public Path filePath() { return this.filePath; } + @Override + public boolean vectorizedAndCompactly() { + return vectorizedAndCompactly; + } + public ColumnarRowIterator copy(ColumnVector[] vectors) { ColumnarRowIterator newIterator = - new ColumnarRowIterator(filePath, row.copy(vectors), recycler); + new ColumnarRowIterator( + filePath, row.copy(vectors), recycler, vectorizedAndCompactly); newIterator.reset(nextFilePos); return newIterator; } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java index eec931d3e98f..962f89b1b745 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java @@ -53,6 +53,11 @@ public Path filePath() { return iterator.filePath(); } + @Override + public boolean vectorizedAndCompactly() { + return iterator.vectorizedAndCompactly(); + } + @Nullable @Override public InternalRow next() throws IOException { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java index 2d3c85f193dc..8de8f7f00c66 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java @@ -42,6 +42,17 @@ public interface FileRecordIterator extends RecordReader.RecordIterator { /** @return the file path */ Path filePath(); + /** + * Return true only if this FileRecordIterator contains a batch and the batch's nested vectors + * are compactly (See Reverted #3883). + * If a FileRecordIterator contains a batch and the batch's nested vectors are not compactly, + * it's not safe to use VectorizedColumnBatch directly. Currently, we should use {@link #next()} + * to handle it row by row. + * + *

TODO: delete this after #3883 is fixed completely. + */ + boolean vectorizedAndCompactly(); + @Override default FileRecordIterator transform(Function function) { FileRecordIterator thisIterator = this; @@ -56,6 +67,11 @@ public Path filePath() { return thisIterator.filePath(); } + @Override + public boolean vectorizedAndCompactly() { + return thisIterator.vectorizedAndCompactly(); + } + @Nullable @Override public R next() throws IOException { @@ -87,6 +103,11 @@ public Path filePath() { return thisIterator.filePath(); } + @Override + public boolean vectorizedAndCompactly() { + return thisIterator.vectorizedAndCompactly(); + } + @Nullable @Override public T next() throws IOException { diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java index 57e6c4f6f737..71475fb291a8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java @@ -35,15 +35,19 @@ public final class IteratorResultIterator extends RecyclableIterator records, final @Nullable Runnable recycler, final Path filePath, - long pos) { + long pos, + boolean vectorizedAndCompactly) { super(recycler); this.records = records; this.filePath = filePath; this.nextFilePos = pos; + this.vectorizedAndCompactly = vectorizedAndCompactly; } @Nullable @@ -66,4 +70,9 @@ public long returnedPosition() { public Path filePath() { return filePath; } + + @Override + public boolean vectorizedAndCompactly() { + return vectorizedAndCompactly; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java index 69997ab2eda5..3177c86fdc8a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java @@ -56,6 +56,11 @@ public Path filePath() { return iterator.filePath(); } + @Override + public boolean vectorizedAndCompactly() { + return iterator.vectorizedAndCompactly(); + } + @Nullable @Override public InternalRow next() throws IOException { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index a06ca9948c44..a48a30a953b0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -110,7 +110,7 @@ public IteratorResultIterator readBatch() throws IOException { IteratorWithException iterator = new AvroBlockIterator(reader.getBlockCount(), reader); return new IteratorResultIterator( - iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); + iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition, false); } private boolean readNextBlock() throws IOException { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index db17357bfd70..effe813882ac 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -170,7 +170,7 @@ protected OrcReaderBatch( this.paimonColumnBatch = paimonColumnBatch; this.result = new ColumnarRowIterator( - filePath, new ColumnarRow(paimonColumnBatch), this::recycle); + filePath, new ColumnarRow(paimonColumnBatch), this::recycle, true); } /** diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 910f3031e09f..b59adfa81f96 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -540,7 +540,7 @@ protected ParquetReaderBatch( this.recycler = recycler; this.result = new ColumnarRowIterator( - filePath, new ColumnarRow(columnarBatch), this::recycle); + filePath, new ColumnarRow(columnarBatch), this::recycle, false); } public void recycle() {