Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 28, 2024
1 parent 6a2c24d commit 12b5de1
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordWithPositionIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;

Expand All @@ -30,36 +31,54 @@
* A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. The next row is set by
* {@link ColumnarRow#setRowId}.
*/
public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
implements RecordWithPositionIterator<InternalRow> {

private final ColumnarRow rowData;
private final Runnable recycler;

private int num;
private int pos;
private long rowPosition;

public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) {
super(recycler);
this.rowData = rowData;
this.recycler = recycler;
this.rowPosition = 0;
}

public void set(int num) {
/** Reset the number of rows in the vectorized batch and the start position in this batch. */
public void reset(int num) {
this.num = num;
this.pos = 0;
}

/**
 * Sets the position, within the file, of the next row this iterator will return.
 *
 * <p>Each row subsequently returned by {@link #next()} advances the position by exactly one, so
 * the caller has to make sure that the rows following the reset are contiguous in the file.
 *
 * @param rowPosition position in the file of the next row to be returned
 */
public void resetRowPosition(long rowPosition) {
    this.rowPosition = rowPosition;
}

/**
 * Points the shared {@link ColumnarRow} at the next row of the current batch and returns it.
 *
 * <p>The same row object is handed out on every call; only its row id changes. The row position
 * is advanced only when a row is actually returned.
 *
 * @return the shared row, or {@code null} once the batch is exhausted
 */
@Nullable
@Override
public InternalRow next() {
    if (pos >= num) {
        // Batch exhausted: leave both the batch cursor and the row position untouched.
        return null;
    }
    rowData.setRowId(pos);
    pos++;
    rowPosition++;
    return rowData;
}

/**
 * Returns the current row position counter, which is incremented once for every row handed out
 * by {@link #next()}.
 */
@Override
public long rowPosition() {
    return this.rowPosition;
}

public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
if (partitionInfo != null || indexMapping != null) {
Expand All @@ -72,7 +91,7 @@ public ColumnarRowIterator mapping(
vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
}
ColumnarRowIterator iterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
iterator.set(num);
iterator.reset(num);
return iterator;
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.ConsumerWithIOException;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -142,6 +143,33 @@ default void forEachRemaining(Consumer<? super T> action) throws IOException {
}
}

/**
 * Feeds every remaining record of this {@link RecordReader}, paired with its row position, to
 * the given action, until all records have been processed or the action throws an exception.
 *
 * <p>Every batch returned by {@code readBatch()} is cast to {@link RecordWithPositionIterator},
 * so a reader whose batches are of another type fails with a {@link ClassCastException}. The
 * reader is closed when this method exits, whether normally or exceptionally.
 *
 * @param action consumer receiving a pair of (row position, record) for each record
 * @throws IOException if reading a batch, reading a record or closing the reader fails
 */
default void forEachRemainingWithPosition(Consumer<? super Pair<Long, T>> action)
        throws IOException {
    try {
        while (true) {
            RecordWithPositionIterator<T> iterator =
                    (RecordWithPositionIterator<T>) readBatch();
            if (iterator == null) {
                break;
            }

            // The position has to be sampled before next(), because it describes the row
            // that the following next() call is going to return.
            long position = iterator.rowPosition();
            for (T row = iterator.next(); row != null; row = iterator.next()) {
                action.accept(Pair.of(position, row));
                position = iterator.rowPosition();
            }
            iterator.releaseBatch();
        }
    } finally {
        close();
    }
}

/**
* Performs the given action for each remaining element in {@link RecordReader} until all
* elements have been processed or the action throws an exception.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

import org.apache.paimon.utils.Filter;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.function.Function;

/**
 * A {@link RecordReader.RecordIterator} that can additionally report the row position of its
 * records.
 *
 * @param <T> The type of the record.
 */
public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {

    /**
     * Get the row position of the row that will be returned by the following call to {@link
     * RecordReader.RecordIterator#next}.
     *
     * @return the row position from 0 to the number of rows in the file
     */
    long rowPosition();

    /**
     * Returns a position-aware view of this iterator whose records are mapped through the given
     * function. Row position and batch release are delegated to this iterator.
     *
     * @param function mapping applied to every non-null record
     * @param <R> type of the mapped records
     */
    @Override
    default <R> RecordWithPositionIterator<R> transform(Function<T, R> function) {
        final RecordWithPositionIterator<T> source = this;
        return new RecordWithPositionIterator<R>() {
            @Nullable
            @Override
            public R next() throws IOException {
                T record = source.next();
                // null marks the end of the batch and is passed through without mapping
                return record == null ? null : function.apply(record);
            }

            @Override
            public long rowPosition() {
                return source.rowPosition();
            }

            @Override
            public void releaseBatch() {
                source.releaseBatch();
            }
        };
    }

    /**
     * Returns a position-aware view of this iterator that only yields the records accepted by
     * the given filter. Batch release is delegated to this iterator.
     *
     * <p>Note that {@code rowPosition()} of the returned view is forwarded unchanged to this
     * iterator: it reports the position of this iterator's next row, which may be a row that the
     * filter goes on to skip.
     *
     * @param filter predicate deciding which records are kept
     */
    @Override
    default RecordWithPositionIterator<T> filter(Filter<T> filter) {
        final RecordWithPositionIterator<T> source = this;
        return new RecordWithPositionIterator<T>() {
            @Nullable
            @Override
            public T next() throws IOException {
                // Skip rejected records; stop at the first accepted one or at the batch end.
                T candidate = source.next();
                while (candidate != null && !filter.test(candidate)) {
                    candidate = source.next();
                }
                return candidate;
            }

            @Override
            public long rowPosition() {
                return source.rowPosition();
            }

            @Override
            public void releaseBatch() {
                source.releaseBatch();
            }
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,14 @@ public VectorizedRowBatch orcVectorizedRowBatch() {
return orcVectorizedRowBatch;
}

private RecordIterator<InternalRow> convertAndGetIterator(VectorizedRowBatch orcBatch) {
private RecordIterator<InternalRow> convertAndGetIterator(
VectorizedRowBatch orcBatch, long rowNumber) {
// no copying from the ORC column vectors to the Paimon column vectors is necessary,
// because by design they point to the same data arrays internally
int batchSize = orcBatch.size;
paimonColumnBatch.setNumRows(batchSize);
result.set(batchSize);
result.reset(batchSize);
result.resetRowPosition(rowNumber);
return result;
}
}
Expand Down Expand Up @@ -218,12 +220,13 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
final OrcReaderBatch batch = getCachedEntry();
final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();

long rowNumber = orcReader.getRowNumber();
if (!nextBatch(orcReader, orcVectorBatch)) {
batch.recycle();
return null;
}

return batch.convertAndGetIterator(orcVectorBatch);
return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ public void recycle() {
}

public RecordIterator<InternalRow> convertAndGetIterator() {
result.set(columnarBatch.getNumRows());
result.reset(columnarBatch.getNumRows());
// Parquet filter push down is not implemented yet, so the row position always starts at 0
// and increases by 1 per row; there is no need to reset the row position here.
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.paimon.utils.Projection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -40,6 +42,8 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -136,6 +140,69 @@ void testReadFileWithSelectFields() throws IOException {
assertThat(totalF0.get()).isEqualTo(1844737280400L);
}

/** Reads the whole flat file and checks that the reported row positions count up from 0. */
@Test
void testReadRowPosition() throws IOException {
    OrcReaderFactory readerFactory = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});

    AtomicInteger rowsSeen = new AtomicInteger(0);
    AtomicLong sumOfF0 = new AtomicLong(0);

    try (RecordReader<InternalRow> reader =
            readerFactory.createReader(new LocalFileIO(), flatFile)) {
        reader.forEachRemainingWithPosition(
                positioned -> {
                    InternalRow row = positioned.getRight();
                    long rowPosition = positioned.getLeft();

                    assertThat(row.isNullAt(0)).isFalse();
                    assertThat(row.isNullAt(1)).isFalse();
                    assertThat(row.isNullAt(2)).isFalse();
                    assertThat(row.getString(0).toString()).isNotNull();
                    sumOfF0.addAndGet(row.getInt(1));
                    assertThat(row.getString(2).toString()).isNotNull();

                    // the position must equal the number of rows consumed so far
                    assertThat(rowPosition).isEqualTo(rowsSeen.get());
                    rowsSeen.incrementAndGet();
                });
    }

    // every row of the file must have been visited
    assertThat(rowsSeen.get()).isEqualTo(1920800);
    assertThat(sumOfF0.get()).isEqualTo(1844737280400L);
}

/**
 * Reads the flat file with a random {@code _col0 > randomStart} predicate and a random pool
 * size, and checks that the reported row positions match the content of the rows.
 */
@RepeatedTest(10)
void testReadRowPositionWithRandomFilterAndPool() throws IOException {
    Random random = new Random();
    int randomStart = random.nextInt(1920800);
    int randomPoolSize = random.nextInt(3) + 1;

    ArrayList<OrcFilters.Predicate> predicates = new ArrayList<>();
    OrcFilters.Predicate lessThanOrEqualToStart =
            new OrcFilters.LessThanEquals("_col0", PredicateLeaf.Type.LONG, randomStart);
    predicates.add(new OrcFilters.Not(lessThanOrEqualToStart));

    OrcReaderFactory readerFactory =
            createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}, predicates);

    AtomicBoolean firstRowPending = new AtomicBoolean(true);

    try (RecordReader<InternalRow> reader =
            readerFactory.createReader(new LocalFileIO(), flatFile, randomPoolSize)) {
        reader.forEachRemainingWithPosition(
                positioned -> {
                    long rowPosition = positioned.getLeft();
                    InternalRow row = positioned.getRight();
                    int col0 = row.getInt(1);

                    // check filter: _col0 > randomStart
                    // Note: the accuracy of the filter is within flatFile's stripe size
                    if (firstRowPending.get()) {
                        assertThat(randomStart - col0).isLessThan(5000);
                        firstRowPending.set(false);
                    }

                    // check row position
                    // Note: in flatFile, the value of field _col0 is row position + 1, so it
                    // can be used to verify the row position
                    assertThat(rowPosition + 1).isEqualTo(col0);
                });
    }
}

@Test
void testReadDecimalTypeFile() throws IOException {
OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.InstantiationUtil;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -281,6 +282,40 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}

/**
 * Writes a random number of rows with random batch and row group sizes, reads them back and
 * checks that every row is reported at the row position matching its index in the file.
 */
@RepeatedTest(10)
void testReadRowPosition() throws IOException {
    Random random = new Random();
    int recordNumber = random.nextInt(10000) + 1;
    int batchSize = random.nextInt(1000) + 1;
    int rowGroupSize = random.nextInt(1000) + 1;

    List<InternalRow> records = new ArrayList<>(recordNumber);
    for (int i = 0; i < recordNumber; i++) {
        Integer v = i;
        records.add(newRow(v));
    }

    Path testPath = createTempParquetFile(folder, records, rowGroupSize);

    DataType[] fieldTypes = new DataType[] {new DoubleType()};
    ParquetReaderFactory format =
            new ParquetReaderFactory(
                    new Options(),
                    RowType.builder().fields(fieldTypes, new String[] {"f7"}).build(),
                    batchSize);

    AtomicInteger cnt = new AtomicInteger(0);
    try (RecordReader<InternalRow> reader = format.createReader(new LocalFileIO(), testPath)) {
        reader.forEachRemainingWithPosition(
                pair -> {
                    long rowPosition = pair.getLeft();
                    InternalRow row = pair.getRight();
                    assertThat(row.getDouble(0)).isEqualTo(cnt.get());
                    // check row position
                    assertThat(rowPosition).isEqualTo(cnt.get());
                    cnt.incrementAndGet();
                });
    }

    // check that all rows have been read; without this the per-row assertions above would
    // pass vacuously if the reader returned too few rows (or none at all)
    assertThat(cnt.get()).isEqualTo(recordNumber);
}

private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
throws IOException {
List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
Expand Down

0 comments on commit 12b5de1

Please sign in to comment.