[core] Introduce file-reader-async-threshold to speed up merging (apa…
JingsongLi authored Oct 13, 2023
1 parent fa68a1e commit d2ddaa9
Showing 31 changed files with 522 additions and 43 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
</tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
<td>MemorySize</td>
<td>The threshold for reading files asynchronously.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -904,6 +904,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.");

public static final ConfigOption<MemorySize> FILE_READER_ASYNC_THRESHOLD =
key("file-reader-async-threshold")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(10))
.withDescription("The threshold for reading files asynchronously.");

private final Options options;

public CoreOptions(Map<String, String> options) {
@@ -992,6 +998,10 @@ public String fileCompression() {
return options.get(FILE_COMPRESSION);
}

public MemorySize fileReaderAsyncThreshold() {
return options.get(FILE_READER_ASYNC_THRESHOLD);
}

public int snapshotNumRetainMin() {
return options.get(SNAPSHOT_NUM_RETAINED_MIN);
}
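
As an aside on usage: a minimal sketch of how the new option flows through CoreOptions, assuming only the CoreOptions(Map&lt;String, String&gt;) constructor and fileReaderAsyncThreshold() accessor shown in this diff, plus Paimon's MemorySize type (import path assumed); the 32 mb value is arbitrary for illustration.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.MemorySize;

import java.util.HashMap;
import java.util.Map;

public class AsyncThresholdExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Files at or above 32 MB become eligible for asynchronous reading.
        conf.put("file-reader-async-threshold", "32 mb");

        CoreOptions options = new CoreOptions(conf);
        MemorySize threshold = options.fileReaderAsyncThreshold();
        System.out.println(threshold.getBytes()); // 33554432
    }
}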
@@ -30,4 +30,7 @@
public interface FormatReaderFactory extends Serializable {

RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException;

RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
throws IOException;
}
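
The second overload lets a caller request a reader backed by a small pool of read threads. A hypothetical implementation sketch, not part of this commit (MyFormatReaderFactory is an assumed name, and the empty anonymous reader stands in for real decoding logic); a format with an internal prefetch pool, such as ORC, would size that pool from poolSize.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

import java.io.IOException;

public class MyFormatReaderFactory implements FormatReaderFactory {

    @Override
    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
        // Synchronous path: a single read thread.
        return createReader(fileIO, file, 1);
    }

    @Override
    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
            throws IOException {
        // A real format would open fileIO.newInputStream(file) and decode batches,
        // sizing its prefetch pool from poolSize. This stub returns an empty reader.
        return new RecordReader<InternalRow>() {
            @Override
            public RecordIterator<InternalRow> readBatch() {
                return null; // end of input immediately
            }

            @Override
            public void close() {}
        };
    }
}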
3 changes: 3 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -26,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.ThreadSafe;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -53,6 +55,7 @@
* @since 0.4.0
*/
@Public
@ThreadSafe
public interface FileIO extends Serializable {

Logger LOG = LoggerFactory.getLogger(FileIO.class);
@@ -116,7 +116,8 @@ public KeyValueFileStoreRead newRead() {
mfFactory,
FileFormatDiscover.of(options),
pathFactory(),
keyValueFieldsExtractor);
keyValueFieldsExtractor,
options);
}

@Override
@@ -18,12 +18,15 @@

package org.apache.paimon.casting;

import javax.annotation.concurrent.ThreadSafe;

/**
* Interface to model a function that performs the casting of a value from one type to another.
*
* @param <IN> Input internal type
* @param <OUT> Output internal type
*/
@ThreadSafe
public interface CastExecutor<IN, OUT> {

/** Cast the input value. */
@@ -20,14 +20,17 @@

import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A class to discover {@link FileFormat}. */
@ThreadSafe
public interface FileFormatDiscover {

static FileFormatDiscover of(CoreOptions options) {
Map<String, FileFormat> formats = new HashMap<>();
Map<String, FileFormat> formats = new ConcurrentHashMap<>();
return new FileFormatDiscover() {

@Override
@@ -49,10 +49,15 @@ public KeyValueDataFileRecordReader(
RowType keyType,
RowType valueType,
int level,
@Nullable Integer poolSize,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping)
throws IOException {
this.reader = FileUtils.createFormatReader(fileIO, readerFactory, path);
FileUtils.checkExists(fileIO, path);
this.reader =
poolSize == null
? readerFactory.createReader(fileIO, path)
: readerFactory.createReader(fileIO, path, poolSize);
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.indexMapping = indexMapping;
@@ -18,6 +18,7 @@

package org.apache.paimon.io;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.format.FileFormatDiscover;
@@ -27,8 +28,8 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Projection;
@@ -40,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
public class KeyValueFileReaderFactory {
@@ -51,8 +53,10 @@ public class KeyValueFileReaderFactory {
private final RowType valueType;

private final BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final DataFilePathFactory pathFactory;
private final long asyncThreshold;

private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;

private KeyValueFileReaderFactory(
FileIO fileIO,
@@ -61,36 +65,58 @@ private KeyValueFileReaderFactory(
RowType keyType,
RowType valueType,
BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
DataFilePathFactory pathFactory) {
DataFilePathFactory pathFactory,
long asyncThreshold) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
this.pathFactory = pathFactory;
this.asyncThreshold = asyncThreshold;
this.bulkFormatMappings = new HashMap<>();
}

public RecordReader<KeyValue> createRecordReader(long schemaId, String fileName, int level)
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
return new AsyncRecordReader<>(
() -> createRecordReader(schemaId, fileName, level, false, 2));
}
return createRecordReader(schemaId, fileName, level, true, null);
}

private RecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
int level,
boolean reuseFormat,
@Nullable Integer poolSize)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

Supplier<BulkFormatMapping> formatSupplier =
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
schemaManager.schema(this.schemaId),
schemaManager.schema(schemaId));

BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
new FormatKey(schemaId, formatIdentifier),
key -> {
TableSchema tableSchema = schemaManager.schema(this.schemaId);
TableSchema dataSchema = schemaManager.schema(key.schemaId);
return bulkFormatMappingBuilder.build(
formatIdentifier, tableSchema, dataSchema);
});
reuseFormat
? bulkFormatMappings.computeIfAbsent(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
return new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping());
}
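
createRecordReader now routes large ORC files (fileSize >= asyncThreshold) through AsyncRecordReader with a pool size of 2 and a fresh, non-cached format mapping, so file reading overlaps with merging. AsyncRecordReader itself is outside this excerpt; below is a minimal illustrative sketch of the prefetching idea, assuming only the RecordReader contract (readBatch() returns null at end of input) — not the actual org.apache.paimon.utils.AsyncRecordReader implementation.

import org.apache.paimon.reader.RecordReader;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncPrefetchReader<T> implements RecordReader<T> {

    private static final Object END = new Object();

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4);
    private final Thread worker;
    private volatile IOException error;

    public AsyncPrefetchReader(RecordReader<T> delegate) {
        // Drain the delegate on a background thread so file I/O and decoding
        // overlap with the merge work performed by the consuming thread.
        this.worker = new Thread(() -> {
            try {
                RecordIterator<T> batch;
                while ((batch = delegate.readBatch()) != null) {
                    queue.put(batch);
                }
            } catch (IOException e) {
                error = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                closeQuietly(delegate);
                putEnd();
            }
        });
        this.worker.start();
    }

    @Override
    @SuppressWarnings("unchecked")
    public RecordIterator<T> readBatch() throws IOException {
        try {
            Object next = queue.take();
            if (error != null) {
                throw error;
            }
            return next == END ? null : (RecordIterator<T>) next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    public void close() {
        worker.interrupt();
    }

    private void closeQuietly(RecordReader<T> reader) {
        try {
            reader.close();
        } catch (IOException e) {
            if (error == null) {
                error = e;
            }
        }
    }

    private void putEnd() {
        try {
            queue.put(END);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}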
@@ -103,7 +129,8 @@ public static Builder builder(
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
KeyValueFieldsExtractor extractor,
CoreOptions options) {
return new Builder(
fileIO,
schemaManager,
Expand All @@ -112,7 +139,8 @@ public static Builder builder(
valueType,
formatDiscover,
pathFactory,
extractor);
extractor,
options);
}

/** Builder for {@link KeyValueFileReaderFactory}. */
@@ -126,8 +154,9 @@ public static class Builder {
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final KeyValueFieldsExtractor extractor;

private final int[][] fullKeyProjection;
private final CoreOptions options;

private int[][] keyProjection;
private int[][] valueProjection;
private RowType projectedKeyType;
@@ -141,7 +170,8 @@ private Builder(
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
KeyValueFieldsExtractor extractor,
CoreOptions options) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
@@ -152,6 +182,7 @@
this.extractor = extractor;

this.fullKeyProjection = Projection.range(0, keyType.getFieldCount()).toNestedIndexes();
this.options = options;
this.keyProjection = fullKeyProjection;
this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
applyProjection();
@@ -166,7 +197,8 @@ public Builder copyWithoutProjection() {
valueType,
formatDiscover,
pathFactory,
extractor);
extractor,
options);
}

public Builder withKeyProjection(int[][] projection) {
@@ -205,7 +237,8 @@ public KeyValueFileReaderFactory build(
projectedValueType,
BulkFormatMapping.newBuilder(
formatDiscover, extractor, keyProjection, valueProjection, filters),
pathFactory.createDataFilePathFactory(partition, bucket));
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes());
}

private void applyProjection() {
@@ -86,7 +86,10 @@ public static RecordReader<KeyValue> readerForRun(
readers.add(
() ->
readerFactory.createRecordReader(
file.schemaId(), file.fileName(), file.level()));
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level()));
}
return ConcatRecordReader.create(readers);
}
@@ -95,7 +95,8 @@ public KeyValueFileStoreRead(
MergeFunctionFactory<KeyValue> mfFactory,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
KeyValueFieldsExtractor extractor,
CoreOptions options) {
this.tableSchema = schemaManager.schema(schemaId);
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
@@ -106,7 +107,8 @@
valueType,
formatDiscover,
pathFactory,
extractor);
extractor,
options);
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
@@ -261,7 +263,7 @@ private RecordReader<KeyValue> streamingConcat(
// See comments on DataFileMeta#extraFiles.
String fileName = changelogFile(file).orElse(file.fileName());
return readerFactory.createRecordReader(
file.schemaId(), fileName, file.level());
file.schemaId(), fileName, file.fileSize(), file.level());
});
}
return ConcatRecordReader.create(suppliers);
@@ -115,7 +115,8 @@ public KeyValueFileStoreWrite(
valueType,
FileFormatDiscover.of(options),
pathFactory,
extractor);
extractor,
options);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
@@ -266,7 +267,7 @@ private LookupLevels createLookupLevels(
valueType,
file ->
readerFactory.createRecordReader(
file.schemaId(), file.fileName(), file.level()),
file.schemaId(), file.fileName(), file.fileSize(), file.level()),
() -> ioManager.createChannel().getPathFile(),
new HashLookupStoreFactory(
cacheManager,
Expand All @@ -287,7 +288,7 @@ private ContainsLevels createContainsLevels(
keyType,
file ->
readerFactory.createRecordReader(
file.schemaId(), file.fileName(), file.level()),
file.schemaId(), file.fileName(), file.fileSize(), file.level()),
() -> ioManager.createChannel().getPathFile(),
new HashLookupStoreFactory(
cacheManager,
@@ -20,11 +20,15 @@

import org.apache.paimon.types.DataField;

import javax.annotation.concurrent.ThreadSafe;

import java.io.Serializable;
import java.util.List;

/** Extractor of schema for different tables. */
@ThreadSafe
public interface KeyValueFieldsExtractor extends Serializable {

/**
* Extract key fields from table schema.
*