From d33b8711fc6b4e1f35ba7d85336be4ff3baa956d Mon Sep 17 00:00:00 2001 From: "aiden.dong" <782112163@qq.com> Date: Tue, 3 Dec 2024 13:33:44 +0800 Subject: [PATCH] [core] Optimization of Parquet Predicate Pushdown Capability (#4608) --- .../table/PrimaryKeyFileStoreTableTest.java | 63 ++++++ .../format/parquet/ParquetReaderFactory.java | 66 +++++- .../parquet/reader/AbstractColumnReader.java | 204 +++++++++++++----- .../parquet/reader/BooleanColumnReader.java | 36 +++- .../parquet/reader/ByteColumnReader.java | 39 +++- .../parquet/reader/BytesColumnReader.java | 41 +++- .../parquet/reader/DoubleColumnReader.java | 38 +++- .../reader/FixedLenBytesColumnReader.java | 36 +++- .../parquet/reader/FloatColumnReader.java | 38 +++- .../parquet/reader/IntColumnReader.java | 39 +++- .../parquet/reader/LongColumnReader.java | 39 +++- .../parquet/reader/NestedColumnReader.java | 2 +- .../reader/NestedPrimitiveColumnReader.java | 141 +++++++----- .../parquet/reader/ParquetReadState.java | 148 +++++++++++++ .../reader/ParquetSplitReaderUtil.java | 41 ++-- .../parquet/reader/RunLengthDecoder.java | 45 ++++ .../parquet/reader/ShortColumnReader.java | 38 +++- .../parquet/reader/TimestampColumnReader.java | 15 +- 18 files changed, 898 insertions(+), 171 deletions(-) create mode 100644 paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 46b85223bc2f..e80b49a0f05d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -84,6 +84,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -809,6 +810,68 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception { "1|4|500|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testDeletionVectorsWithParquetFilter() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(FILE_FORMAT, "parquet"); + conf.set("parquet.block.size", "1048576"); + conf.set("parquet.page.size", "1024"); + }); + + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + + BatchTableWrite write = + (BatchTableWrite) + writeBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + + for (int i = 0; i < 200000; i++) { + write.write(rowData(1, i, i * 100L)); + } + + List messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + write = + (BatchTableWrite) + writeBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + for (int i = 180000; i < 200000; i++) { + write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); + } + + messages = write.prepareCommit(); + commit = writeBuilder.newCommit(); + commit.commit(messages); + + PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + Random random = new Random(); + + for (int i = 0; i < 10; i++) { + int value = random.nextInt(180000); + TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); + assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) + .isEqualTo( + Arrays.asList( + String.format( + "%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset", + 1, value, value * 100L))); + } + + for (int i = 0; i < 10; i++) { + int value = 180000 + random.nextInt(20000); + TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); + assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty(); + } + } + @Test public void testDeletionVectorsWithFileIndexInMeta() throws Exception { FileStoreTable table = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index f0151d6f3d8f..0c996531201a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; import org.apache.paimon.format.parquet.reader.ParquetDecimalVector; +import org.apache.paimon.format.parquet.reader.ParquetReadState; import org.apache.paimon.format.parquet.reader.ParquetTimestampVector; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.fs.Path; @@ -130,7 +131,7 @@ public FileRecordReader createReader(FormatReaderFactory.Context co buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO); return new ParquetReader( - reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields); + reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields); } private void setReadOptions(ParquetReadOptions.Builder builder) { @@ -336,6 +337,10 @@ private class ParquetReader implements FileRecordReader { private long nextRowPosition; + private ParquetReadState currentRowGroupReadState; + + private long currentRowGroupFirstRowIndex; + /** * For each request column, the reader to read this column. This is NULL if this column is * missing from the file, in which case we populate the attribute with NULL. @@ -359,6 +364,7 @@ private ParquetReader( this.totalCountLoadedSoFar = 0; this.currentRowPosition = 0; this.nextRowPosition = 0; + this.currentRowGroupFirstRowIndex = 0; this.fields = fields; } @@ -390,7 +396,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException { currentRowPosition = nextRowPosition; } - int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned); + int num = getBachSize(); + for (int i = 0; i < columnReaders.length; ++i) { if (columnReaders[i] == null) { batch.writableVectors[i].fillWithNulls(); @@ -400,13 +407,13 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException { } } rowsReturned += num; - nextRowPosition = currentRowPosition + num; + nextRowPosition = getNextRowPosition(num); batch.columnarBatch.setNumRows(num); return true; } private void readNextRowGroup() throws IOException { - PageReadStore rowGroup = reader.readNextRowGroup(); + PageReadStore rowGroup = reader.readNextFilteredRowGroup(); if (rowGroup == null) { throw new IOException( "expecting more rows but reached last block. Read " @@ -415,6 +422,9 @@ private void readNextRowGroup() throws IOException { + totalRowCount); } + this.currentRowGroupReadState = + new ParquetReadState(rowGroup.getRowIndexes().orElse(null)); + List types = requestedSchema.getFields(); columnReaders = new ColumnReader[types.size()]; for (int i = 0; i < types.size(); ++i) { @@ -429,18 +439,62 @@ private void readNextRowGroup() throws IOException { 0); } } + totalCountLoadedSoFar += rowGroup.getRowCount(); - if (rowGroup.getRowIndexOffset().isPresent()) { - currentRowPosition = rowGroup.getRowIndexOffset().get(); + + if (rowGroup.getRowIndexOffset().isPresent()) { // filter + currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get(); + long pageIndex = 0; + if (!this.currentRowGroupReadState.isMaxRange()) { + pageIndex = this.currentRowGroupReadState.currentRangeStart(); + } + currentRowPosition = currentRowGroupFirstRowIndex + pageIndex; } else { if (reader.rowGroupsFiltered()) { throw new RuntimeException( "There is a bug, rowIndexOffset must be present when row groups are filtered."); } + currentRowGroupFirstRowIndex = nextRowPosition; currentRowPosition = nextRowPosition; } } + private int getBachSize() throws IOException { + + long rangeBatchSize = Long.MAX_VALUE; + if (this.currentRowGroupReadState.isFinished()) { + throw new IOException( + "expecting more rows but reached last page block. Read " + + rowsReturned + + " out of " + + totalRowCount); + } else if (!this.currentRowGroupReadState.isMaxRange()) { + long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex; + rangeBatchSize = this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1; + } + + return (int) + Math.min( + batchSize, + Math.min(rangeBatchSize, totalCountLoadedSoFar - rowsReturned)); + } + + private long getNextRowPosition(int num) { + if (this.currentRowGroupReadState.isMaxRange()) { + return this.currentRowPosition + num; + } else { + long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex; + long nextIndex = pageIndex + num; + + if (this.currentRowGroupReadState.currentRangeEnd() < nextIndex) { + this.currentRowGroupReadState.nextRange(); + nextIndex = this.currentRowGroupReadState.currentRangeStart(); + } + + return nextIndex; + } + } + private ParquetReaderBatch getCachedEntry() throws IOException { try { return pool.pollEntry(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java index 7e2ab6d5e7f0..5e3f4a7e6a33 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java @@ -32,6 +32,7 @@ import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -65,20 +66,16 @@ public abstract class AbstractColumnReader protected final ColumnDescriptor descriptor; - /** Total number of values read. */ - private long valuesRead; - - /** - * value that indicates the end of the current page. That is, if valuesRead == - * endOfPageValueCount, we are at the end of the page. - */ - private long endOfPageValueCount; - /** If true, the current page is dictionary encoded. */ private boolean isCurrentPageDictionaryEncoded; /** Total values in the current page. */ - private int pageValueCount; + // private int pageValueCount; + + /** + * Helper struct to track intermediate states while reading Parquet pages in the column chunk. + */ + private final ParquetReadState readState; /* * Input streams: @@ -101,12 +98,14 @@ public abstract class AbstractColumnReader /** Dictionary decoder to wrap dictionary ids input stream. */ private RunLengthDecoder dictionaryIdsDecoder; - public AbstractColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public AbstractColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { this.descriptor = descriptor; - this.pageReader = pageReader; + this.pageReader = pageReadStore.getPageReader(descriptor); this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.readState = new ParquetReadState(pageReadStore.getRowIndexes().orElse(null)); + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); if (dictionaryPage != null) { try { @@ -147,56 +146,136 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException if (dictionary != null) { dictionaryIds = vector.reserveDictionaryIds(readNumber); } - while (readNumber > 0) { + + readState.resetForNewBatch(readNumber); + + while (readState.rowsToReadInBatch > 0) { // Compute the number of values we want to read in this page. - int leftInPage = (int) (endOfPageValueCount - valuesRead); - if (leftInPage == 0) { - DataPage page = pageReader.readPage(); - if (page instanceof DataPageV1) { - readPageV1((DataPageV1) page); - } else if (page instanceof DataPageV2) { - readPageV2((DataPageV2) page); - } else { - throw new RuntimeException("Unsupported page type: " + page.getClass()); + if (readState.valuesToReadInPage == 0) { + int pageValueCount = readPage(); + if (pageValueCount < 0) { + // we've read all the pages; this could happen when we're reading a repeated + // list and we + // don't know where the list will end until we've seen all the pages. + break; } - leftInPage = (int) (endOfPageValueCount - valuesRead); } - int num = Math.min(readNumber, leftInPage); - if (isCurrentPageDictionaryEncoded) { - // Read and decode dictionary ids. - runLenDecoder.readDictionaryIds( - num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder); - - if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) { - // Column vector supports lazy decoding of dictionary values so just set the - // dictionary. - // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. - // some - // non-dictionary encoded values have already been added). - vector.setDictionary(new ParquetDictionary(dictionary)); + + if (readState.isFinished()) { + break; + } + + long pageRowId = readState.rowId; + int leftInBatch = readState.rowsToReadInBatch; + int leftInPage = readState.valuesToReadInPage; + + int readBatch = Math.min(leftInBatch, leftInPage); + + long rangeStart = readState.currentRangeStart(); + long rangeEnd = readState.currentRangeEnd(); + + if (pageRowId < rangeStart) { + int toSkip = (int) (rangeStart - pageRowId); + if (toSkip >= leftInPage) { // drop page + pageRowId += leftInPage; + leftInPage = 0; } else { - readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds); + if (isCurrentPageDictionaryEncoded) { + runLenDecoder.skipDictionaryIds( + toSkip, maxDefLevel, this.dictionaryIdsDecoder); + pageRowId += toSkip; + leftInPage -= toSkip; + } else { + skipBatch(toSkip); + pageRowId += toSkip; + leftInPage -= toSkip; + } } + } else if (pageRowId > rangeEnd) { + readState.nextRange(); } else { - if (vector.hasDictionary() && rowId != 0) { - // This batch already has dictionary encoded values but this new page is not. - // The batch - // does not support a mix of dictionary and not so we will decode the - // dictionary. - readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds()); + long start = pageRowId; + long end = Math.min(rangeEnd, pageRowId + readBatch - 1); + int num = (int) (end - start + 1); + + if (isCurrentPageDictionaryEncoded) { + // Read and decode dictionary ids. + runLenDecoder.readDictionaryIds( + num, + dictionaryIds, + vector, + rowId, + maxDefLevel, + this.dictionaryIdsDecoder); + + if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) { + // Column vector supports lazy decoding of dictionary values so just set the + // dictionary. + // We can't do this if rowId != 0 AND the column doesn't have a dictionary + // (i.e. + // some + // non-dictionary encoded values have already been added). + vector.setDictionary(new ParquetDictionary(dictionary)); + } else { + readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds); + } + } else { + if (vector.hasDictionary() && rowId != 0) { + // This batch already has dictionary encoded values but this new page is + // not. + // The batch + // does not support a mix of dictionary and not so we will decode the + // dictionary. + readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds()); + } + vector.setDictionary(null); + readBatch(rowId, num, vector); } - vector.setDictionary(null); - readBatch(rowId, num, vector); + leftInBatch -= num; + pageRowId += num; + leftInPage -= num; + rowId += num; } + readState.rowsToReadInBatch = leftInBatch; + readState.valuesToReadInPage = leftInPage; + readState.rowId = pageRowId; + } + } - valuesRead += num; - rowId += num; - readNumber -= num; + private int readPage() { + DataPage page = pageReader.readPage(); + if (page == null) { + return -1; } + long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); + + int pageValueCount = + page.accept( + new DataPage.Visitor() { + @Override + public Integer visit(DataPageV1 dataPageV1) { + try { + return readPageV1(dataPageV1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer visit(DataPageV2 dataPageV2) { + try { + return readPageV2(dataPageV2); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + readState.resetForNewPage(pageValueCount, pageFirstRowIndex); + return pageValueCount; } - private void readPageV1(DataPageV1 page) throws IOException { - this.pageValueCount = page.getValueCount(); + private int readPageV1(DataPageV1 page) throws IOException { + int pageValueCount = page.getValueCount(); ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); // Initialize the decoders. @@ -211,30 +290,31 @@ private void readPageV1(DataPageV1 page) throws IOException { ByteBufferInputStream in = bytes.toInputStream(); rlReader.initFromPage(pageValueCount, in); this.runLenDecoder.initFromStream(pageValueCount, in); - prepareNewPage(page.getValueEncoding(), in); + prepareNewPage(page.getValueEncoding(), in, pageValueCount); + return pageValueCount; } catch (IOException e) { throw new IOException("could not read page " + page + " in col " + descriptor, e); } } - private void readPageV2(DataPageV2 page) throws IOException { - this.pageValueCount = page.getValueCount(); + private int readPageV2(DataPageV2 page) throws IOException { + int pageValueCount = page.getValueCount(); int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); // do not read the length from the stream. v2 pages handle dividing the page bytes. this.runLenDecoder = new RunLengthDecoder(bitWidth, false); this.runLenDecoder.initFromStream( - this.pageValueCount, page.getDefinitionLevels().toInputStream()); + pageValueCount, page.getDefinitionLevels().toInputStream()); try { - prepareNewPage(page.getDataEncoding(), page.getData().toInputStream()); + prepareNewPage(page.getDataEncoding(), page.getData().toInputStream(), pageValueCount); + return pageValueCount; } catch (IOException e) { throw new IOException("could not read page " + page + " in col " + descriptor, e); } } - private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in) + private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in, int pageValueCount) throws IOException { - this.endOfPageValueCount = valuesRead + pageValueCount; if (dataEncoding.usesDictionary()) { if (dictionary == null) { throw new IOException( @@ -269,6 +349,14 @@ private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in) afterReadPage(); } + final void skipDataBuffer(int length) { + try { + dataInputStream.skipFully(length); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to skip " + length + " bytes", e); + } + } + final ByteBuffer readDataBuffer(int length) { try { return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN); @@ -291,6 +379,8 @@ protected boolean supportLazyDecode() { /** Read batch from {@link #runLenDecoder} and {@link #dataInputStream}. */ protected abstract void readBatch(int rowId, int num, VECTOR column); + protected abstract void skipBatch(int num); + /** * Decode dictionary ids to data. From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}. */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java index d5dc231d8436..83d3c5a07d4b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.PrimitiveType; @@ -36,9 +36,9 @@ public class BooleanColumnReader extends AbstractColumnReader 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + for (int i = 0; i < n; i++) { + readBoolean(); + } + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + readBoolean(); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + private boolean readBoolean() { if (bitOffset == 0) { try { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java index bed9923d9be3..804b8bc0275e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,8 +31,9 @@ /** Byte {@link ColumnReader}. Using INT32 to store byte, so just cast int to byte. */ public class ByteColumnReader extends AbstractColumnReader { - public ByteColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT32); } @@ -69,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableByteVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipByte(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipByte(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipByte(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableByteVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java index e83115c8a69f..6ee395e58568 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Bytes {@link ColumnReader}. A int length and bytes data. */ public class BytesColumnReader extends AbstractColumnReader { - public BytesColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public BytesColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.BINARY); } @@ -70,6 +70,41 @@ protected void readBatch(int rowId, int num, WritableBytesVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipBinary(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipBinary(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipBinary(int num) { + for (int i = 0; i < num; i++) { + int len = readDataBuffer(4).getInt(); + skipDataBuffer(len); + } + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableBytesVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java index d6d8aa2bbb22..2cffd406248e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Double {@link ColumnReader}. */ public class DoubleColumnReader extends AbstractColumnReader { - public DoubleColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public DoubleColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE); } @@ -70,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableDoubleVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipDouble(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipDouble(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipDouble(int num) { + skipDataBuffer(8 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableDoubleVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java index afce717a6719..25e1b466e465 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java @@ -25,7 +25,7 @@ import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveType; @@ -39,8 +39,9 @@ public class FixedLenBytesColumnReader private final int precision; public FixedLenBytesColumnReader( - ColumnDescriptor descriptor, PageReader pageReader, int precision) throws IOException { - super(descriptor, pageReader); + ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); this.precision = precision; } @@ -79,6 +80,35 @@ protected void readBatch(int rowId, int num, VECTOR column) { } } + @Override + protected void skipBatch(int num) { + int bytesLen = descriptor.getPrimitiveType().getTypeLength(); + if (ParquetSchemaConverter.is32BitDecimal(precision)) { + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { + + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } else { + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } + } + + private void skipDataBinary(int len) { + skipDataBuffer(len); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java index 1f4adfa4b9c8..e9eec13df5fc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Float {@link ColumnReader}. */ public class FloatColumnReader extends AbstractColumnReader { - public FloatColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public FloatColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.FLOAT); } @@ -70,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableFloatVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipFloat(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipFloat(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipFloat(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableFloatVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java index e38e916d187e..521ad998f6f1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java @@ -21,7 +21,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -30,8 +30,9 @@ /** Int {@link ColumnReader}. */ public class IntColumnReader extends AbstractColumnReader { - public IntColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public IntColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT32); } @@ -68,6 +69,38 @@ protected void readBatch(int rowId, int num, WritableIntVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipInteger(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipInteger(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipInteger(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableIntVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java index a8e04eae673a..c4af086a7026 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableLongVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,8 +31,9 @@ /** Long {@link ColumnReader}. */ public class LongColumnReader extends AbstractColumnReader { - public LongColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public LongColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); } @@ -69,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableLongVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipValue(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipValue(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipValue(int num) { + skipDataBuffer(num * 8); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableLongVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 68225fbd1320..8f20be275447 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -279,7 +279,7 @@ private Pair readPrimitive( reader = new NestedPrimitiveColumnReader( descriptor, - pages.getPageReader(descriptor), + pages, isUtcTimestamp, descriptor.getPrimitiveType(), field.getType(), diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 7d00ff79234a..7db7aedbf6ae 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -44,6 +44,7 @@ import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; @@ -82,15 +83,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader valueList = new ArrayList<>(); + int valueIndex = collectDataFromParquetPage(readNumber, valueList); + + return fillColumnVector(valueIndex, valueList); + } + + private int collectDataFromParquetPage(int total, List valueList) throws IOException { + int valueIndex = 0; // repeated type need two loops to read data. - while (!eof && index < readNumber) { + + readState.resetForNewBatch(total); + + while (!eof && readState.rowsToReadInBatch > 0) { + + if (readState.isFinished()) { // finished to read + eof = true; + break; + } + + long pageRowId = readState.rowId; + long rangeStart = readState.currentRangeStart(); + long rangeEnd = readState.currentRangeEnd(); + + if (pageRowId > rangeEnd) { + readState.nextRange(); + continue; + } + + boolean needFilterSkip = pageRowId < rangeStart; + do { - if (!lastValue.shouldSkip) { + + if (!lastValue.shouldSkip && !needFilterSkip) { valueList.add(lastValue.value); valueIndex++; } } while (readValue() && (repetitionLevel != 0)); - index++; + + if (pageRowId == readState.rowId) { + readState.rowId = readState.rowId + 1; + } + + if (!needFilterSkip) { + readState.rowsToReadInBatch = readState.rowsToReadInBatch - 1; + } } - return fillColumnVector(valueIndex, valueList); + return valueIndex; } public LevelDelegation getLevelDelegation() { @@ -255,20 +287,24 @@ private void readAndSaveRepetitionAndDefinitionLevels() { // get the values of repetition and definitionLevel repetitionLevel = repetitionLevelColumn.nextInt(); definitionLevel = definitionLevelColumn.nextInt(); - valuesRead++; + readState.valuesToReadInPage = readState.valuesToReadInPage - 1; repetitionLevelList.add(repetitionLevel); definitionLevelList.add(definitionLevel); } private int readPageIfNeed() throws IOException { // Compute the number of values we want to read in this page. - int leftInPage = (int) (endOfPageValueCount - valuesRead); - if (leftInPage == 0) { - // no data left in current page, load data from new page - readPage(); - leftInPage = (int) (endOfPageValueCount - valuesRead); + if (readState.valuesToReadInPage == 0) { + int pageValueCount = readPage(); + // 返回当前 page 的数据量 + if (pageValueCount < 0) { + // we've read all the pages; this could happen when we're reading a repeated list + // and we + // don't know where the list will end until we've seen all the pages. + return -1; + } } - return leftInPage; + return readState.valuesToReadInPage; } private Object readPrimitiveTypedRow(DataType category) { @@ -528,33 +564,36 @@ private static HeapBytesVector getHeapBytesVector(int total, List valueList) { return phbv; } - protected void readPage() { + protected int readPage() { DataPage page = pageReader.readPage(); if (page == null) { - return; + return -1; } - page.accept( - new DataPage.Visitor() { - @Override - public Void visit(DataPageV1 dataPageV1) { - readPageV1(dataPageV1); - return null; - } + long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); - @Override - public Void visit(DataPageV2 dataPageV2) { - readPageV2(dataPageV2); - return null; - } - }); + int pageValueCount = + page.accept( + new DataPage.Visitor() { + @Override + public Integer visit(DataPageV1 dataPageV1) { + return readPageV1(dataPageV1); + } + + @Override + public Integer visit(DataPageV2 dataPageV2) { + return readPageV2(dataPageV2); + } + }); + readState.resetForNewPage(pageValueCount, pageFirstRowIndex); + return pageValueCount; } private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) throws IOException { - this.pageValueCount = valueCount; - this.endOfPageValueCount = valuesRead + pageValueCount; + // this.pageValueCount = valueCount; + // this.endOfPageValueCount = valuesRead + pageValueCount; if (dataEncoding.usesDictionary()) { this.dataColumn = null; if (dictionary == null) { @@ -577,13 +616,14 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int } try { - dataColumn.initFromPage(pageValueCount, in); + dataColumn.initFromPage(valueCount, in); } catch (IOException e) { throw new IOException(String.format("Could not read page in col %s.", descriptor), e); } } - private void readPageV1(DataPageV1 page) { + private int readPageV1(DataPageV1 page) { + int pageValueCount = page.getValueCount(); ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); @@ -597,15 +637,16 @@ private void readPageV1(DataPageV1 page) { LOG.debug("Reading definition levels at {}.", in.position()); dlReader.initFromPage(pageValueCount, in); LOG.debug("Reading data at {}.", in.position()); - initDataReader(page.getValueEncoding(), in, page.getValueCount()); + initDataReader(page.getValueEncoding(), in, pageValueCount); + return pageValueCount; } catch (IOException e) { throw new ParquetDecodingException( String.format("Could not read page %s in col %s.", page, descriptor), e); } } - private void readPageV2(DataPageV2 page) { - this.pageValueCount = page.getValueCount(); + private int readPageV2(DataPageV2 page) { + int pageValueCount = page.getValueCount(); this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels()); this.definitionLevelColumn = @@ -615,8 +656,8 @@ private void readPageV2(DataPageV2 page) { "Page data size {} bytes and {} records.", page.getData().size(), pageValueCount); - initDataReader( - page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount()); + initDataReader(page.getDataEncoding(), page.getData().toInputStream(), pageValueCount); + return pageValueCount; } catch (IOException e) { throw new ParquetDecodingException( String.format("Could not read page %s in col %s.", page, descriptor), e); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java new file mode 100644 index 000000000000..a6003676825a --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet.reader; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + +/** Parquet reader state for column index. */ +public class ParquetReadState { + /** A special row range used when there is no row indexes (hence all rows must be included). */ + private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE); + + /** + * A special row range used when the row indexes are present AND all the row ranges have been + * processed. This serves as a sentinel at the end indicating that all rows come after the last + * row range should be skipped. + */ + private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE); + + private final Iterator rowRanges; + + private RowRange currentRange; + + /** row index for the next read. */ + long rowId; + + int valuesToReadInPage; + int rowsToReadInBatch; + + public ParquetReadState(PrimitiveIterator.OfLong rowIndexes) { + this.rowRanges = constructRanges(rowIndexes); + nextRange(); + } + + /** + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: `[0-2], + * [4-5], [7-9]`. + */ + private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) { + if (rowIndexes == null) { + return null; + } + + List rowRanges = new ArrayList<>(); + long currentStart = Long.MIN_VALUE; + long previous = Long.MIN_VALUE; + + while (rowIndexes.hasNext()) { + long idx = rowIndexes.nextLong(); + if (currentStart == Long.MIN_VALUE) { + currentStart = idx; + } else if (previous + 1 != idx) { + RowRange range = new RowRange(currentStart, previous); + rowRanges.add(range); + currentStart = idx; + } + previous = idx; + } + + if (previous != Long.MIN_VALUE) { + rowRanges.add(new RowRange(currentStart, previous)); + } + + return rowRanges.iterator(); + } + + /** Must be called at the beginning of reading a new batch. */ + void resetForNewBatch(int batchSize) { + this.rowsToReadInBatch = batchSize; + } + + /** Must be called at the beginning of reading a new page. */ + void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) { + this.valuesToReadInPage = totalValuesInPage; + this.rowId = pageFirstRowIndex; + } + + /** Returns the start index of the current row range. */ + public long currentRangeStart() { + return currentRange.start; + } + + /** Returns the end index of the current row range. */ + public long currentRangeEnd() { + return currentRange.end; + } + + public boolean isFinished() { + return this.currentRange.equals(this.END_ROW_RANGE); + } + + public boolean isMaxRange() { + return this.currentRange.equals(this.MAX_ROW_RANGE); + } + + public RowRange getCurrentRange() { + return currentRange; + } + + /** Advance to the next range. */ + public void nextRange() { + if (rowRanges == null) { + currentRange = MAX_ROW_RANGE; + } else if (!rowRanges.hasNext()) { + currentRange = END_ROW_RANGE; + } else { + currentRange = rowRanges.next(); + } + } + + /** Helper struct to represent a range of row indexes `[start, end]`. */ + public static class RowRange { + final long start; + final long end; + + RowRange(long start, long end) { + this.start = start; + this.end = end; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RowRange)) { + return false; + } + return ((RowRange) obj).start == this.start && ((RowRange) obj).end == this.end; + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java index 860ec54fa88b..a2be77414d5a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java @@ -87,58 +87,45 @@ public static ColumnReader createColumnReader( getAllColumnDescriptorByType(depth, type, columnDescriptors); switch (fieldType.getTypeRoot()) { case BOOLEAN: - return new BooleanColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BooleanColumnReader(descriptors.get(0), pages); case TINYINT: - return new ByteColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new ByteColumnReader(descriptors.get(0), pages); case DOUBLE: - return new DoubleColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new DoubleColumnReader(descriptors.get(0), pages); case FLOAT: - return new FloatColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new FloatColumnReader(descriptors.get(0), pages); case INTEGER: case DATE: case TIME_WITHOUT_TIME_ZONE: - return new IntColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new IntColumnReader(descriptors.get(0), pages); case BIGINT: - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); case SMALLINT: - return new ShortColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new ShortColumnReader(descriptors.get(0), pages); case CHAR: case VARCHAR: case BINARY: case VARBINARY: - return new BytesColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BytesColumnReader(descriptors.get(0), pages); case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: if (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) { - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); } - return new TimestampColumnReader( - true, descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new TimestampColumnReader(true, descriptors.get(0), pages); case DECIMAL: switch (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()) { case INT32: - return new IntColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new IntColumnReader(descriptors.get(0), pages); case INT64: - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); case BINARY: - return new BytesColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BytesColumnReader(descriptors.get(0), pages); case FIXED_LEN_BYTE_ARRAY: return new FixedLenBytesColumnReader( descriptors.get(0), - pages.getPageReader(descriptors.get(0)), + pages, ((DecimalType) fieldType).getPrecision()); } case ARRAY: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java index 2dd1655d571f..ebb8f28fa1ee 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java @@ -194,6 +194,51 @@ private void readDictionaryIdData(int total, WritableIntVector c, int rowId) { } } + void skipDictionaryIds(int total, int level, RunLengthDecoder data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) { + this.readNextGroup(); + } + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.skipDictionaryIdData(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + data.readInteger(); + } + } + break; + } + left -= n; + currentCount -= n; + } + } + + private void skipDictionaryIdData(int total) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) { + this.readNextGroup(); + } + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + break; + case PACKED: + currentBufferIdx += n; + break; + } + left -= n; + currentCount -= n; + } + } + /** Reads the next varint encoded int. */ private int readUnsignedVarInt() throws IOException { int value = 0; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java index 7b32232261a7..bdb2f401fa3f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableShortVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -30,9 +30,9 @@ /** Short {@link ColumnReader}. Using INT32 to store short, so just cast int to short. */ public class ShortColumnReader extends AbstractColumnReader { - public ShortColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public ShortColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT32); } @@ -71,6 +71,38 @@ protected void readBatch(int rowId, int num, WritableShortVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipShot(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipShot(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipShot(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableShortVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java index 4a279ff90e15..8767173315c2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java @@ -23,7 +23,7 @@ import org.apache.paimon.data.columnar.writable.WritableTimestampVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveType; @@ -49,9 +49,9 @@ public class TimestampColumnReader extends AbstractColumnReader