update: orc support read row position
Zouxxyy committed Feb 26, 2024
1 parent 9ca2bb3 · commit 31899dd
Showing 4 changed files with 100 additions and 6 deletions.
@@ -476,9 +476,10 @@ private static void validateDefaultValues(TableSchema schema) {
     }
 
     private static void validateForDeleteMap(CoreOptions options) {
-        if (!options.formatType().equals(FileFormatType.PARQUET)) {
+        if (!options.formatType().equals(FileFormatType.ORC)
+                && !options.formatType().equals(FileFormatType.PARQUET)) {
             throw new IllegalArgumentException(
-                    "Delete map is only supported for parquet file format now.");
+                    "Delete map is only supported for orc or parquet file format now.");
         }
 
         if (options.changelogProducer() != ChangelogProducer.NONE
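Restated as a standalone sketch (hypothetical helper, not part of the commit; FileFormatType is the enum referenced in the diff above): delete maps are now accepted for ORC as well as Parquet, and any other format is still rejected.

// Hypothetical restatement of validateForDeleteMap's format check.
static void checkDeleteMapFormat(FileFormatType formatType) {
    if (!formatType.equals(FileFormatType.ORC)
            && !formatType.equals(FileFormatType.PARQUET)) {
        // every other format is still rejected, with the updated message
        throw new IllegalArgumentException(
                "Delete map is only supported for orc or parquet file format now.");
    }
}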
@@ -26,6 +26,7 @@
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
 import org.apache.paimon.format.orc.filter.OrcFilters;
+import org.apache.paimon.format.orc.reader.OrcRowPositionColumnVector;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
@@ -123,7 +124,13 @@ public OrcReaderBatch createReaderBatch(
         for (int i = 0; i < vectors.length; i++) {
             String name = tableFieldNames.get(i);
             DataType type = tableFieldTypes.get(i);
-            vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+            if ("_POSITION".equals(name)) {
+                vectors[i] =
+                        new OrcRowPositionColumnVector(
+                                orcBatch.cols[tableFieldNames.indexOf(name)]);
+            } else {
+                vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+            }
         }
         return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler);
     }
@@ -176,11 +183,18 @@ public VectorizedRowBatch orcVectorizedRowBatch() {
         return orcVectorizedRowBatch;
     }
 
-    private RecordIterator<InternalRow> convertAndGetIterator(VectorizedRowBatch orcBatch) {
+    private RecordIterator<InternalRow> convertAndGetIterator(
+            VectorizedRowBatch orcBatch, long rowNumber) {
         // no copying from the ORC column vectors to the Paimon column vectors is necessary,
         // because they point to the same data arrays internally
         int batchSize = orcBatch.size;
         paimonColumnBatch.setNumRows(batchSize);
+        for (ColumnVector column : paimonColumnBatch.columns) {
+            if (column instanceof OrcRowPositionColumnVector) {
+                ((OrcRowPositionColumnVector) column).setStartPosition(rowNumber);
+                break;
+            }
+        }
         result.set(batchSize);
         return result;
     }
@@ -217,13 +231,13 @@ private OrcVectorizedReader(final RecordReader orcReader, final Pool<OrcReaderBatch> pool) {
         public RecordIterator<InternalRow> readBatch() throws IOException {
             final OrcReaderBatch batch = getCachedEntry();
             final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();
-
+            long rowNumber = orcReader.getRowNumber();
             if (!nextBatch(orcReader, orcVectorBatch)) {
                 batch.recycle();
                 return null;
             }
 
-            return batch.convertAndGetIterator(orcVectorBatch);
+            return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
         }
 
         @Override
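Why rowNumber is captured before the batch is filled: ORC's RecordReader.getRowNumber() reports the position of the row that the next read will return, so it must be read before nextBatch() advances the reader. A minimal sketch of that ordering contract (the helper class and method names are illustrative, not from this commit):

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.RecordReader;

// Illustrative helper, not part of the commit.
class RowPositionSketch {

    /** Returns the file-level position of the first row in the batch, or -1 at end of file. */
    static long readBatchWithStartPosition(RecordReader reader, VectorizedRowBatch batch)
            throws IOException {
        // getRowNumber() refers to the row the NEXT read will produce, so capture it
        // before nextBatch() advances the reader.
        long firstRowPosition = reader.getRowNumber();
        if (!reader.nextBatch(batch)) {
            return -1; // no more rows in the file
        }
        // rows in 'batch' occupy file positions [firstRowPosition, firstRowPosition + batch.size)
        return firstRowPosition;
    }
}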
@@ -0,0 +1,22 @@
+package org.apache.paimon.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/** A long column vector that returns the file-level row position of each row in a batch. */
+public class OrcRowPositionColumnVector extends AbstractOrcColumnVector
+        implements org.apache.paimon.data.columnar.LongColumnVector {
+    private long startPosition;
+
+    public OrcRowPositionColumnVector(ColumnVector ignore) {
+        super(ignore);
+    }
+
+    @Override
+    public long getLong(int i) {
+        return startPosition + i;
+    }
+
+    public void setStartPosition(long startPosition) {
+        this.startPosition = startPosition;
+    }
+}
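A minimal usage sketch of the new vector (hypothetical, using a Hive LongColumnVector as the ignored delegate): the wrapped ORC vector is never read; positions are synthesized from the batch's start position plus the batch-local index.

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

import org.apache.paimon.format.orc.reader.OrcRowPositionColumnVector;

// Hypothetical illustration, not part of the commit.
class OrcRowPositionColumnVectorDemo {
    static void demo() {
        OrcRowPositionColumnVector positions =
                new OrcRowPositionColumnVector(new LongColumnVector()); // delegate is ignored
        positions.setStartPosition(2048L); // current batch begins at file row 2048
        long first = positions.getLong(0); // 2048: first row of the batch
        long sixth = positions.getLong(5); // 2053: sixth row of the batch
    }
}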
@@ -72,6 +72,35 @@ class OrcReaderFactoryTest {
                     })
             .build();
 
+    private static final RowType FLAT_FILE_TYPE_WITH_ROW_POSITION =
+            RowType.builder()
+                    .fields(
+                            new DataType[] {
+                                DataTypes.INT(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.INT(),
+                                DataTypes.STRING(),
+                                DataTypes.INT(),
+                                DataTypes.INT(),
+                                DataTypes.INT(),
+                                DataTypes.BIGINT()
+                            },
+                            new String[] {
+                                "_col0",
+                                "_col1",
+                                "_col2",
+                                "_col3",
+                                "_col4",
+                                "_col5",
+                                "_col6",
+                                "_col7",
+                                "_col8",
+                                "_POSITION"
+                            })
+                    .build();
+
     private static final RowType DECIMAL_FILE_TYPE =
             RowType.builder()
                     .fields(new DataType[] {new DecimalType(10, 5)}, new String[] {"_col0"})
@@ -136,6 +165,34 @@ void testReadFileWithSelectFields() throws IOException {
         assertThat(totalF0.get()).isEqualTo(1844737280400L);
     }
 
+    @Test
+    void testReadFileWithRowPosition() throws IOException {
+        OrcReaderFactory format =
+                createFormat(FLAT_FILE_TYPE_WITH_ROW_POSITION, new int[] {2, 0, 1, 9});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        forEach(
+                format,
+                flatFile,
+                row -> {
+                    assertThat(row.isNullAt(0)).isFalse();
+                    assertThat(row.isNullAt(1)).isFalse();
+                    assertThat(row.isNullAt(2)).isFalse();
+                    assertThat(row.getString(0).toString()).isNotNull();
+                    totalF0.addAndGet(row.getInt(1));
+                    assertThat(row.getString(2).toString()).isNotNull();
+                    // check row position
+                    assertThat(row.getLong(3)).isEqualTo(cnt.get());
+                    cnt.incrementAndGet();
+                });
+
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(1920800);
+        assertThat(totalF0.get()).isEqualTo(1844737280400L);
+    }
+
     @Test
     void testReadDecimalTypeFile() throws IOException {
         OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
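A note on the new test's projection: the select indices {2, 0, 1, 9} refer to fields of FLAT_FILE_TYPE_WITH_ROW_POSITION, so each emitted row is shaped (_col2, _col0, _col1, _POSITION) and the row position comes back as the fourth field. A short sketch of the resulting layout (row is the lambda parameter in the test above):

// Projected row layout under select indices {2, 0, 1, 9}:
//   index 0 -> _col2 (STRING), index 1 -> _col0 (INT),
//   index 2 -> _col1 (STRING), index 3 -> _POSITION (BIGINT)
long rowPosition = row.getLong(3); // 0-based position of this row within the ORC file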
