
[core][format] Optimize manifest reading performance, add pushdown for manifest and ORC. #4497

Open · wants to merge 32 commits into base: master

Commits (32 total; the diff below shows changes from 28 commits):
9873457
[core] fix hll class not found
Aug 21, 2024
dedcaa4
[format][orc] open orc switch useSelected,allowSARGToFilter to make s…
Sep 22, 2024
4794d5f
Merge branch 'master' of github.com:ranxianglei/paimon
Sep 22, 2024
462edc6
[format][orc] miss tolerateMissingSchema
Sep 22, 2024
2226fb9
[format][orc] fix orc selected close for no filter condition
Sep 22, 2024
ee915c8
[orc] keep useSelected and allowSARGToFilter close default, or deleti…
Sep 27, 2024
2920fc9
[format][orc] VectorizedRowBatch to OrcColumnVector for selected rows…
Oct 15, 2024
8a89649
[format][orc] remove all isRepeating
Oct 15, 2024
c745e55
[core][format] merge with aa16c2bf1
Nov 11, 2024
7841f25
[core][format] merge conflicts
Nov 11, 2024
bbdd316
[core] fix AuditLogTable merge error
Nov 11, 2024
10ef09c
[format] recover HadoopFileIO
Nov 12, 2024
a2acbab
[format] checkstyle
Nov 12, 2024
e1c90c7
[format][orc] add pushdown option only for reader .
Nov 12, 2024
4364ac1
Merge branch 'master' into op_manifest
ranxianglei Nov 12, 2024
8c9a75c
[core] recover bucket
Nov 12, 2024
dfaeac3
Merge branch 'op_manifest' of github.com:ranxianglei/paimon into op_m…
Nov 12, 2024
f71d658
[core][format] add test for withBuckets and orcFormat
Nov 12, 2024
efac5b6
[format] fix checkstyle
Nov 12, 2024
15b1910
[format] fix version caused error
Nov 12, 2024
e1b3406
[core] fix checkstyle
Nov 12, 2024
d48fff6
[format] add FormatPerformanceTest
Nov 13, 2024
a0efae2
[format][tests] FormatPerformanceTest change to 10 times
Nov 13, 2024
016620c
[format][tests] FormatPerformanceTest change to lessthan to pass gith…
Nov 13, 2024
133a491
[format] merge conflicts
Nov 14, 2024
282a2c9
[format] id to lowercase
Nov 14, 2024
884d12f
[tests] core org.apache.paimon.factories.Factory
Nov 14, 2024
a1cc9f4
[tests] fileFormat factories add to paimon-flink-common
Nov 14, 2024
e401844
[core] resolve withBuckets commit
Nov 15, 2024
710af06
[format] no need call rowMapper under getArray
Nov 15, 2024
669dc30
[core] cancel manifest format factory cache for while .
Nov 18, 2024
00db1f6
Merge branch 'apache:master' into op_manifest
ranxianglei Dec 16, 2024
@@ -19,6 +19,7 @@
package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -32,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/**
* Factory class which creates reader and writer factories for specific file format.
@@ -88,26 +88,15 @@ public static FileFormat fromIdentifier(String identifier, Options options) {

/** Create a {@link FileFormat} from format identifier and format options. */
public static FileFormat fromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormat.class.getClassLoader())
.orElseThrow(
() ->
new RuntimeException(
String.format(
"Could not find a FileFormatFactory implementation class for %s format",
identifier)));
}

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {
ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
if (identifier != null) {
identifier = identifier.toLowerCase();
}

return Optional.empty();
FileFormatFactory fileFormatFactory =
FactoryUtil.discoverFactory(
Contributor:
Can you just create a PR for FileFormatFactory?

Contributor Author:
@JingsongLi Of course you can, but I'll change it in a few days. I've been a little busy lately.

FileFormatFactory.class.getClassLoader(),
FileFormatFactory.class,
identifier);
return fileFormatFactory.create(context);
}

protected Options getIdentifierPrefixOptions(Options options) {
@@ -19,13 +19,14 @@
package org.apache.paimon.format;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

/** Factory to create {@link FileFormat}. */
public interface FileFormatFactory {
public interface FileFormatFactory extends Factory {

String identifier();

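Note (illustrative, not part of the PR): with FileFormatFactory now extending Factory, a third-party format plugs in by exposing an identifier and letting FactoryUtil discover it, instead of relying on the hand-rolled ServiceLoader loop removed above. The sketch below assumes identifier() and create(FormatContext) are the only abstract methods, as the visible diff suggests; the create() body is stubbed because FileFormat construction details are not shown here.

import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FileFormatFactory.FormatContext;

/** Hypothetical custom format factory, discovered by its identifier. */
public class MyFormatFactory implements FileFormatFactory {

    @Override
    public String identifier() {
        // Matched after lower-casing: FileFormat.fromIdentifier lower-cases the identifier first.
        return "myformat";
    }

    @Override
    public FileFormat create(FormatContext context) {
        // A real factory would build and return a FileFormat for this context.
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}

If discovery under FactoryUtil remains ServiceLoader-based, as the removed code and the "org.apache.paimon.factories.Factory" test commits above suggest, the class would additionally need the usual META-INF/services registration; that detail is outside this diff.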
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.function.Function;

/**
 * A simple object cache manager backed by Caffeine.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class ObjectCacheManager<K, V> {
private final Cache<K, V> cache;

private ObjectCacheManager(Duration timeout, int maxSize) {
this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build();
}

public static <K, V> ObjectCacheManager<K, V> newObjectCacheManager(
Duration timeout, int maxSize) {
return new ObjectCacheManager<>(timeout, maxSize);
}

public ObjectCacheManager<K, V> put(K k, V v) {
this.cache.put(k, v);
return this;
}

public V get(K k, Function<? super K, ? extends V> creator) {
return this.cache.get(k, creator);
}

public V getIfPresent(K k) {
return this.cache.getIfPresent(k);
}
}
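A hypothetical usage sketch of the cache added above; the key and value types, the timeout, and the maximum size are made up for illustration.

import org.apache.paimon.fs.ObjectCacheManager;

import java.time.Duration;

public class ObjectCacheManagerExample {
    public static void main(String[] args) {
        ObjectCacheManager<String, Long> cache =
                ObjectCacheManager.newObjectCacheManager(Duration.ofMinutes(10), 100);

        cache.put("a", 1L); // explicit insert
        long computed = cache.get("bb", k -> (long) k.length()); // computed once, then served from cache
        Long missing = cache.getIfPresent("never-inserted"); // null, no loader is invoked

        System.out.println(computed + " / " + missing); // prints: 2 / null
    }
}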
@@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE

private final DataFileMetaSerializer dataFileMetaSerializer;

private static final ManifestEntrySerializer MANIFEST_ENTRY_SERIALIZER =
new ManifestEntrySerializer();

public ManifestEntrySerializer() {
super(ManifestEntry.SCHEMA);
this.dataFileMetaSerializer = new DataFileMetaSerializer();
}

public static ManifestEntrySerializer getInstance() {
return MANIFEST_ENTRY_SERIALIZER;
}

@Override
public int getVersion() {
return 2;
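A tiny sketch of the new accessor's effect: callers such as the manifest file factory in the next hunk share one serializer instead of constructing a new ManifestEntrySerializer (and its DataFileMetaSerializer) on every call. The import path is assumed to be org.apache.paimon.manifest.

import org.apache.paimon.manifest.ManifestEntrySerializer;

public class SerializerSingletonSketch {
    public static void main(String[] args) {
        ManifestEntrySerializer a = ManifestEntrySerializer.getInstance();
        ManifestEntrySerializer b = ManifestEntrySerializer.getInstance();
        System.out.println(a == b); // true: a single shared instance
    }
}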
@@ -24,9 +24,11 @@
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsCollector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
@@ -39,7 +41,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

/**
* This file includes several {@link ManifestEntry}s, representing the additional changes since last
@@ -197,15 +201,60 @@ public boolean isCacheEnabled() {
}

public ManifestFile create() {
return create(null);
}

private static class FormatReaderFactoryKey {
private final RowType entryType;
private final List<Predicate> filters;
private final String formatIdentifier;

public FormatReaderFactoryKey(
String formatIdentifier, RowType entryType, List<Predicate> filters) {
this.entryType = entryType;
this.filters = filters;
this.formatIdentifier = formatIdentifier;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
FormatReaderFactoryKey that = (FormatReaderFactoryKey) o;
return Objects.equals(entryType, that.entryType)
&& Objects.equals(filters, that.filters)
&& Objects.equals(formatIdentifier, that.formatIdentifier);
}

@Override
public int hashCode() {
return Objects.hash(entryType, filters, formatIdentifier);
}
}

private static final ObjectCacheManager<FormatReaderFactoryKey, FormatReaderFactory>
readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatWriterFactory>
writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

public ManifestFile create(List<Predicate> filters) {
String formatIdentifier = this.fileFormat.getFormatIdentifier();
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
FormatReaderFactoryKey formatReaderFactoryKey =
new FormatReaderFactoryKey(formatIdentifier, entryType, filters);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
new ManifestEntrySerializer(),
ManifestEntrySerializer.getInstance(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
readers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createReaderFactory(entryType, filters)),
writers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createWriterFactory(entryType)),
compression,
pathFactory.manifestFileFactory(),
suggestedFileSize,
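To see why FormatReaderFactoryKey needs value-based equals/hashCode, here is a standalone sketch of the same keyed memoization with stand-in types; apart from the ObjectCacheManager API, nothing below comes from the PR.

import org.apache.paimon.fs.ObjectCacheManager;

import java.time.Duration;
import java.util.Objects;

public class FactoryCacheSketch {

    /** Stand-in for FormatReaderFactoryKey: equal fields mean an equal key, which means a cache hit. */
    static final class Key {
        final String format;
        final String schema;

        Key(String format, String schema) {
            this.format = format;
            this.schema = schema;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key that = (Key) o;
            return format.equals(that.format) && schema.equals(that.schema);
        }

        @Override
        public int hashCode() {
            return Objects.hash(format, schema);
        }
    }

    public static void main(String[] args) {
        ObjectCacheManager<Key, Object> readers =
                ObjectCacheManager.newObjectCacheManager(Duration.ofDays(1), 100);

        Object first = readers.get(new Key("orc", "manifest-entry-v2"), k -> new Object());
        Object second = readers.get(new Key("orc", "manifest-entry-v2"), k -> new Object());

        System.out.println(first == second); // true: the expensive factory is built only once
    }
}

With Duration.ofDays(365) and a maximum size of 1000, the PR's static caches effectively keep reader and writer factories for the lifetime of the process, keyed by format identifier, entry type, and filters.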
@@ -35,9 +35,13 @@
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
@@ -81,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private Collection<Integer> buckets;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
@@ -128,6 +133,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
@Override
public FileStoreScan withBucket(int bucket) {
this.bucketFilter = i -> i == bucket;
this.buckets = Collections.singletonList(bucket);
return this;
}

@Override
public FileStoreScan withBuckets(Collection<Integer> buckets) {
this.bucketFilter = buckets::contains;
this.buckets = buckets;
return this;
}

@@ -379,7 +392,7 @@ protected TableSchema scanTableSchema(long id) {
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
.create(createPushDownFilter(buckets))
.read(
manifest.fileName(),
manifest.fileSize(),
@@ -426,6 +439,23 @@ private Filter<InternalRow> createCacheRowFilter() {
return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
* <p>Implemented to {@link InternalRow} is for performance (No deserialization).
*/
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) {
Contributor:
Does the query performance mainly gain from the bucket field pushdown for the ORC manifest file?

Contributor Author:
@Aitozi More than half of the performance improvement comes from the ORC pushdown on the manifest; another part comes from optimizing OrcFileFormat creation, and the rest comes from caching some time-consuming object operations in Scan.

Contributor Author:
Together with #4231, bucket data also benefits from ORC pushdown. For tests see issue #4586; the current ORC implementation is more than 10 times faster than Parquet. @Aitozi

if (buckets == null || buckets.isEmpty()) {
return null;
}
List<Predicate> predicates = new ArrayList<>();
PredicateBuilder predicateBuilder =
new PredicateBuilder(
RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets)));
return predicates;
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
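For reference, the pushdown created by createPushDownFilter above is a single IN predicate over a one-column row type named _BUCKET. Below is a standalone sketch that mirrors that construction with made-up bucket values; whether the reader can actually evaluate it (for example via ORC search arguments) depends on the format options discussed in the commits above.

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketPushDownSketch {
    public static void main(String[] args) {
        PredicateBuilder builder =
                new PredicateBuilder(
                        RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));

        // Keep only manifest entries whose bucket is 0, 2, or 5 (hypothetical bucket set).
        List<Predicate> pushDown = new ArrayList<>();
        pushDown.add(builder.in(0, new ArrayList<>(Arrays.asList(0, 2, 5))));

        // These predicates are what the manifest file factory's create(List<Predicate>) receives;
        // for ORC manifests they can be pushed into the reader.
        System.out.println(pushDown.size()); // 1
    }
}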
@@ -38,6 +38,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,6 +58,8 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

FileStoreScan withBuckets(Collection<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);
@@ -48,6 +48,7 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -79,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public AbstractDataTableScan withBuckets(Collection<Integer> buckets) {
snapshotReader.withBuckets(buckets);
return this;
}

@Override
public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
@@ -23,6 +23,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -47,6 +49,14 @@ default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

default InnerTableScan withBuckets(Collection<Integer> buckets) {
throw new RuntimeException("not impl withBuckets for " + this.getClass().getName());
}

default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
return this;
}
@@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -81,6 +82,8 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

SnapshotReader withBuckets(Collection<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);