diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index e60c399020ea..d4c83e31a58e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -45,7 +45,6 @@ public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) { super(recycler); this.rowData = rowData; this.recycler = recycler; - this.rowPosition = 0; } /** Reset the number of rows in the vectorized batch and the start position in this batch. */ @@ -79,6 +78,13 @@ public long rowPosition() { return rowPosition; } + public ColumnarRowIterator copy(ColumnVector[] vectors) { + ColumnarRowIterator newIterator = new ColumnarRowIterator(rowData.copy(vectors), recycler); + newIterator.reset(num); + newIterator.resetRowPosition(rowPosition); + return newIterator; + } + public ColumnarRowIterator mapping( @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) { if (partitionInfo != null || indexMapping != null) { @@ -90,9 +96,7 @@ public ColumnarRowIterator mapping( if (indexMapping != null) { vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors); } - ColumnarRowIterator iterator = new ColumnarRowIterator(rowData.copy(vectors), recycler); - iterator.reset(num); - return iterator; + return copy(vectors); } return this; } diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java index 815661a0e5ed..318e4696f413 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java @@ -150,18 +150,12 @@ default void forEachRemaining(Consumer action) throws IOException { default void 
forEachRemainingWithPosition(BiConsumer action) throws IOException { RecordWithPositionIterator batch; - long rowPosition; T record; try { while ((batch = (RecordWithPositionIterator) readBatch()) != null) { - while (true) { - rowPosition = batch.rowPosition(); - record = batch.next(); - if (record == null) { - break; - } - action.accept(rowPosition, record); + while ((record = batch.next()) != null) { + action.accept(batch.rowPosition(), record); } batch.releaseBatch(); } diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java index 027416cd566a..66edaa87a058 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java @@ -33,8 +33,7 @@ public interface RecordWithPositionIterator extends RecordReader.RecordIterator { /** - * Get the row position of the row that will be returned by the following call to {@link - * RecordReader.RecordIterator#next}. + * Get the row position of the row last returned by {@link RecordReader.RecordIterator#next}. 
 * * @return the row position from 0 to the number of rows in the file */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index bf8538bcdff5..1762d8ba9ff6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -183,7 +183,7 @@ private RecordIterator convertAndGetIterator( int batchSize = orcBatch.size; paimonColumnBatch.setNumRows(batchSize); result.reset(batchSize); - result.resetRowPosition(rowNumber); + result.resetRowPosition(rowNumber - 1); return result; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index ece3bbc7b5bd..2350c1e7b1a4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -400,7 +400,7 @@ public void recycle() { public RecordIterator convertAndGetIterator(long rowNumber) { result.reset(columnarBatch.getNumRows()); - result.resetRowPosition(rowNumber); + result.resetRowPosition(rowNumber - 1); return result; } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java index 8b3a75230bf7..a5160f5c7990 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java @@ -197,6 +197,26 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException { } } + @Test + void testReadRowPositionWithTransformAndFilter() throws IOException { + int randomPooSize = new 
Random().nextInt(3) + 1; + OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}); + + try (RecordReader reader = + format.createReader(new LocalFileIO(), flatFile, randomPooSize)) { + reader.transform(row -> row) + .filter(row -> row.getInt(1) % 123 == 0) + .forEachRemainingWithPosition( + (rowPosition, row) -> { + // check row position + // Note: in flatFile, field _col0's value is row position + 1, so + // it can be used + // to check row position + assertThat(rowPosition + 1).isEqualTo(row.getInt(1)); + }); + } + } + @Test void testReadDecimalTypeFile() throws IOException { OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});