From 7175bd8e8ac97d0561fdb2cf92352febdc3fad75 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 19 Mar 2024 14:29:03 +0800 Subject: [PATCH] [core] orc/parquet reader obtain the fileSize from metadata (#2918) --- .../paimon/format/FormatReaderContext.java | 54 +++++++++++++++++++ .../paimon/format/FormatReaderFactory.java | 7 +-- .../io/KeyValueDataFileRecordReader.java | 9 ++-- .../paimon/io/KeyValueFileReaderFactory.java | 10 ++-- .../paimon/io/RowDataFileRecordReader.java | 6 ++- .../operation/AppendOnlyFileStoreRead.java | 1 + .../paimon/io/KeyValueFileReadWriteTest.java | 27 ++++++++++ .../paimon/format/avro/AvroBulkFormat.java | 10 ++-- .../paimon/format/orc/OrcReaderFactory.java | 16 +++--- .../format/parquet/ParquetReaderFactory.java | 14 +++-- .../format/orc/OrcReaderFactoryTest.java | 9 +++- 11 files changed, 126 insertions(+), 37 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java new file mode 100644 index 000000000000..b1ad3fa47e1a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.RecordReader; + +/** the context for creating RecordReader {@link RecordReader}. */ +public class FormatReaderContext { + private final FileIO fileIO; + private final Path file; + private final Integer poolSize; + private final Long fileSize; + + public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, Long fileSize) { + this.fileIO = fileIO; + this.file = file; + this.poolSize = poolSize; + this.fileSize = fileSize; + } + + public FileIO getFileIO() { + return fileIO; + } + + public Path getFile() { + return file; + } + + public Integer getPoolSize() { + return poolSize; + } + + public Long getFileSize() { + return fileSize; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index b2b179159b8e..f524ff4a1465 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -29,8 +29,9 @@ /** A factory to create {@link RecordReader} for file. */ public interface FormatReaderFactory extends Serializable { - RecordReader createReader(FileIO fileIO, Path file) throws IOException; + default RecordReader createReader(FileIO fileIO, Path file) throws IOException { + return createReader(new FormatReaderContext(fileIO, file, null, null)); + } - RecordReader createReader(FileIO fileIO, Path file, int poolSize) - throws IOException; + RecordReader createReader(FormatReaderContext context) throws IOException; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java index fe38ae146463..4e7dfec9e55f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.columnar.ColumnarRowIterator; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -58,13 +59,13 @@ public KeyValueDataFileRecordReader( @Nullable Integer poolSize, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo) + @Nullable PartitionInfo partitionInfo, + long fileSize) throws IOException { FileUtils.checkExists(fileIO, path); this.reader = - poolSize == null - ? readerFactory.createReader(fileIO, path) - : readerFactory.createReader(fileIO, path, poolSize); + readerFactory.createReader( + new FormatReaderContext(fileIO, path, poolSize, fileSize)); this.serializer = new KeyValueSerializer(keyType, valueType); this.level = level; this.indexMapping = indexMapping; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index cc7534e9aed6..3123518c29bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -93,9 +93,9 @@ public RecordReader createRecordReader( long schemaId, String fileName, long fileSize, int level) throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith("orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2)); + () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); } - return createRecordReader(schemaId, fileName, level, true, null); + return createRecordReader(schemaId, fileName, level, true, null, fileSize); } private RecordReader createRecordReader( @@ -103,7 +103,8 @@ private RecordReader createRecordReader( String fileName, int level, boolean reuseFormat, - @Nullable Integer poolSize) + @Nullable Integer poolSize, + long fileSize) throws IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -131,7 +132,8 @@ private RecordReader createRecordReader( poolSize, bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + fileSize); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get()); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java index ee8f9c26f8b0..b461ebf0b6b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.columnar.ColumnarRowIterator; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -46,12 +47,15 @@ public class RowDataFileRecordReader implements RecordReader { public RowDataFileRecordReader( FileIO fileIO, Path path, + long fileSize, FormatReaderFactory readerFactory, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, @Nullable PartitionInfo partitionInfo) throws IOException { - this.reader = FileUtils.createFormatReader(fileIO, readerFactory, path); + FileUtils.checkExists(fileIO, path); + FormatReaderContext context = new FormatReaderContext(fileIO, path, null, fileSize); + this.reader = readerFactory.createReader(context); this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java index 8363c297ad7b..b67edaaf3e6b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java @@ -177,6 +177,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio new RowDataFileRecordReader( fileIO, dataFilePathFactory.toPath(file.fileName()), + file.fileSize(), bulkFormatMapping.getReaderFactory(), bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index ca7f75d6eb57..552eac0a13f2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -48,6 +48,8 @@ import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.HashMap; @@ -385,4 +387,29 @@ private void checkRollingFiles( assertThat(meta.level()).isEqualTo(expected.level()); } } + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc", "avro"}) + public void testReaderUseFileSizeFromMetadata(String format) throws Exception { + DataFileTestDataGenerator.Data data = gen.next(); + KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); + DataFileMetaSerializer serializer = new DataFileMetaSerializer(); + + RollingFileWriter writer = + writerFactory.createRollingMergeTreeFileWriter(0); + writer.write(CloseableIterator.fromList(data.content, kv -> {})); + writer.close(); + List actualMetas = writer.result(); + + KeyValueFileReaderFactory readerFactory = + createReaderFactory(tempDir.toString(), format, null, null); + assertData( + data, + actualMetas, + TestKeyValueGenerator.KEY_SERIALIZER, + TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER, + serializer, + readerFactory, + kv -> kv); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index da60eedda113..717f998959a8 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.avro; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -49,14 +50,9 @@ public AvroBulkFormat(RowType projectedRowType) { } @Override - public RecordReader createReader(FileIO fileIO, Path file) throws IOException { - return new AvroReader(fileIO, file); - } - - @Override - public RecordReader createReader(FileIO fileIO, Path file, int poolSize) + public RecordReader createReader(FormatReaderContext formatReaderContext) throws IOException { - throw new UnsupportedOperationException(); + return new AvroReader(formatReaderContext.getFileIO(), formatReaderContext.getFile()); } private class AvroReader implements RecordReader { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 696665777243..cdc46139f790 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; import org.apache.paimon.format.orc.filter.OrcFilters; @@ -88,14 +89,13 @@ public OrcReaderFactory( // ------------------------------------------------------------------------ @Override - public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOException { - return createReader(fileIO, file, 1); - } - - @Override - public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize) - throws IOException { + public OrcVectorizedReader createReader(FormatReaderContext context) throws IOException { + int poolSize = context.getPoolSize() == null ? 1 : context.getPoolSize(); Pool poolOfBatches = createPoolOfBatches(poolSize); + + FileIO fileIO = context.getFileIO(); + Long fileSize = context.getFileSize(); + Path file = context.getFile(); RecordReader orcReader = createRecordReader( hadoopConfigWrapper.getHadoopConfig(), @@ -104,7 +104,7 @@ public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize) fileIO, file, 0, - fileIO.getFileSize(file)); + fileSize == null ? fileIO.getFileSize(file) : fileSize); return new OrcVectorizedReader(orcReader, poolOfBatches); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 2c2985d32ee1..29cf45a65260 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.writable.WritableColumnVector; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; import org.apache.paimon.format.parquet.reader.ParquetDecimalVector; @@ -87,9 +88,12 @@ public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize) } @Override - public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException { + public ParquetReader createReader(FormatReaderContext context) throws IOException { + Path filePath = context.getFile(); + FileIO fileIO = context.getFileIO(); + Long fileSize = context.getFileSize(); final long splitOffset = 0; - final long splitLength = fileIO.getFileSize(filePath); + final long splitLength = fileSize == null ? fileIO.getFileSize(filePath) : fileSize; ParquetReadOptions.Builder builder = ParquetReadOptions.builder().withRange(splitOffset, splitOffset + splitLength); @@ -108,12 +112,6 @@ public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOExcepti return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches); } - @Override - public RecordReader createReader(FileIO fileIO, Path file, int poolSize) - throws IOException { - throw new UnsupportedOperationException(); - } - private void setReadOptions(ParquetReadOptions.Builder builder) { builder.useSignedStringMinMax( conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java index a5160f5c7990..5a0f4925ddbc 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -179,8 +180,10 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException { AtomicBoolean isFirst = new AtomicBoolean(true); + LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(new LocalFileIO(), flatFile, randomPooSize)) { + format.createReader( + new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) { reader.forEachRemainingWithPosition( (rowPosition, row) -> { // check filter: _col0 > randomStart @@ -202,8 +205,10 @@ void testReadRowPositionWithTransformAndFilter() throws IOException { int randomPooSize = new Random().nextInt(3) + 1; OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}); + LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(new LocalFileIO(), flatFile, randomPooSize)) { + format.createReader( + new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) { reader.transform(row -> row) .filter(row -> row.getInt(1) % 123 == 0) .forEachRemainingWithPosition(