From c745e557b909ec28c3d6d15fc67b4ffc1fd2c9e1 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Mon, 11 Nov 2024 10:01:10 +0000 Subject: [PATCH] [core][format] merge with aa16c2bf1 --- .../org/apache/paimon/format/FileFormat.java | 13 +++++ .../apache/paimon/fs/ObjectCacheManager.java | 57 +++++++++++++++++++ .../apache/paimon/fs/hadoop/HadoopFileIO.java | 27 +-------- .../manifest/ManifestEntrySerializer.java | 7 +++ .../apache/paimon/manifest/ManifestFile.java | 55 +++++++++++++++++- .../operation/AbstractFileStoreScan.java | 31 +++++++++- .../paimon/operation/FileStoreScan.java | 2 + .../table/source/AbstractDataTableScan.java | 6 ++ .../paimon/table/source/InnerTableScan.java | 9 +++ .../table/source/snapshot/SnapshotReader.java | 2 + .../source/snapshot/SnapshotReaderImpl.java | 13 ++++- .../paimon/table/system/AuditLogTable.java | 6 ++ .../org/apache/paimon/utils/ObjectsCache.java | 7 +++ .../paimon/format/orc/OrcFileFormat.java | 28 +++++++-- .../format/orc/reader/OrcRowColumnVector.java | 2 +- 15 files changed, 228 insertions(+), 37 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 78dfe1c0cad1..732f1f01e1a7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -40,6 +42,9 @@ */ public abstract class FileFormat { + private static final ObjectCacheManager formatFactoryCache = + ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -92,9 +97,17 @@ public static FileFormat fromIdentifier(String identifier, FormatContext context private static Optional fromIdentifier( String formatIdentifier, FormatContext context, ClassLoader classLoader) { + + FileFormatFactory fileFormatFactory = + formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase()); + if (fileFormatFactory != null) { + return Optional.of(fileFormatFactory.create(context)); + } + ServiceLoader serviceLoader = ServiceLoader.load(FileFormatFactory.class, classLoader); for (FileFormatFactory factory : serviceLoader) { + formatFactoryCache.put(factory.identifier(), factory); if (factory.identifier().equals(formatIdentifier.toLowerCase())) { return Optional.of(factory.create(context)); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java new file mode 100644 index 000000000000..f157578107eb --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.function.Function; + +/** + * Sample Object Cache Manager . + * + * @param + * @param + */ +public class ObjectCacheManager { + private final Cache cache; + + private ObjectCacheManager(Duration timeout, int maxSize) { + this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); + } + + public static ObjectCacheManager newObjectCacheManager( + Duration timeout, int maxSize) { + return new ObjectCacheManager<>(timeout, maxSize); + } + + public ObjectCacheManager put(K k, V v) { + this.cache.put(k, v); + return this; + } + + public V get(K k, Function creator) { + return this.cache.get(k, creator); + } + + public V getIfPresent(K k) { + return this.cache.getIfPresent(k); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index 70325ee69635..6b8104f1b6da 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -27,7 +27,6 @@ import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.utils.FunctionWithException; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ReflectionUtils; import org.apache.hadoop.fs.FSDataInputStream; @@ -39,10 +38,7 @@ import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URI; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; /** Hadoop {@link FileIO}. */ @@ -52,8 +48,6 @@ public class HadoopFileIO implements FileIO { protected SerializableConfiguration hadoopConf; - protected transient volatile Map, FileSystem> fsMap; - @VisibleForTesting public void setFileSystem(Path path, FileSystem fs) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); @@ -149,26 +143,7 @@ private FileSystem getFileSystem( org.apache.hadoop.fs.Path path, FunctionWithException creator) throws IOException { - if (fsMap == null) { - synchronized (this) { - if (fsMap == null) { - fsMap = new ConcurrentHashMap<>(); - } - } - } - - Map, FileSystem> map = fsMap; - - URI uri = path.toUri(); - String scheme = uri.getScheme(); - String authority = uri.getAuthority(); - Pair key = Pair.of(scheme, authority); - FileSystem fs = map.get(key); - if (fs == null) { - fs = creator.apply(path); - map.put(key, fs); - } - return fs; + return creator.apply(path); } protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index 2c3ba2aeaab3..9bee11b64bbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer filters; + private final String formatIdentifier; + + public FormatReaderFactoryKey( + String formatIdentifier, RowType entryType, List filters) { + this.entryType = entryType; + this.filters = filters; + this.formatIdentifier = formatIdentifier; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + FormatReaderFactoryKey that = (FormatReaderFactoryKey) o; + return Objects.equals(entryType, that.entryType) + && Objects.equals(filters, that.filters) + && Objects.equals(formatIdentifier, that.formatIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(entryType, filters, formatIdentifier); + } + } + + private static final ObjectCacheManager + readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + private static final ObjectCacheManager + writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + + public ManifestFile create(List filters) { + String formatIdentifier = this.fileFormat.getFormatIdentifier(); RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); + FormatReaderFactoryKey formatReaderFactoryKey = + new FormatReaderFactoryKey(formatIdentifier, entryType, filters); return new ManifestFile( fileIO, schemaManager, partitionType, - new ManifestEntrySerializer(), + ManifestEntrySerializer.getInstance(), entryType, - fileFormat.createReaderFactory(entryType), - fileFormat.createWriterFactory(entryType), + readers.get( + formatReaderFactoryKey, + (ignore) -> fileFormat.createReaderFactory(entryType, filters)), + writers.get( + formatReaderFactoryKey, + (ignore) -> fileFormat.createWriterFactory(entryType)), compression, pathFactory.manifestFileFactory(), suggestedFileSize, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index d043932810c3..394eb0b3ffa4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -35,9 +35,12 @@ import org.apache.paimon.operation.metrics.ScanStats; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; @@ -82,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; + private List buckets; private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; @@ -136,6 +140,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { @Override public FileStoreScan withBucket(int bucket) { this.bucketFilter = i -> i == bucket; + this.buckets = Collections.singletonList(bucket); + return this; + } + + @Override + public FileStoreScan withBuckets(List buckets) { + this.bucketFilter = buckets::contains; + this.buckets = buckets; return this; } @@ -416,7 +428,7 @@ private boolean filterMergedManifestEntry(ManifestEntry entry) { public List readManifest(ManifestFileMeta manifest) { List entries = manifestFileFactory - .create() + .create(createPushDownFilter(buckets, numOfBuckets)) .read( manifest.fileName(), manifest.fileSize(), @@ -480,6 +492,23 @@ private static Filter createCacheRowFilter( }; } + /** + * Read the corresponding entries based on the current required partition and bucket. + * + *

Implemented to {@link InternalRow} is for performance (No deserialization). + */ + private static List createPushDownFilter(List buckets, int numOfBuckets) { + if (buckets == null || buckets.isEmpty()) { + return null; + } + List predicates = new ArrayList<>(); + PredicateBuilder predicateBuilder = + new PredicateBuilder( + RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"})); + predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets))); + return predicates; + } + /** * Read the corresponding entries based on the current required partition and bucket. * diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 197b4dff4962..4f3cd6e86463 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -55,6 +55,8 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); + FileStoreScan withBuckets(List buckets); + FileStoreScan withBucketFilter(Filter bucketFilter); FileStoreScan withPartitionBucket(BinaryRow partition, int bucket); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index ba1bc6588f68..1e292de12a40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -80,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public AbstractDataTableScan withBuckets(List buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public AbstractDataTableScan withPartitionFilter(Map partitionSpec) { snapshotReader.withPartitionFilter(partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 00a4fc0cde18..5dfd83640f5d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,6 +48,14 @@ default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + default InnerTableScan withBucket(Integer bucket) { + return withBuckets(Collections.singletonList(bucket)); + } + + default InnerTableScan withBuckets(List buckets) { + throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); + } + default InnerTableScan withLevelFilter(Filter levelFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index bd07f49fda01..ec53fdfb1b88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -80,6 +80,8 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); + SnapshotReader withBuckets(List buckets); + SnapshotReader withBucketFilter(Filter bucketFilter); SnapshotReader withDataFileNameFilter(Filter fileNameFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index b2fd1c7850f5..2cf4164f7593 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -245,6 +245,11 @@ public SnapshotReader withBucket(int bucket) { return this; } + public SnapshotReader withBuckets(List buckets) { + scan.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { scan.withBucketFilter(bucketFilter); @@ -271,7 +276,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + List buckets = new ArrayList<>(); + for (int bucket = 0; bucket < numberOfParallelSubtasks; bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index d9cf80289953..92556dd51704 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -334,6 +334,12 @@ public SnapshotReader withBucket(int bucket) { return this; } + @Override + public SnapshotReader withBuckets(List buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { snapshotReader.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 8c490e008baa..ff1c76cb0a03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -28,6 +28,9 @@ import org.apache.paimon.memory.MemorySegmentSource; import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -41,6 +44,7 @@ /** Cache records to {@link SegmentsCache} by compacted serializer. */ @ThreadSafe public class ObjectsCache { + protected static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class); private final SegmentsCache cache; private final ObjectSerializer projectedSerializer; @@ -72,6 +76,9 @@ public List read( if (segments != null) { return readFromSegments(segments, readFilter); } else { + if (LOG.isDebugEnabled()) { + LOG.debug("not match cache key {}", key); + } if (fileSize == null) { fileSize = fileSizeFunction.apply(key); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index c2db03aa16ed..e2e2c057e1a6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -31,6 +31,7 @@ import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; +import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -44,12 +45,14 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.hadoop.conf.Configuration; import org.apache.orc.OrcConf; import org.apache.orc.TypeDescription; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -70,13 +73,29 @@ public class OrcFileFormat extends FileFormat { private final int readBatchSize; private final int writeBatchSize; + private static final org.apache.hadoop.conf.Configuration emptyConf = + new org.apache.hadoop.conf.Configuration(); + private static final ObjectCacheManager configCache = + ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + + static { + emptyConf.set("paimon.empty.configuration", "paimon.empty.configuration"); + } + public OrcFileFormat(FormatContext formatContext) { super(IDENTIFIER); this.orcProperties = getOrcProperties(formatContext.formatOptions(), formatContext); - this.readerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString())); - this.writerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), v.toString())); + Configuration conf; + Configuration cachedConf = configCache.getIfPresent(orcProperties); + if (cachedConf != null) { + conf = cachedConf; + } else { + conf = new org.apache.hadoop.conf.Configuration(emptyConf); + this.orcProperties.forEach((k, v) -> conf.set(k.toString(), v.toString())); + configCache.put(orcProperties, conf); + } + this.readerConf = conf; + this.writerConf = conf; this.readBatchSize = formatContext.readBatchSize(); this.writeBatchSize = formatContext.writeBatchSize(); } @@ -146,7 +165,6 @@ public FormatWriterFactory createWriterFactory(RowType type) { private static Properties getOrcProperties(Options options, FormatContext formatContext) { Properties orcProperties = new Properties(); - Properties properties = new Properties(); options.addAllToProperties(properties); properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index 6c73c9fdbe0d..b925f5b536b1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -46,7 +46,7 @@ public OrcRowColumnVector( @Override public ColumnarRow getRow(int i) { - i = rowMapper(i); + // no need to call rowMapper here . return new ColumnarRow(batch, i); }