diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 375b2fbc81f1f..6628c39c5fda7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -476,9 +476,10 @@ private static void validateDefaultValues(TableSchema schema) { } private static void validateForDeleteMap(CoreOptions options) { - if (!options.formatType().equals(FileFormatType.PARQUET)) { + if (!options.formatType().equals(FileFormatType.ORC) + && !options.formatType().equals(FileFormatType.PARQUET)) { throw new IllegalArgumentException( - "Delete map is only supported for parquet file format now."); + "Delete map is only supported for orc or parquet file format now."); } if (options.changelogProducer() != ChangelogProducer.NONE diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 0e2a0308edd06..68c9c13e751ae 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -26,6 +26,7 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; import org.apache.paimon.format.orc.filter.OrcFilters; +import org.apache.paimon.format.orc.reader.OrcRowPositionColumnVector; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader.RecordIterator; @@ -123,7 +124,13 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type); + if ("_POSITION".equals(name)) { 
+ vectors[i] = + new OrcRowPositionColumnVector( + orcBatch.cols[tableFieldNames.indexOf(name)]); + } else { + vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type); + } } return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler); } @@ -176,11 +183,18 @@ public VectorizedRowBatch orcVectorizedRowBatch() { return orcVectorizedRowBatch; } - private RecordIterator convertAndGetIterator(VectorizedRowBatch orcBatch) { + private RecordIterator convertAndGetIterator( + VectorizedRowBatch orcBatch, long rowNumber) { // no copying from the ORC column vectors to the Paimon columns vectors necessary, // because they point to the same data arrays internally design int batchSize = orcBatch.size; paimonColumnBatch.setNumRows(batchSize); + for (ColumnVector column : paimonColumnBatch.columns) { + if (column instanceof OrcRowPositionColumnVector) { + ((OrcRowPositionColumnVector) column).setStartPosition(rowNumber); + break; + } + } result.set(batchSize); return result; } @@ -217,13 +231,13 @@ private OrcVectorizedReader(final RecordReader orcReader, final Pool readBatch() throws IOException { final OrcReaderBatch batch = getCachedEntry(); final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch(); - + long rowNumber = orcReader.getRowNumber(); /* read before nextBatch() and later used as the start position of this batch; NOTE(review): presumably the index of the first row of the upcoming batch - confirm against ORC RecordReader#getRowNumber */ if (!nextBatch(orcReader, orcVectorBatch)) { batch.recycle(); return null; } - return batch.convertAndGetIterator(orcVectorBatch); + return batch.convertAndGetIterator(orcVectorBatch, rowNumber); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowPositionColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowPositionColumnVector.java new file mode 100644 index 0000000000000..1b68b567f9e18 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowPositionColumnVector.java @@ -0,0 +1,22 @@ +package org.apache.paimon.format.orc.reader; + +import 
org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +/** Long column vector whose value at index {@code i} is {@code startPosition + i}. */ +public class OrcRowPositionColumnVector extends AbstractOrcColumnVector + implements org.apache.paimon.data.columnar.LongColumnVector { + private long startPosition; + + public OrcRowPositionColumnVector(ColumnVector ignore) { + super(ignore); + } + + @Override + public long getLong(int i) { + return startPosition + i; + } + + public void setStartPosition(long startPosition) { + this.startPosition = startPosition; + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java index 3ffbaca0752a5..c22de2ff0122f 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java @@ -72,6 +72,35 @@ class OrcReaderFactoryTest { }) .build(); + private static final RowType FLAT_FILE_TYPE_WITH_ROW_POSITION = + RowType.builder() + .fields( + new DataType[] { + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] { + "_col0", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_POSITION" + }) + .build(); + private static final RowType DECIMAL_FILE_TYPE = RowType.builder() .fields(new DataType[] {new DecimalType(10, 5)}, new String[] {"_col0"}) @@ -136,6 +165,34 @@ void testReadFileWithSelectFields() throws IOException { assertThat(totalF0.get()).isEqualTo(1844737280400L); } + @Test + void testReadFileWithRowPosition() throws IOException { + OrcReaderFactory format = + createFormat(FLAT_FILE_TYPE_WITH_ROW_POSITION, new int[] {2, 0, 1, 9}); + + AtomicInteger cnt = new AtomicInteger(0); + AtomicLong totalF0 = new AtomicLong(0); + + forEach( + format, + flatFile, + row -> 
{ + assertThat(row.isNullAt(0)).isFalse(); + assertThat(row.isNullAt(1)).isFalse(); + assertThat(row.isNullAt(2)).isFalse(); + assertThat(row.getString(0).toString()).isNotNull(); + totalF0.addAndGet(row.getInt(1)); + assertThat(row.getString(2).toString()).isNotNull(); + // check row position + assertThat(row.getLong(3)).isEqualTo(cnt.get()); + cnt.incrementAndGet(); + }); + + // check that all rows have been read + assertThat(cnt.get()).isEqualTo(1920800); + assertThat(totalF0.get()).isEqualTo(1844737280400L); + } + @Test void testReadDecimalTypeFile() throws IOException { OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});