Skip to content

Commit

Permalink
fix record with position iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Mar 6, 2024
1 parent ed322ce commit 79f5273
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) {
super(recycler);
this.rowData = rowData;
this.recycler = recycler;
this.rowPosition = 0;
}

/** Reset the number of rows in the vectorized batch and the start position in this batch. */
Expand Down Expand Up @@ -79,6 +78,13 @@ public long rowPosition() {
return rowPosition;
}

public ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
newIterator.reset(num);
newIterator.resetRowPosition(rowPosition);
return newIterator;
}

public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
if (partitionInfo != null || indexMapping != null) {
Expand All @@ -90,9 +96,7 @@ public ColumnarRowIterator mapping(
if (indexMapping != null) {
vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
}
ColumnarRowIterator iterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
iterator.reset(num);
return iterator;
return copy(vectors);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,12 @@ default void forEachRemaining(Consumer<? super T> action) throws IOException {
default void forEachRemainingWithPosition(BiConsumer<Long, ? super T> action)
throws IOException {
RecordWithPositionIterator<T> batch;
long rowPosition;
T record;

try {
while ((batch = (RecordWithPositionIterator<T>) readBatch()) != null) {
while (true) {
rowPosition = batch.rowPosition();
record = batch.next();
if (record == null) {
break;
}
action.accept(rowPosition, record);
while ((record = batch.next()) != null) {
action.accept(batch.rowPosition(), record);
}
batch.releaseBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {

/**
* Get the row position of the row that will be returned by the following call to {@link
* RecordReader.RecordIterator#next}.
* Get the row position of the row called by {@link RecordReader.RecordIterator#next}.
*
* @return the row position from 0 to the number of rows in the file
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private RecordIterator<InternalRow> convertAndGetIterator(
int batchSize = orcBatch.size;
paimonColumnBatch.setNumRows(batchSize);
result.reset(batchSize);
result.resetRowPosition(rowNumber);
result.resetRowPosition(rowNumber - 1);
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public void recycle() {

public RecordIterator<InternalRow> convertAndGetIterator(long rowNumber) {
result.reset(columnarBatch.getNumRows());
result.resetRowPosition(rowNumber);
result.resetRowPosition(rowNumber - 1);
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,26 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException {
}
}

@Test
void testReadRowPositionWithTransformAndFilter() throws IOException {
int randomPooSize = new Random().nextInt(3) + 1;
OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});

try (RecordReader<InternalRow> reader =
format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
reader.transform(row -> row)
.filter(row -> row.getInt(1) % 123 == 0)
.forEachRemainingWithPosition(
(rowPosition, row) -> {
// check row position
// Note: in flatFile, field _col0's value is row position + 1, we
// can use it
// to check row position
assertThat(rowPosition + 1).isEqualTo(row.getInt(1));
});
}
}

@Test
void testReadDecimalTypeFile() throws IOException {
OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
Expand Down

0 comments on commit 79f5273

Please sign in to comment.