Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][format] Optimize manifest reading performance, add pushdown for manifest and orc. [Note: this PR was not fully split and merged; it was not closed intentionally, so it has been reopened.] #4497

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9873457
[core] fix hll class not found
Aug 21, 2024
dedcaa4
[format][orc] open orc switch useSelected,allowSARGToFilter to make s…
Sep 22, 2024
4794d5f
Merge branch 'master' of github.com:ranxianglei/paimon
Sep 22, 2024
462edc6
[format][orc] miss tolerateMissingSchema
Sep 22, 2024
2226fb9
[format][orc] fix orc selected close for no filter condition
Sep 22, 2024
ee915c8
[orc] keep useSelected and allowSARGToFilter close default, or deleti…
Sep 27, 2024
2920fc9
[format][orc] VectorizedRowBatch to OrcColumnVector for selected rows…
Oct 15, 2024
8a89649
[format][orc] remove all isRepeating
Oct 15, 2024
c745e55
[core][format] merge with aa16c2bf1
Nov 11, 2024
7841f25
[core][format] merge conflicts
Nov 11, 2024
bbdd316
[core] fix AuditLogTable merge error
Nov 11, 2024
10ef09c
[format] recover HadoopFileIO
Nov 12, 2024
a2acbab
[format] checkstyle
Nov 12, 2024
e1c90c7
[format][orc] add pushdown option only for reader .
Nov 12, 2024
4364ac1
Merge branch 'master' into op_manifest
ranxianglei Nov 12, 2024
8c9a75c
[core] recover bucket
Nov 12, 2024
dfaeac3
Merge branch 'op_manifest' of github.com:ranxianglei/paimon into op_m…
Nov 12, 2024
f71d658
[core][format] add test for withBuckets and orcFormat
Nov 12, 2024
efac5b6
[format] fix checkstyle
Nov 12, 2024
15b1910
[format] fix version caused error
Nov 12, 2024
e1b3406
[core] fix checkstyle
Nov 12, 2024
d48fff6
[format] add FormatPerformanceTest
Nov 13, 2024
a0efae2
[format][tests] FormatPerformanceTest change to 10 times
Nov 13, 2024
016620c
[format][tests] FormatPerformanceTest change to lessthan to pass gith…
Nov 13, 2024
133a491
[format] merge conflicts
Nov 14, 2024
282a2c9
[format] id to lowercase
Nov 14, 2024
884d12f
[tests] core org.apache.paimon.factories.Factory
Nov 14, 2024
a1cc9f4
[tests] fileFormat factories add to paimon-flink-common
Nov 14, 2024
e401844
[core] resolve withBuckets commit
Nov 15, 2024
710af06
[format] no need call rowMapper under getArray
Nov 15, 2024
669dc30
[core] cancel manifest format factory cache for while .
Nov 18, 2024
2308475
[core] Optimization of Parquet Predicate Pushdown Capability (#4608)
Aiden-Dong Dec 3, 2024
db912e4
merge
Aiden-Dong Dec 3, 2024
c7a3776
merge conflicts
Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -41,6 +43,9 @@
*/
public abstract class FileFormat {

// Process-wide cache of discovered FileFormatFactory instances, keyed by lowercase format
// identifier. Avoids re-running the ServiceLoader scan on every format lookup; the 365-day
// timeout is effectively "never expire", so the 1000-entry size bound is the real limit.
private static final ObjectCacheManager<String, FileFormatFactory> formatFactoryCache =
        ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

protected String formatIdentifier;

protected FileFormat(String formatIdentifier) {
Expand Down Expand Up @@ -99,9 +104,17 @@ public static FileFormat fromIdentifier(String identifier, FormatContext context

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {

FileFormatFactory fileFormatFactory =
formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase());
if (fileFormatFactory != null) {
return Optional.of(fileFormatFactory.create(context));
}

ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
formatFactoryCache.put(factory.identifier(), factory);
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.function.Function;

/**
* Sample Object Cache Manager .
*
* @param <K>
* @param <V>
*/
public class ObjectCacheManager<K, V> {
private final Cache<K, V> cache;

private ObjectCacheManager(Duration timeout, int maxSize) {
this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build();
ranxianglei marked this conversation as resolved.
Show resolved Hide resolved
}

public static <K, V> ObjectCacheManager<K, V> newObjectCacheManager(
Duration timeout, int maxSize) {
return new ObjectCacheManager<>(timeout, maxSize);
}

public ObjectCacheManager<K, V> put(K k, V v) {
this.cache.put(k, v);
return this;
}

public V get(K k, Function<? super K, ? extends V> creator) {
return this.cache.get(k, creator);
}

public V getIfPresent(K k) {
return this.cache.getIfPresent(k);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE

private final DataFileMetaSerializer dataFileMetaSerializer;

private static final ManifestEntrySerializer MANIFEST_ENTRY_SERIALIZER =
new ManifestEntrySerializer();

/** Creates a serializer for {@link ManifestEntry} rows based on {@code ManifestEntry.SCHEMA}. */
public ManifestEntrySerializer() {
    super(ManifestEntry.SCHEMA);
    this.dataFileMetaSerializer = new DataFileMetaSerializer();
}

/**
 * Returns the process-wide shared instance (eagerly created at class load).
 *
 * <p>NOTE(review): sharing one instance assumes serialization calls carry no per-call mutable
 * state in this class or in {@code dataFileMetaSerializer} — confirm before concurrent use.
 */
public static ManifestEntrySerializer getInstance() {
    return MANIFEST_ENTRY_SERIALIZER;
}

@Override
public int getVersion() {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsCollector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
Expand All @@ -39,7 +41,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

/**
* This file includes several {@link ManifestEntry}s, representing the additional changes since last
Expand Down Expand Up @@ -197,15 +201,60 @@ public boolean isCacheEnabled() {
}

/** Creates a {@link ManifestFile} with no pushdown filters (delegates with {@code null}). */
public ManifestFile create() {
    return create(null);
}

/**
 * Cache key for format reader/writer factories: two keys are equal iff the format identifier,
 * the manifest entry row type, and the pushdown filters are all equal.
 *
 * <p>NOTE(review): cache hits on the filter component require {@code Predicate} implementations
 * to define value-based {@code equals}/{@code hashCode} — confirm, otherwise every distinct
 * filter list creates a new cache entry.
 */
private static class FormatReaderFactoryKey {
    private final RowType entryType;
    private final List<Predicate> filters;
    private final String formatIdentifier;

    public FormatReaderFactoryKey(
            String formatIdentifier, RowType entryType, List<Predicate> filters) {
        this.entryType = entryType;
        this.filters = filters;
        this.formatIdentifier = formatIdentifier;
    }

    @Override
    public boolean equals(Object o) {
        // Reference-equality fast path avoids the field comparisons on self-compare.
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FormatReaderFactoryKey that = (FormatReaderFactoryKey) o;
        return Objects.equals(entryType, that.entryType)
                && Objects.equals(filters, that.filters)
                && Objects.equals(formatIdentifier, that.formatIdentifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(entryType, filters, formatIdentifier);
    }
}

// Process-wide caches of reader/writer factories keyed by (format, entry type, filters).
// The 365-day timeout is effectively "never expire"; the 1000-entry bound is the real limit.
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatReaderFactory>
        readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatWriterFactory>
        writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

public ManifestFile create(List<Predicate> filters) {
String formatIdentifier = this.fileFormat.getFormatIdentifier();
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
FormatReaderFactoryKey formatReaderFactoryKey =
new FormatReaderFactoryKey(formatIdentifier, entryType, filters);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
new ManifestEntrySerializer(),
ManifestEntrySerializer.getInstance(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
readers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createReaderFactory(entryType, filters)),
writers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createWriterFactory(entryType)),
compression,
pathFactory.manifestFileFactory(),
suggestedFileSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
Expand Down Expand Up @@ -81,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<Integer> buckets;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
Expand Down Expand Up @@ -128,6 +133,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
/** Restricts the scan to a single bucket, recording it both as a filter and as a bucket list. */
@Override
public FileStoreScan withBucket(int bucket) {
    this.buckets = Collections.singletonList(bucket);
    this.bucketFilter = b -> b == bucket;
    return this;
}

/**
 * Restricts the scan to the given buckets, recording them both as a membership filter and as
 * the list used later to build the manifest pushdown filter.
 */
@Override
public FileStoreScan withBuckets(List<Integer> buckets) {
    // Defensive copy: the filter below and the later pushdown-filter creation both read this
    // list, so it must not change if the caller mutates their list afterwards.
    List<Integer> copied = new ArrayList<>(buckets);
    this.bucketFilter = copied::contains;
    this.buckets = copied;
    return this;
}

Expand Down Expand Up @@ -379,7 +392,7 @@ protected TableSchema scanTableSchema(long id) {
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
.create(createPushDownFilter(buckets))
.read(
manifest.fileName(),
manifest.fileSize(),
Expand Down Expand Up @@ -426,6 +439,23 @@ private Filter<InternalRow> createCacheRowFilter() {
return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
}

/**
 * Builds the predicate list pushed down into manifest file reading for the requested buckets:
 * a single {@code IN} predicate over a one-column INT row named {@code _BUCKET}.
 *
 * <p>Returns {@code null} (not an empty list) when no buckets were requested, which callers
 * pass through to mean "no pushdown".
 *
 * <p>NOTE(review): assumes the bucket value is exposed at field index 0 under the name
 * {@code _BUCKET} in the row type the manifest reader filters on — confirm against the
 * manifest entry schema.
 */
private static List<Predicate> createPushDownFilter(List<Integer> buckets) {
    if (buckets == null || buckets.isEmpty()) {
        return null;
    }
    List<Predicate> predicates = new ArrayList<>();
    PredicateBuilder predicateBuilder =
            new PredicateBuilder(
                    RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
    predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets)));
    return predicates;
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

FileStoreScan withBuckets(List<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/** Restricts the scan to the given buckets by delegating to the underlying snapshot reader. */
@Override
public AbstractDataTableScan withBuckets(List<Integer> buckets) {
    snapshotReader.withBuckets(buckets);
    return this;
}

@Override
public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -47,6 +48,14 @@ default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

default InnerTableScan withBuckets(List<Integer> buckets) {
throw new RuntimeException("not impl withBuckets for " + this.getClass().getName());
ranxianglei marked this conversation as resolved.
Show resolved Hide resolved
}

default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

SnapshotReader withBuckets(List<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

/** Restricts the snapshot read to the given buckets by delegating to the underlying scan. */
@Override // added: sibling overrides in this class carry @Override; catches signature drift
public SnapshotReader withBuckets(List<Integer> buckets) {
    scan.withBuckets(buckets);
    return this;
}

@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
scan.withBucketFilter(bucketFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

/** Restricts the read to the given buckets by delegating to the wrapped snapshot reader. */
@Override
public SnapshotReader withBuckets(List<Integer> buckets) {
    wrapped.withBuckets(buckets);
    return this;
}

@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
wrapped.withBucketFilter(bucketFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.paimon.memory.MemorySegmentSource;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -41,6 +44,7 @@
/** Cache records to {@link SegmentsCache} by compacted serializer. */
@ThreadSafe
public class ObjectsCache<K, V> {
protected static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class);

private final SegmentsCache<K> cache;
private final ObjectSerializer<V> projectedSerializer;
Expand Down Expand Up @@ -72,6 +76,9 @@ public List<V> read(
if (segments != null) {
return readFromSegments(segments, readFilter);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("not match cache key {}", key);
}
if (fileSize == null) {
fileSize = fileSizeFunction.apply(key);
}
Expand Down
Loading
Loading