Skip to content

Commit

Permalink
[core][format] merge with aa16c2bf1
Browse files Browse the repository at this point in the history
  • Loading branch information
ranxianglei authored and ranxianglei.rxl committed Nov 11, 2024
1 parent 8a89649 commit c745e55
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -40,6 +42,9 @@
*/
public abstract class FileFormat {

// Static cache of FileFormatFactory instances keyed by format identifier, created with a
// 365-day timeout and a size limit of 1000.
// NOTE(review): fromIdentifier stores entries under factory.identifier() as returned but looks
// them up with the lower-cased identifier; a factory whose identifier is not already lower case
// would never be hit. Confirm identifiers are always lower case.
// NOTE(review): the cache is consulted before the ClassLoader argument of fromIdentifier is
// used, so a cached factory is returned regardless of which ClassLoader was requested.
private static final ObjectCacheManager<String, FileFormatFactory> formatFactoryCache =
ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

protected String formatIdentifier;

protected FileFormat(String formatIdentifier) {
Expand Down Expand Up @@ -92,9 +97,17 @@ public static FileFormat fromIdentifier(String identifier, FormatContext context

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {

FileFormatFactory fileFormatFactory =
formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase());
if (fileFormatFactory != null) {
return Optional.of(fileFormatFactory.create(context));
}

ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
formatFactoryCache.put(factory.identifier(), factory);
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.function.Function;

/**
 * Thin wrapper around a Caffeine {@link Cache} that is bounded by entry count and expires
 * entries a fixed duration after they were written.
 *
 * @param <K> type of the cache keys
 * @param <V> type of the cached values
 */
public class ObjectCacheManager<K, V> {

    /** Backing Caffeine cache; built once in the constructor. */
    private final Cache<K, V> cache;

    private ObjectCacheManager(Duration expiry, int capacity) {
        cache =
                Caffeine.newBuilder()
                        .maximumSize(capacity)
                        .expireAfterWrite(expiry)
                        .build();
    }

    /**
     * Creates a new cache manager.
     *
     * @param timeout how long an entry is kept after it was written
     * @param maxSize maximum number of entries handed to the underlying cache
     * @return a new, empty cache manager
     */
    public static <K, V> ObjectCacheManager<K, V> newObjectCacheManager(
            Duration timeout, int maxSize) {
        return new ObjectCacheManager<>(timeout, maxSize);
    }

    /**
     * Stores {@code v} under {@code k} in the underlying cache.
     *
     * @return this manager, so calls can be chained
     */
    public ObjectCacheManager<K, V> put(K k, V v) {
        cache.put(k, v);
        return this;
    }

    /**
     * Looks up {@code k}, delegating to the underlying cache's {@code get(key, mappingFunction)}
     * so that {@code creator} supplies the value when none is cached.
     */
    public V get(K k, Function<? super K, ? extends V> creator) {
        return cache.get(k, creator);
    }

    /** Returns whatever the underlying cache's {@code getIfPresent} reports for {@code k}. */
    public V getIfPresent(K k) {
        return cache.getIfPresent(k);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ReflectionUtils;

import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -39,10 +38,7 @@
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hadoop {@link FileIO}. */
Expand All @@ -52,8 +48,6 @@ public class HadoopFileIO implements FileIO {

protected SerializableConfiguration hadoopConf;

protected transient volatile Map<Pair<String, String>, FileSystem> fsMap;

@VisibleForTesting
public void setFileSystem(Path path, FileSystem fs) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down Expand Up @@ -149,26 +143,7 @@ private FileSystem getFileSystem(
org.apache.hadoop.fs.Path path,
FunctionWithException<org.apache.hadoop.fs.Path, FileSystem, IOException> creator)
throws IOException {
if (fsMap == null) {
synchronized (this) {
if (fsMap == null) {
fsMap = new ConcurrentHashMap<>();
}
}
}

Map<Pair<String, String>, FileSystem> map = fsMap;

URI uri = path.toUri();
String scheme = uri.getScheme();
String authority = uri.getAuthority();
Pair<String, String> key = Pair.of(scheme, authority);
FileSystem fs = map.get(key);
if (fs == null) {
fs = creator.apply(path);
map.put(key, fs);
}
return fs;
return creator.apply(path);
}

protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE

private final DataFileMetaSerializer dataFileMetaSerializer;

/** Single shared serializer instance, handed out by {@link #getInstance()}. */
private static final ManifestEntrySerializer MANIFEST_ENTRY_SERIALIZER =
new ManifestEntrySerializer();

/**
 * Creates a serializer for {@link ManifestEntry#SCHEMA} that owns a fresh {@link
 * DataFileMetaSerializer}.
 */
public ManifestEntrySerializer() {
super(ManifestEntry.SCHEMA);
this.dataFileMetaSerializer = new DataFileMetaSerializer();
}

/**
 * Returns the statically held serializer instance; every caller receives the same object.
 *
 * <p>NOTE(review): confirm this serializer (and the {@link DataFileMetaSerializer} it holds) is
 * safe to share between threads before using this from concurrent code.
 */
public static ManifestEntrySerializer getInstance() {
return MANIFEST_ENTRY_SERIALIZER;
}

@Override
public int getVersion() {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsCollector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
Expand All @@ -39,7 +41,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

/**
* This file includes several {@link ManifestEntry}s, representing the additional changes since last
Expand Down Expand Up @@ -197,15 +201,60 @@ public boolean isCacheEnabled() {
}

/** Creates a {@link ManifestFile} without push-down filters by passing {@code null} filters. */
public ManifestFile create() {
return create(null);
}

/**
 * Cache key made of a format identifier, the entry row type and the push-down filters.
 *
 * <p>Two keys are equal when all three components are equal according to {@link
 * Objects#equals(Object, Object)}; any component may be {@code null}.
 */
private static class FormatReaderFactoryKey {
    private final RowType entryType;
    private final List<Predicate> filters;
    private final String formatIdentifier;

    public FormatReaderFactoryKey(
            String formatIdentifier, RowType entryType, List<Predicate> filters) {
        this.formatIdentifier = formatIdentifier;
        this.entryType = entryType;
        // The list is stored by reference, not copied.
        this.filters = filters;
    }

    @Override
    public boolean equals(Object o) {
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        FormatReaderFactoryKey other = (FormatReaderFactoryKey) o;
        if (!Objects.equals(entryType, other.entryType)) {
            return false;
        }
        if (!Objects.equals(filters, other.filters)) {
            return false;
        }
        return Objects.equals(formatIdentifier, other.formatIdentifier);
    }

    @Override
    public int hashCode() {
        // Same component order as before so hash values are unchanged.
        return Objects.hash(entryType, filters, formatIdentifier);
    }
}

// Static caches of reader and writer factories, each created with a 365-day timeout and a size
// limit of 1000, keyed by FormatReaderFactoryKey (format identifier, entry type, filters).
// NOTE(review): the key does not include anything else about the FileFormat instance that
// builds the factory; confirm that two formats with the same identifier but different
// configuration can safely share a cached factory.
// NOTE(review): writers are looked up with a key that includes the filters although the writer
// factory is created from the entry type only.
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatReaderFactory>
readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatWriterFactory>
writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

public ManifestFile create(List<Predicate> filters) {
String formatIdentifier = this.fileFormat.getFormatIdentifier();
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
FormatReaderFactoryKey formatReaderFactoryKey =
new FormatReaderFactoryKey(formatIdentifier, entryType, filters);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
new ManifestEntrySerializer(),
ManifestEntrySerializer.getInstance(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
readers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createReaderFactory(entryType, filters)),
writers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createWriterFactory(entryType)),
compression,
pathFactory.manifestFileFactory(),
suggestedFileSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
Expand Down Expand Up @@ -82,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<Integer> buckets;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
Expand Down Expand Up @@ -136,6 +140,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
/**
 * Restricts this scan to a single bucket.
 *
 * <p>Both the in-memory bucket filter and the bucket list (read later by {@code readManifest}
 * to build the push-down filter) are replaced.
 */
@Override
public FileStoreScan withBucket(int bucket) {
    this.buckets = Collections.singletonList(bucket);
    this.bucketFilter = candidate -> candidate == bucket;
    return this;
}

/**
 * Restricts this scan to the given buckets.
 *
 * <p>The list is copied on entry, so later changes the caller makes to {@code buckets} do not
 * alter the bucket filter or the push-down filter built from it in {@code readManifest}.
 *
 * @param buckets bucket ids to keep; must not be {@code null}
 * @return this scan
 * @throws NullPointerException if {@code buckets} is {@code null}
 */
@Override
public FileStoreScan withBuckets(List<Integer> buckets) {
    // Snapshot the argument: previously both fields aliased the caller's (possibly mutable) list.
    List<Integer> snapshot = new ArrayList<>(buckets);
    this.bucketFilter = snapshot::contains;
    this.buckets = snapshot;
    return this;
}

Expand Down Expand Up @@ -416,7 +428,7 @@ private boolean filterMergedManifestEntry(ManifestEntry entry) {
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
.create(createPushDownFilter(buckets, numOfBuckets))
.read(
manifest.fileName(),
manifest.fileSize(),
Expand Down Expand Up @@ -480,6 +492,23 @@ private static Filter<InternalRow> createCacheRowFilter(
};
}

/**
 * Builds the predicates handed to the manifest reader factory so that only entries of the
 * requested buckets are read.
 *
 * <p>The result is a single {@code IN} predicate on field index 0 of a one-column row type
 * named {@code _BUCKET}. NOTE(review): the predicate is later passed along with the full
 * manifest entry row type; confirm the reader resolves it by field name rather than by index.
 *
 * @param buckets bucket ids to keep; {@code null} or empty means nothing is pushed down
 * @param numOfBuckets currently unused
 * @return a one-element predicate list, or {@code null} when there is nothing to push down
 */
private static List<Predicate> createPushDownFilter(List<Integer> buckets, int numOfBuckets) {
    boolean nothingToPush = buckets == null || buckets.isEmpty();
    if (nothingToPush) {
        return null;
    }

    RowType bucketRowType = RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"});
    PredicateBuilder builder = new PredicateBuilder(bucketRowType);

    List<Predicate> pushDown = new ArrayList<>();
    pushDown.add(builder.in(0, new ArrayList<>(buckets)));
    return pushDown;
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

/** Restricts the scan to the given buckets. */
FileStoreScan withBuckets(List<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/** Forwards the bucket list to the underlying snapshot reader and returns this scan. */
@Override
public AbstractDataTableScan withBuckets(List<Integer> buckets) {
snapshotReader.withBuckets(buckets);
return this;
}

@Override
public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -47,6 +48,14 @@ default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/**
 * Convenience overload: wraps {@code bucket} in a singleton list and delegates to {@link
 * #withBuckets(List)}, so it behaves exactly as that method does for the implementing class.
 */
default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

/**
 * Restricts the scan to the given buckets.
 *
 * <p>This default implementation does not support bucket selection and always throws;
 * implementations that support it must override this method.
 *
 * @param buckets bucket ids to keep
 * @return this scan, in overriding implementations
 * @throws UnsupportedOperationException always, unless overridden
 */
default InnerTableScan withBuckets(List<Integer> buckets) {
    // UnsupportedOperationException is a RuntimeException, so existing catch blocks still match.
    throw new UnsupportedOperationException(
            "not impl withBuckets for " + this.getClass().getName());
}

/** Default implementation: ignores {@code levelFilter} and returns this scan unchanged. */
default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

/**
 * Multi-bucket counterpart of {@link #withBucket(int)}.
 *
 * <p>NOTE(review): presumably restricts reading to the listed buckets; the implementation is
 * not shown here, so confirm against the implementing class.
 */
SnapshotReader withBuckets(List<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
Expand Down
Loading

0 comments on commit c745e55

Please sign in to comment.