From 12b5de11bc4e7dce7e073c146af9033af3c776c3 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Wed, 28 Feb 2024 15:00:17 +0800
Subject: [PATCH] 1

---
 .../data/columnar/ColumnarRowIterator.java    | 25 ++++-
 .../apache/paimon/reader/RecordReader.java    | 28 ++++++
 .../reader/RecordWithPositionIterator.java    | 98 +++++++++++++++++++
 .../paimon/format/orc/OrcReaderFactory.java   |  9 +-
 .../format/parquet/ParquetReaderFactory.java  |  4 +-
 .../format/orc/OrcReaderFactoryTest.java      | 67 +++++++++++++
 .../format/parquet/ParquetReadWriteTest.java  | 35 +++++++
 7 files changed, 259 insertions(+), 7 deletions(-)
 create mode 100644 paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 5283f55f8ea1..e60c399020ea 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -30,36 +31,54 @@
  * A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. The next row is set by
  * {@link ColumnarRow#setRowId}.
  */
-public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
+public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
+        implements RecordWithPositionIterator<InternalRow> {
 
     private final ColumnarRow rowData;
     private final Runnable recycler;
 
     private int num;
     private int pos;
+    private long rowPosition;
 
     public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) {
         super(recycler);
         this.rowData = rowData;
         this.recycler = recycler;
+        this.rowPosition = 0;
     }
 
-    public void set(int num) {
+    /** Reset the number of rows in the vectorized batch and the start position in this batch. */
+    public void reset(int num) {
         this.num = num;
         this.pos = 0;
     }
 
+    /**
+     * Reset the current record's row position in the file. The caller must ensure that the row
+     * position still increases strictly by 1 for each row returned after the reset.
+     */
+    public void resetRowPosition(long rowPosition) {
+        this.rowPosition = rowPosition;
+    }
+
     @Nullable
     @Override
     public InternalRow next() {
         if (pos < num) {
             rowData.setRowId(pos++);
+            rowPosition++;
             return rowData;
         } else {
             return null;
         }
     }
 
+    @Override
+    public long rowPosition() {
+        return rowPosition;
+    }
+
     public ColumnarRowIterator mapping(
             @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
         if (partitionInfo != null || indexMapping != null) {
@@ -72,7 +91,7 @@ public ColumnarRowIterator mapping(
                 vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
             }
             ColumnarRowIterator iterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
-            iterator.set(num);
+            iterator.reset(num);
             return iterator;
         }
         return this;
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 79fab867d34c..435e1ce5ca0b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.ConsumerWithIOException;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
 
 import javax.annotation.Nullable;
 
@@ -142,6 +143,33 @@ default void forEachRemaining(Consumer<? super T> action) throws IOException {
         }
     }
 
+    /**
+     * Performs the given action for each remaining element, together with its row position, in
+     * {@link RecordReader} until all elements have been processed or the action throws an exception.
+     */
+    default void forEachRemainingWithPosition(Consumer<Pair<Long, T>> action)
+            throws IOException {
+        RecordWithPositionIterator<T> batch;
+        long rowPosition;
+        T record;
+
+        try {
+            while ((batch = (RecordWithPositionIterator<T>) readBatch()) != null) {
+                while (true) {
+                    rowPosition = batch.rowPosition();
+                    record = batch.next();
+                    if (record == null) {
+                        break;
+                    }
+                    action.accept(Pair.of(rowPosition, record));
+                }
+                batch.releaseBatch();
+            }
+        } finally {
+            close();
+        }
+    }
+
     /**
      * Performs the given action for each remaining element in {@link RecordReader} until all
      * elements have been processed or the action throws an exception.
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
new file mode 100644
index 000000000000..027416cd566a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.paimon.reader; + +import org.apache.paimon.utils.Filter; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.function.Function; + +/** + * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position. + * + * @param The type of the record. + */ +public interface RecordWithPositionIterator extends RecordReader.RecordIterator { + + /** + * Get the row position of the row that will be returned by the following call to {@link + * RecordReader.RecordIterator#next}. + * + * @return the row position from 0 to the number of rows in the file + */ + long rowPosition(); + + @Override + default RecordWithPositionIterator transform(Function function) { + RecordWithPositionIterator thisIterator = this; + return new RecordWithPositionIterator() { + @Override + public long rowPosition() { + return thisIterator.rowPosition(); + } + + @Nullable + @Override + public R next() throws IOException { + T next = thisIterator.next(); + if (next == null) { + return null; + } + return function.apply(next); + } + + @Override + public void releaseBatch() { + thisIterator.releaseBatch(); + } + }; + } + + @Override + default RecordWithPositionIterator filter(Filter filter) { + RecordWithPositionIterator thisIterator = this; + return new RecordWithPositionIterator() { + @Override + public long rowPosition() { + return thisIterator.rowPosition(); + } + + @Nullable + @Override + public T next() throws IOException { + while (true) { + T next = thisIterator.next(); + if (next == null) { + return null; + } + if (filter.test(next)) { + return next; + } + } + } + + @Override + public void releaseBatch() { + thisIterator.releaseBatch(); + } + }; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 0e2a0308edd0..bf8538bcdff5 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -176,12 +176,14 @@ public VectorizedRowBatch orcVectorizedRowBatch() { return orcVectorizedRowBatch; } - private RecordIterator convertAndGetIterator(VectorizedRowBatch orcBatch) { + private RecordIterator convertAndGetIterator( + VectorizedRowBatch orcBatch, long rowNumber) { // no copying from the ORC column vectors to the Paimon columns vectors necessary, // because they point to the same data arrays internally design int batchSize = orcBatch.size; paimonColumnBatch.setNumRows(batchSize); - result.set(batchSize); + result.reset(batchSize); + result.resetRowPosition(rowNumber); return result; } } @@ -218,12 +220,13 @@ public RecordIterator readBatch() throws IOException { final OrcReaderBatch batch = getCachedEntry(); final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch(); + long rowNumber = orcReader.getRowNumber(); if (!nextBatch(orcReader, orcVectorBatch)) { batch.recycle(); return null; } - return batch.convertAndGetIterator(orcVectorBatch); + return batch.convertAndGetIterator(orcVectorBatch, rowNumber); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 5f64da4e79ca..565cf3a664c7 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ 
@@ -393,7 +393,9 @@ public void recycle() {
         }
 
         public RecordIterator<InternalRow> convertAndGetIterator() {
-            result.set(columnarBatch.getNumRows());
+            result.reset(columnarBatch.getNumRows());
+            // Since we currently don't implement parquet's filter push down, the row position
+            // always starts at 0 and increases by 1, so we do not need to reset it here.
             return result;
         }
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 3ffbaca0752a..223462aab380 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -31,7 +31,9 @@
 import org.apache.paimon.utils.Projection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -40,6 +42,8 @@
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -136,6 +140,69 @@ void testReadFileWithSelectFields() throws IOException {
         assertThat(totalF0.get()).isEqualTo(1844737280400L);
     }
 
+    @Test
+    void testReadRowPosition() throws IOException {
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        try (RecordReader<InternalRow> reader = format.createReader(new LocalFileIO(), flatFile)) {
+            reader.forEachRemainingWithPosition(
+                    pair -> {
+                        long rowPosition = pair.getLeft();
+                        InternalRow row = pair.getRight();
+
+                        assertThat(row.isNullAt(0)).isFalse();
+                        assertThat(row.isNullAt(1)).isFalse();
+                        assertThat(row.isNullAt(2)).isFalse();
+                        assertThat(row.getString(0).toString()).isNotNull();
+                        totalF0.addAndGet(row.getInt(1));
+                        assertThat(row.getString(2).toString()).isNotNull();
+                        // check row position
+                        assertThat(rowPosition).isEqualTo(cnt.get());
+                        cnt.incrementAndGet();
+                    });
+        }
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(1920800);
+        assertThat(totalF0.get()).isEqualTo(1844737280400L);
+    }
+
+    @RepeatedTest(10)
+    void testReadRowPositionWithRandomFilterAndPool() throws IOException {
+        ArrayList<OrcFilters.Predicate> predicates = new ArrayList<>();
+        int randomStart = new Random().nextInt(1920800);
+        int randomPoolSize = new Random().nextInt(3) + 1;
+        predicates.add(
+                new OrcFilters.Not(
+                        new OrcFilters.LessThanEquals(
+                                "_col0", PredicateLeaf.Type.LONG, randomStart)));
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}, predicates);
+
+        AtomicBoolean isFirst = new AtomicBoolean(true);
+
+        try (RecordReader<InternalRow> reader =
+                format.createReader(new LocalFileIO(), flatFile, randomPoolSize)) {
+            reader.forEachRemainingWithPosition(
+                    pair -> {
+                        Long rowPosition = pair.getLeft();
+                        InternalRow row = pair.getRight();
+
+                        // check filter: _col0 > randomStart
+                        // Note: the filter is only accurate to within flatFile's stripe size
+                        if (isFirst.get()) {
+                            assertThat(randomStart - row.getInt(1)).isLessThan(5000);
+                            isFirst.set(false);
+                        }
+                        // check row position
+                        // Note: in flatFile, field _col0's value is row position + 1, so we can
+                        // use it to check the row position
+                        assertThat(rowPosition + 1).isEqualTo(row.getInt(1));
+                    });
+        }
+    }
+
     @Test
     void testReadDecimalTypeFile() throws IOException {
         OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 6542af029f3a..bf86b9da7e28 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -48,6 +48,7 @@
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.InstantiationUtil;
 
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -281,6 +282,40 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @RepeatedTest(10)
+    void testReadRowPosition() throws IOException {
+        int recordNumber = new Random().nextInt(10000) + 1;
+        int batchSize = new Random().nextInt(1000) + 1;
+        int rowGroupSize = new Random().nextInt(1000) + 1;
+        List<InternalRow> records = new ArrayList<>(recordNumber);
+        for (int i = 0; i < recordNumber; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+
+        DataType[] fieldTypes = new DataType[] {new DoubleType()};
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(
+                        new Options(),
+                        RowType.builder().fields(fieldTypes, new String[] {"f7"}).build(),
+                        batchSize);
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        try (RecordReader<InternalRow> reader = format.createReader(new LocalFileIO(), testPath)) {
+            reader.forEachRemainingWithPosition(
+                    pair -> {
+                        long rowPosition = pair.getLeft();
+                        InternalRow row = pair.getRight();
+                        assertThat(row.getDouble(0)).isEqualTo(cnt.get());
+                        // check row position
+                        assertThat(rowPosition).isEqualTo(cnt.get());
+                        cnt.incrementAndGet();
+                    });
+        }
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
             throws IOException {
         List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
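
Usage sketch (for reference, not part of the patch): a minimal example of how a caller might consume
the new row-position API. Only RecordReader, createReader, forEachRemainingWithPosition, and the
Pair accessors come from the patch itself; the class name, method name, and the format/dataFile
arguments are hypothetical, and the import packages are assumed from the file paths in the diff.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.orc.OrcReaderFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;

import java.io.IOException;

class RowPositionUsageSketch {

    // Iterates over an ORC file and receives each row together with its 0-based position in the
    // file, using the forEachRemainingWithPosition default method added by this patch.
    static void printRowPositions(OrcReaderFactory format, Path dataFile) throws IOException {
        try (RecordReader<InternalRow> reader = format.createReader(new LocalFileIO(), dataFile)) {
            reader.forEachRemainingWithPosition(
                    pair -> {
                        long rowPosition = pair.getLeft(); // position of the row within the file
                        InternalRow row = pair.getRight(); // the row at that position
                        System.out.println(rowPosition + " -> " + row);
                    });
        }
    }
}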