Skip to content

Commit

Permalink
[core] Redefine the usage of RecordWithPositionIterator (#2953)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Mar 6, 2024
1 parent d61ed7e commit 4791d66
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,45 +38,46 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
private final Runnable recycler;

private int num;
private int pos;
private long rowPosition;
private int nextPos;
private long nextGlobalPos;

public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) {
super(recycler);
this.rowData = rowData;
this.recycler = recycler;
this.rowPosition = 0;
}

/** Reset the number of rows in the vectorized batch and the start position in this batch. */
public void reset(int num) {
this.num = num;
this.pos = 0;
}

/**
* Reset the current record's row position in the file, and it needs to be ensured that the row
* position after reset is strictly incremented by 1.
* Reset the number of rows in the vectorized batch, the start position in this batch and the
* global position.
*/
public void resetRowPosition(long rowPosition) {
this.rowPosition = rowPosition;
public void reset(int num, long nextGlobalPos) {
this.num = num;
this.nextPos = 0;
this.nextGlobalPos = nextGlobalPos;
}

@Nullable
@Override
public InternalRow next() {
if (pos < num) {
rowData.setRowId(pos++);
rowPosition++;
if (nextPos < num) {
rowData.setRowId(nextPos++);
nextGlobalPos++;
return rowData;
} else {
return null;
}
}

@Override
public long rowPosition() {
return rowPosition;
public long returnedPosition() {
return nextGlobalPos - 1;
}

public ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
newIterator.reset(num, nextGlobalPos);
return newIterator;
}

public ColumnarRowIterator mapping(
Expand All @@ -90,9 +91,7 @@ public ColumnarRowIterator mapping(
if (indexMapping != null) {
vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
}
ColumnarRowIterator iterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
iterator.reset(num);
return iterator;
return copy(vectors);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,12 @@ default void forEachRemaining(Consumer<? super T> action) throws IOException {
default void forEachRemainingWithPosition(BiConsumer<Long, ? super T> action)
throws IOException {
RecordWithPositionIterator<T> batch;
long rowPosition;
T record;

try {
while ((batch = (RecordWithPositionIterator<T>) readBatch()) != null) {
while (true) {
rowPosition = batch.rowPosition();
record = batch.next();
if (record == null) {
break;
}
action.accept(rowPosition, record);
while ((record = batch.next()) != null) {
action.accept(batch.returnedPosition(), record);
}
batch.releaseBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,19 @@
public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {

/**
* Get the row position of the row that will be returned by the following call to {@link
* RecordReader.RecordIterator#next}.
* Get the row position of the row returned by {@link RecordReader.RecordIterator#next}.
*
* @return the row position from 0 to the number of rows in the file
*/
long rowPosition();
long returnedPosition();

@Override
default <R> RecordWithPositionIterator<R> transform(Function<T, R> function) {
RecordWithPositionIterator<T> thisIterator = this;
return new RecordWithPositionIterator<R>() {
@Override
public long rowPosition() {
return thisIterator.rowPosition();
public long returnedPosition() {
return thisIterator.returnedPosition();
}

@Nullable
Expand All @@ -71,8 +70,8 @@ default RecordWithPositionIterator<T> filter(Filter<T> filter) {
RecordWithPositionIterator<T> thisIterator = this;
return new RecordWithPositionIterator<T>() {
@Override
public long rowPosition() {
return thisIterator.rowPosition();
public long returnedPosition() {
return thisIterator.returnedPosition();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ private RecordIterator<InternalRow> convertAndGetIterator(
// because they point to the same data arrays internally design
int batchSize = orcBatch.size;
paimonColumnBatch.setNumRows(batchSize);
result.reset(batchSize);
result.resetRowPosition(rowNumber);
result.reset(batchSize, rowNumber);
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ public void recycle() {
}

public RecordIterator<InternalRow> convertAndGetIterator(long rowNumber) {
result.reset(columnarBatch.getNumRows());
result.resetRowPosition(rowNumber);
result.reset(columnarBatch.getNumRows(), rowNumber);
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,26 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException {
}
}

@Test
void testReadRowPositionWithTransformAndFilter() throws IOException {
int randomPooSize = new Random().nextInt(3) + 1;
OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});

try (RecordReader<InternalRow> reader =
format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
reader.transform(row -> row)
.filter(row -> row.getInt(1) % 123 == 0)
.forEachRemainingWithPosition(
(rowPosition, row) -> {
// check row position
// Note: in flatFile, field _col0's value is row position + 1, we
// can use it
// to check row position
assertThat(rowPosition + 1).isEqualTo(row.getInt(1));
});
}
}

@Test
void testReadDecimalTypeFile() throws IOException {
OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
Expand Down

0 comments on commit 4791d66

Please sign in to comment.