Skip to content

Commit

Permalink
[core] Reduce getFileSize for avro reader (apache#3040)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Mar 19, 2024
1 parent 36ae4ae commit cb8cabc
Show file tree
Hide file tree
Showing 28 changed files with 261 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,30 @@
import org.apache.paimon.reader.RecordReader;

/** The context for creating a {@link RecordReader}. */
public class FormatReaderContext {
public class FormatReaderContext implements FormatReaderFactory.Context {

private final FileIO fileIO;
private final Path file;
private final Integer poolSize;
private final Long fileSize;
private final long fileSize;

public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, Long fileSize) {
public FormatReaderContext(FileIO fileIO, Path file, long fileSize) {
this.fileIO = fileIO;
this.file = file;
this.poolSize = poolSize;
this.fileSize = fileSize;
}

public FileIO getFileIO() {
@Override
public FileIO fileIO() {
return fileIO;
}

public Path getFile() {
@Override
public Path filePath() {
return file;
}

public Integer getPoolSize() {
return poolSize;
}

public Long getFileSize() {
@Override
public long fileSize() {
return fileSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@
/** A factory to create {@link RecordReader} for file. */
public interface FormatReaderFactory extends Serializable {

default RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
return createReader(new FormatReaderContext(fileIO, file, null, null));
}
RecordReader<InternalRow> createReader(Context context) throws IOException;

/** Context for creating reader. */
interface Context {

FileIO fileIO();

RecordReader<InternalRow> createReader(FormatReaderContext context) throws IOException;
Path filePath();

long fileSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

/**
 * Reader context used when creating an ORC {@link RecordReader}.
 *
 * <p>Extends {@link FormatReaderContext} with one extra value, the pool size, which is stored at
 * construction time and exposed through {@link #poolSize()}.
 */
public class OrcFormatReaderContext extends FormatReaderContext {

    /** Pool size supplied by the caller; never modified after construction. */
    private final int poolSize;

    /**
     * Creates a context for reading a single file.
     *
     * @param fileIO passed through unchanged to the {@link FormatReaderContext} constructor
     * @param filePath passed through unchanged to the {@link FormatReaderContext} constructor
     * @param fileSize passed through unchanged to the {@link FormatReaderContext} constructor
     * @param poolSize value later returned by {@link #poolSize()}
     */
    public OrcFormatReaderContext(FileIO fileIO, Path filePath, long fileSize, int poolSize) {
        super(fileIO, filePath, fileSize);
        this.poolSize = poolSize;
    }

    /**
     * Returns the pool size given to the constructor.
     *
     * @return the stored pool size
     */
    public int poolSize() {
        return this.poolSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void testSimpleTypes() throws IOException {
out.close();

RecordReader<InternalRow> reader =
format.createReaderFactory(rowType).createReader(fileIO, file);
format.createReaderFactory(rowType)
.createReader(
new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)));
List<InternalRow> result = new ArrayList<>();
reader.forEachRemaining(row -> result.add(serializer.copy(row)));

Expand All @@ -123,7 +125,9 @@ public void testFullTypes() throws IOException {
out.close();

RecordReader<InternalRow> reader =
format.createReaderFactory(rowType).createReader(fileIO, file);
format.createReaderFactory(rowType)
.createReader(
new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)));
List<InternalRow> result = new ArrayList<>();
reader.forEachRemaining(result::add);
assertThat(result.size()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
Expand All @@ -50,22 +47,17 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
@Nullable private final CastFieldGetter[] castMapping;

public KeyValueDataFileRecordReader(
FileIO fileIO,
FormatReaderFactory readerFactory,
Path path,
FormatReaderFactory.Context context,
RowType keyType,
RowType valueType,
int level,
@Nullable Integer poolSize,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
long fileSize)
@Nullable PartitionInfo partitionInfo)
throws IOException {
FileUtils.checkExists(fileIO, path);
this.reader =
readerFactory.createReader(
new FormatReaderContext(fileIO, path, poolSize, fileSize));
FileUtils.checkExists(context.fileIO(), context.filePath());
this.reader = readerFactory.createReader(context);
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.indexMapping = indexMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
Expand Down Expand Up @@ -103,7 +106,7 @@ private RecordReader<KeyValue> createRecordReader(
String fileName,
int level,
boolean reuseFormat,
@Nullable Integer poolSize,
@Nullable Integer orcPoolSize,
long fileSize)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
Expand All @@ -121,19 +124,20 @@ private RecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName);
RecordReader<KeyValue> recordReader =
new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
orcPoolSize == null
? new FormatReaderContext(fileIO, filePath, fileSize)
: new OrcFormatReaderContext(
fileIO, filePath, fileSize, orcPoolSize),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
fileSize);
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public RowDataFileRecordReader(
@Nullable PartitionInfo partitionInfo)
throws IOException {
FileUtils.checkExists(fileIO, path);
FormatReaderContext context = new FormatReaderContext(fileIO, path, null, fileSize);
FormatReaderContext context = new FormatReaderContext(fileIO, path, fileSize);
this.reader = readerFactory.createReader(context);
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ static List<Supplier<List<ManifestEntry>>> readManifestEntries(
for (ManifestFileMeta file : manifestFiles) {
Future<List<ManifestEntry>> future =
CompletableFuture.supplyAsync(
() -> manifestFile.read(file.fileName()),
() -> manifestFile.read(file.fileName(), file.fileSize()),
FileUtils.COMMON_IO_FORK_JOIN_POOL);
result.add(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
for (; j < base.size(); j++) {
ManifestFileMeta file = base.get(j);
boolean contains = false;
for (ManifestEntry entry : manifestFile.read(file.fileName)) {
for (ManifestEntry entry : manifestFile.read(file.fileName, file.fileSize)) {
checkArgument(entry.kind() == FileKind.ADD);
if (deleteEntries.contains(entry.identifier())) {
contains = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
*/
public class ManifestList extends ObjectsFile<ManifestFileMeta> {

private final FormatWriterFactory writerFactory;

private ManifestList(
FileIO fileIO,
ManifestFileMetaSerializer serializer,
Expand All @@ -49,7 +47,6 @@ private ManifestList(
PathFactory pathFactory,
@Nullable SegmentsCache<String> cache) {
super(fileIO, serializer, readerFactory, writerFactory, pathFactory, cache);
this.writerFactory = writerFactory;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
.create()
.read(
manifest.fileName(),
manifest.fileSize(),
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, numOfBuckets));
Expand All @@ -469,6 +470,7 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
.createSimpleFileEntryReader()
.read(
manifest.fileName(),
manifest.fileSize(),
// use filter for ManifestEntry
// currently, projection is not pushed down to file format
// see SimpleFileEntrySerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void cleanUnusedDataFiles(Snapshot snapshot, Predicate<ManifestEntry> ski
// try read manifests
List<String> manifestFileNames =
readManifestFileNames(tryReadManifestList(snapshot.deltaManifestList()));
List<ManifestEntry> manifestEntries = new ArrayList<>();
List<ManifestEntry> manifestEntries;
// data file path -> (original manifest entry, extra file paths)
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new HashMap<>();
for (String manifest : manifestFileNames) {
Expand Down
25 changes: 9 additions & 16 deletions paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
package org.apache.paimon.utils;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Stream;
Expand Down Expand Up @@ -70,18 +71,6 @@ public static synchronized ForkJoinPool getScanIoForkJoinPool(int parallelism) {
return scanIoForkJoinPool;
}

public static <T> List<T> readListFromFile(
FileIO fileIO,
Path path,
ObjectSerializer<T> serializer,
FormatReaderFactory readerFactory)
throws IOException {
List<T> result = new ArrayList<>();
createFormatReader(fileIO, readerFactory, path)
.forEachRemaining(row -> result.add(serializer.fromRow(row)));
return result;
}

/**
* List versioned files for the directory.
*
Expand Down Expand Up @@ -143,8 +132,12 @@ public static void checkExists(FileIO fileIO, Path file) throws IOException {
}

public static RecordReader<InternalRow> createFormatReader(
FileIO fileIO, FormatReaderFactory format, Path file) throws IOException {
FileIO fileIO, FormatReaderFactory format, Path file, @Nullable Long fileSize)
throws IOException {
checkExists(fileIO, file);
return format.createReader(fileIO, file);
if (fileSize == null) {
fileSize = fileIO.getFileSize(file);
}
return format.createReader(new FormatReaderContext(fileIO, file, fileSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,39 @@
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;

import javax.annotation.Nullable;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.BiFunction;

/** Cache records to {@link SegmentsCache} by compacted serializer. */
public class ObjectsCache<K, V> {

private final SegmentsCache<K> cache;
private final ObjectSerializer<V> serializer;
private final InternalRowSerializer rowSerializer;
private final Function<K, CloseableIterator<InternalRow>> reader;
private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;

public ObjectsCache(
SegmentsCache<K> cache,
ObjectSerializer<V> serializer,
Function<K, CloseableIterator<InternalRow>> reader) {
BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
this.cache = cache;
this.serializer = serializer;
this.rowSerializer = new InternalRowSerializer(serializer.fieldTypes());
this.reader = reader;
}

public List<V> read(K key, Filter<InternalRow> loadFilter, Filter<InternalRow> readFilter)
public List<V> read(
K key,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
Segments segments = cache.getSegments(key, k -> readSegments(k, loadFilter));
Segments segments = cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter));
List<V> entries = new ArrayList<>();
RandomAccessInputView view =
new RandomAccessInputView(
Expand All @@ -71,8 +77,8 @@ public List<V> read(K key, Filter<InternalRow> loadFilter, Filter<InternalRow> r
}
}

private Segments readSegments(K key, Filter<InternalRow> loadFilter) {
try (CloseableIterator<InternalRow> iterator = reader.apply(key)) {
private Segments readSegments(K key, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
try (CloseableIterator<InternalRow> iterator = reader.apply(key, fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
MemorySegmentSource segmentSource =
() -> MemorySegment.allocateHeapMemory(cache.pageSize());
Expand Down
Loading

0 comments on commit cb8cabc

Please sign in to comment.