[core] orc/parquet reader obtain the fileSize from metadata (#2918)
wg1026688210 authored Mar 19, 2024
1 parent 2df0c1e commit 7175bd8
Showing 11 changed files with 126 additions and 37 deletions.
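In short: the read path now threads the file length that is already recorded in table metadata (DataFileMeta.fileSize) through a new FormatReaderContext, instead of having each format reader call FileIO.getFileSize(file) when a file is opened. ORC and Parquet both need the length up front to locate their footers, and on object stores a getFileSize call typically means an extra metadata request per file; with this change the readers use the supplied size and only stat the file as a fallback.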
FormatReaderContext.java (new file)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

/** The context for creating a {@link RecordReader}. */
public class FormatReaderContext {
private final FileIO fileIO;
private final Path file;
private final Integer poolSize;
private final Long fileSize;

public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, Long fileSize) {
this.fileIO = fileIO;
this.file = file;
this.poolSize = poolSize;
this.fileSize = fileSize;
}

public FileIO getFileIO() {
return fileIO;
}

public Path getFile() {
return file;
}

public Integer getPoolSize() {
return poolSize;
}

public Long getFileSize() {
return fileSize;
}
}
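
A minimal usage sketch of the new context, as a read path might call it (hypothetical helper; knownFileSize stands in for a length taken from a manifest entry such as DataFileMeta#fileSize()):

// Sketch: open a reader without an extra fileIO.getFileSize() round trip.
// All types come from this commit; only this helper and knownFileSize are invented.
static RecordReader<InternalRow> open(
        FormatReaderFactory factory, FileIO fileIO, Path path, long knownFileSize)
        throws IOException {
    FormatReaderContext context =
            new FormatReaderContext(fileIO, path, /* poolSize */ null, knownFileSize);
    return factory.createReader(context);
}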
FormatReaderFactory.java
@@ -29,8 +29,9 @@
/** A factory to create a {@link RecordReader} for a file. */
public interface FormatReaderFactory extends Serializable {

- RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException;
+ default RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
+ return createReader(new FormatReaderContext(fileIO, file, null, null));
+ }

- RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
- throws IOException;
+ RecordReader<InternalRow> createReader(FormatReaderContext context) throws IOException;
}
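
The old two-argument entry point survives as a default method, so existing callers compile unchanged; they simply hand the concrete factory a context with no pool size and no known file size, which pushes it onto the getFileSize() fallback:

// Legacy call (sketch): equivalent to
// createReader(new FormatReaderContext(fileIO, file, null, null)).
RecordReader<InternalRow> reader = factory.createReader(fileIO, file);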
KeyValueDataFileRecordReader.java
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -58,13 +59,13 @@ public KeyValueDataFileRecordReader(
@Nullable Integer poolSize,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
+ @Nullable PartitionInfo partitionInfo,
+ long fileSize)
throws IOException {
FileUtils.checkExists(fileIO, path);
this.reader =
- poolSize == null
- ? readerFactory.createReader(fileIO, path)
- : readerFactory.createReader(fileIO, path, poolSize);
+ readerFactory.createReader(
+ new FormatReaderContext(fileIO, path, poolSize, fileSize));
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.indexMapping = indexMapping;
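With the context in place the poolSize == null branch disappears: a null pool size simply rides along in the context, and each format picks its own default (ORC defaults to 1, further down).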
KeyValueFileReaderFactory.java
@@ -93,17 +93,18 @@ public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
return new AsyncRecordReader<>(
- () -> createRecordReader(schemaId, fileName, level, false, 2));
+ () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
}
- return createRecordReader(schemaId, fileName, level, true, null);
+ return createRecordReader(schemaId, fileName, level, true, null, fileSize);
}

private RecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
int level,
boolean reuseFormat,
- @Nullable Integer poolSize)
+ @Nullable Integer poolSize,
+ long fileSize)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

@@ -131,7 +132,8 @@ private RecordReader<KeyValue> createRecordReader(
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
- PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+ PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
+ fileSize);
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
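fileSize can be a plain long here because the key-value read path always knows it: the public createRecordReader above receives the size from its caller, ultimately from the file's DataFileMeta manifest entry.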
RowDataFileRecordReader.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -46,12 +47,15 @@ public class RowDataFileRecordReader implements RecordReader<InternalRow> {
public RowDataFileRecordReader(
FileIO fileIO,
Path path,
+ long fileSize,
FormatReaderFactory readerFactory,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo)
throws IOException {
- this.reader = FileUtils.createFormatReader(fileIO, readerFactory, path);
+ FileUtils.checkExists(fileIO, path);
+ FormatReaderContext context = new FormatReaderContext(fileIO, path, null, fileSize);
+ this.reader = readerFactory.createReader(context);
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
@@ -177,6 +177,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
new RowDataFileRecordReader(
fileIO,
dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize(),
bulkFormatMapping.getReaderFactory(),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
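In this read path the size comes straight from the split's metadata: file.fileSize() is the length recorded in the manifest when the file was written, so opening the reader needs no filesystem call at all.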
KeyValueFileReadWriteTest.java
@@ -48,6 +48,8 @@
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.HashMap;
@@ -385,4 +387,29 @@ private void checkRollingFiles(
assertThat(meta.level()).isEqualTo(expected.level());
}
}

+ @ParameterizedTest
+ @ValueSource(strings = {"parquet", "orc", "avro"})
+ public void testReaderUseFileSizeFromMetadata(String format) throws Exception {
+ DataFileTestDataGenerator.Data data = gen.next();
+ KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
+ DataFileMetaSerializer serializer = new DataFileMetaSerializer();
+
+ RollingFileWriter<KeyValue, DataFileMeta> writer =
+ writerFactory.createRollingMergeTreeFileWriter(0);
+ writer.write(CloseableIterator.fromList(data.content, kv -> {}));
+ writer.close();
+ List<DataFileMeta> actualMetas = writer.result();
+
+ KeyValueFileReaderFactory readerFactory =
+ createReaderFactory(tempDir.toString(), format, null, null);
+ assertData(
+ data,
+ actualMetas,
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
+ serializer,
+ readerFactory,
+ kv -> kv);
+ }
}
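
The new test round-trips generated key-values through each of the three formats: it writes files with a rolling writer, then reads them back through a KeyValueFileReaderFactory, which (per the changes above) feeds each file's DataFileMeta.fileSize() into the format reader. A reader that mishandled the supplied size would fail the assertData comparison.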
AvroBulkFormat.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.avro;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -49,14 +50,9 @@ public AvroBulkFormat(RowType projectedRowType) {
}

@Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
- return new AvroReader(fileIO, file);
- }
-
- @Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
+ public RecordReader<InternalRow> createReader(FormatReaderContext formatReaderContext)
throws IOException {
- throw new UnsupportedOperationException();
+ return new AvroReader(formatReaderContext.getFileIO(), formatReaderContext.getFile());
}

private class AvroReader implements RecordReader<InternalRow> {
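Note that Avro ignores both the pool size and the file size carried by the context: an Avro container file is read as a forward stream, so the reader has no use for a pre-computed length, and the previously unsupported pool-size variant disappears entirely.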
OrcReaderFactory.java
@@ -23,6 +23,7 @@
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
import org.apache.paimon.format.orc.filter.OrcFilters;
@@ -88,14 +89,13 @@ public OrcReaderFactory(
// ------------------------------------------------------------------------

@Override
- public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOException {
- return createReader(fileIO, file, 1);
- }
-
- @Override
- public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize)
- throws IOException {
+ public OrcVectorizedReader createReader(FormatReaderContext context) throws IOException {
+ int poolSize = context.getPoolSize() == null ? 1 : context.getPoolSize();
Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);

+ FileIO fileIO = context.getFileIO();
+ Long fileSize = context.getFileSize();
+ Path file = context.getFile();
RecordReader orcReader =
createRecordReader(
hadoopConfigWrapper.getHadoopConfig(),
@@ -104,7 +104,7 @@ public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize)
fileIO,
file,
0,
- fileIO.getFileSize(file));
+ fileSize == null ? fileIO.getFileSize(file) : fileSize);
return new OrcVectorizedReader(orcReader, poolOfBatches);
}

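ORC is a main beneficiary: the reader must know the file length up front to find the postscript and footer at the tail of the file, which previously forced a fileIO.getFileSize(file) call on every open. The resolution rule, factored into a hypothetical helper for clarity (not part of the commit):

// Prefer the metadata-supplied length; stat the file only as a last resort.
static long resolveLength(FormatReaderContext context) throws IOException {
    Long known = context.getFileSize();
    return known != null ? known : context.getFileIO().getFileSize(context.getFile());
}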
ParquetReaderFactory.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
@@ -87,9 +88,12 @@ public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize)
}

@Override
- public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException {
+ public ParquetReader createReader(FormatReaderContext context) throws IOException {
+ Path filePath = context.getFile();
+ FileIO fileIO = context.getFileIO();
+ Long fileSize = context.getFileSize();
final long splitOffset = 0;
- final long splitLength = fileIO.getFileSize(filePath);
+ final long splitLength = fileSize == null ? fileIO.getFileSize(filePath) : fileSize;

ParquetReadOptions.Builder builder =
ParquetReadOptions.builder().withRange(splitOffset, splitOffset + splitLength);
@@ -108,12 +112,6 @@ public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException {
return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches);
}

- @Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
- throws IOException {
- throw new UnsupportedOperationException();
- }

private void setReadOptions(ParquetReadOptions.Builder builder) {
builder.useSignedStringMinMax(
conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
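Parquet follows the same pattern, with one subtlety: splitLength feeds ParquetReadOptions.withRange(splitOffset, splitOffset + splitLength), which bounds the row groups the reader will consider, so the supplied size should be the file's exact length rather than an estimate, which is what the length recorded at write time provides.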
OrcReaderFactoryTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.orc;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -179,8 +180,10 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException {

AtomicBoolean isFirst = new AtomicBoolean(true);

+ LocalFileIO localFileIO = new LocalFileIO();
try (RecordReader<InternalRow> reader =
- format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
+ format.createReader(
+ new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) {
reader.forEachRemainingWithPosition(
(rowPosition, row) -> {
// check filter: _col0 > randomStart
@@ -202,8 +205,10 @@ void testReadRowPositionWithTransformAndFilter() throws IOException {
int randomPooSize = new Random().nextInt(3) + 1;
OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});

+ LocalFileIO localFileIO = new LocalFileIO();
try (RecordReader<InternalRow> reader =
- format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
+ format.createReader(
+ new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) {
reader.transform(row -> row)
.filter(row -> row.getInt(1) % 123 == 0)
.forEachRemainingWithPosition(
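The ORC tests now build the context directly and pass null for the file size, which exercises the getFileSize() fallback path.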
