Skip to content

Commit

Permalink
[core] Introduce RawFileSplitRead to accelerate batch read for primar…
Browse files Browse the repository at this point in the history
…y key table (#3209)
  • Loading branch information
JingsongLi authored Apr 15, 2024
1 parent 9bb755d commit 8ff2537
Show file tree
Hide file tree
Showing 20 changed files with 455 additions and 330 deletions.
48 changes: 48 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.util.function.Supplier;

/** A class to lazy initialized field. */
public class LazyField<T> {

private final Supplier<T> supplier;

private boolean initialized;
private T value;

public LazyField(Supplier<T> supplier) {
this.supplier = supplier;
}

public T get() {
if (!initialized) {
T t = supplier.get();
value = t;
initialized = true;
return t;
}
return value;
}

public boolean initialized() {
return initialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -79,8 +79,8 @@ public AppendOnlyFileStoreScan newScan(String branchName) {
}

@Override
public AppendOnlyFileStoreRead newRead() {
return new AppendOnlyFileStoreRead(
public RawFileSplitRead newRead() {
return new RawFileSplitRead(
fileIO,
schemaManager,
schema,
Expand Down
4 changes: 2 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
Expand Down Expand Up @@ -73,7 +73,7 @@ public interface FileStore<T> extends Serializable {

StatsFileHandler newStatsFileHandler();

FileStoreRead<T> newRead();
SplitRead<T> newRead();

FileStoreWrite<T> newWrite(String commitUser);

Expand Down
17 changes: 14 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
Expand Down Expand Up @@ -119,8 +120,8 @@ public KeyValueFileStoreScan newScan(String branchName) {
}

@Override
public KeyValueFileStoreRead newRead() {
return new KeyValueFileStoreRead(
public MergeFileSplitRead newRead() {
return new MergeFileSplitRead(
options,
schema,
keyType,
Expand All @@ -130,6 +131,16 @@ public KeyValueFileStoreRead newRead() {
newReaderFactoryBuilder());
}

public RawFileSplitRead newBatchRawFileRead() {
return new RawFileSplitRead(
fileIO,
schemaManager,
schema,
valueType,
FileFormatDiscover.of(options),
pathFactory());
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
return KeyValueFileReaderFactory.builder(
fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -40,18 +39,6 @@ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletion
this.deletionVector = deletionVector;
}

public static <T> RecordReader<T> create(RecordReader<T> reader, Optional<DeletionVector> dv) {
return create(reader, dv.orElse(null));
}

public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable DeletionVector dv) {
if (dv == null) {
return reader;
}

return new ApplyDeletionVectorReader<>(reader, dv);
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> {

private final FileIO fileIO;
private final AppendOnlyFileStoreRead read;
private final RawFileSplitRead read;
private final long schemaId;
private final RowType rowType;
private final FileFormat fileFormat;
Expand All @@ -81,7 +81,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>

public AppendOnlyFileStoreWrite(
FileIO fileIO,
AppendOnlyFileStoreRead read,
RawFileSplitRead read,
long schemaId,
String commitUser,
RowType rowType,
Expand Down
Loading

0 comments on commit 8ff2537

Please sign in to comment.