diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index ab75f0348584..83c929e94cec 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -19,8 +19,8 @@ package org.apache.paimon.format; import org.apache.paimon.CoreOptions; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.format.FileFormatFactory.FormatContext; -import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -28,13 +28,11 @@ import javax.annotation.Nullable; -import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.ServiceLoader; /** * Factory class which creates reader and writer factories for specific file format. @@ -43,9 +41,6 @@ */ public abstract class FileFormat { - private static final ObjectCacheManager formatFactoryCache = - ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); - protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -93,34 +88,12 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return fromIdentifier(identifier, context, FileFormat.class.getClassLoader()) - .orElseThrow( - () -> - new RuntimeException( - String.format( - "Could not find a FileFormatFactory implementation class for %s format", - identifier))); - } - - private static Optional fromIdentifier( - String formatIdentifier, FormatContext context, ClassLoader classLoader) { - FileFormatFactory fileFormatFactory = - formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase()); - if (fileFormatFactory != null) { - return Optional.of(fileFormatFactory.create(context)); - } - - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, classLoader); - for (FileFormatFactory factory : serviceLoader) { - formatFactoryCache.put(factory.identifier(), factory); - if (factory.identifier().equals(formatIdentifier.toLowerCase())) { - return Optional.of(factory.create(context)); - } - } - - return Optional.empty(); + FactoryUtil.discoverFactory( + FileFormatFactory.class.getClassLoader(), + FileFormatFactory.class, + identifier); + return fileFormatFactory.create(context); } protected Options getIdentifierPrefixOptions(Options options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 99e8b411fbaf..deb6fadd7c19 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -85,7 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; - private List buckets; + private Collection buckets; private BiFilter totalAwareBucketFilter = null; private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; @@ -138,7 +138,7 @@ public FileStoreScan withBucket(int bucket) { } @Override - public FileStoreScan withBuckets(List buckets) { + public FileStoreScan withBuckets(Collection buckets) { this.bucketFilter = buckets::contains; this.buckets = buckets; return this; @@ -444,7 +444,7 @@ private Filter createCacheRowFilter() { * *

Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static List createPushDownFilter(List buckets) { + private static List createPushDownFilter(Collection buckets) { if (buckets == null || buckets.isEmpty()) { return null; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 65af1e6fe259..89e7e7aace90 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,7 +58,7 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); - FileStoreScan withBuckets(List buckets); + FileStoreScan withBuckets(Collection buckets); FileStoreScan withBucketFilter(Filter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 4b071ba617ab..9299b86aa420 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -48,6 +48,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -80,7 +81,7 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { } @Override - public AbstractDataTableScan withBuckets(List buckets) { + public AbstractDataTableScan withBuckets(Collection buckets) { snapshotReader.withBuckets(buckets); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 5dfd83640f5d..6c3b28f0f4ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -52,7 +53,7 @@ default InnerTableScan withBucket(Integer bucket) { return withBuckets(Collections.singletonList(bucket)); } - default InnerTableScan withBuckets(List buckets) { + default InnerTableScan withBuckets(Collection buckets) { throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 76a550ba4ce6..026e529dc0c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -81,7 +82,7 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); - SnapshotReader withBuckets(List buckets); + SnapshotReader withBuckets(Collection buckets); SnapshotReader withBucketFilter(Filter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 74f257ddcf9d..c3f027da962f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -53,6 +53,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -246,7 +247,7 @@ public SnapshotReader withBucket(int bucket) { return this; } - public SnapshotReader withBuckets(List buckets) { + public SnapshotReader withBuckets(Collection buckets) { scan.withBuckets(buckets); return this; } @@ -277,7 +278,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + Set buckets = new HashSet<>(); + for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index a11e88074932..2e1d913648e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -331,7 +332,7 @@ public SnapshotReader withBucket(int bucket) { } @Override - public SnapshotReader withBuckets(List buckets) { + public SnapshotReader withBuckets(Collection buckets) { wrapped.withBuckets(buckets); return this; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 1d04d2f58f02..07b2fc48e078 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -235,7 +235,8 @@ public void testWithBuckets() throws Exception { int wantedBucket1 = random.nextInt(NUM_BUCKETS); int wantedBucket2 = random.nextInt(NUM_BUCKETS); int wantedBucket3 = random.nextInt(NUM_BUCKETS); - List buckets = Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3); + Set buckets = + new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3)); FileStoreScan scan = store.newScan(); scan.withSnapshot(snapshot.id()); diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 7af6f79b3493..000000000000 --- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.format.avro.AvroFileFormatFactory -org.apache.paimon.format.orc.OrcFileFormatFactory -org.apache.paimon.format.parquet.ParquetFileFormatFactory diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java deleted file mode 100644 index cd72bcc53dd4..000000000000 --- a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.format; - -import org.apache.paimon.factories.FactoryUtil; -import org.apache.paimon.options.Options; - -import org.junit.Test; - -import java.util.ServiceLoader; - -import static org.assertj.core.api.Assertions.assertThat; - -/** test format performance. */ -public class FormatPerformanceTest { - - @Test - public void testFormatPerformance() { - double objectCacheCost = newFileFormatWithObjectCache(); - double serviceCost = newFileFormatWithServiceLoader(); - double factoryUtilCost = newFileFormatWithFactoryUtil(); - assertThat(objectCacheCost).isLessThan(serviceCost); - assertThat(objectCacheCost).isLessThan(factoryUtilCost); - } - - private double newFileFormatWithServiceLoader() { - for (int i = 0; i < 1000; i++) { - loadFromIdentifier(); - } - int times = 10_000; - long start = System.nanoTime(); - int nothing = 0; - for (int i = 0; i < times; i++) { - nothing += loadFromIdentifier(); - } - nothing = 0; - return ((double) (System.nanoTime() - start)) / 1000_000 / times + nothing; - } - - private double newFileFormatWithObjectCache() { - for (int i = 0; i < 1000; i++) { - newFileFormat(); - } - int times = 10_000; - long start = System.nanoTime(); - for (int i = 0; i < times; i++) { - newFileFormat(); - } - return ((double) (System.nanoTime() - start)) / 1000_000 / times; - } - - private double newFileFormatWithFactoryUtil() { - for (int i = 0; i < 1000; i++) { - newFileFormatFromFactoryUtil(); - } - int times = 10_000; - long start = System.nanoTime(); - for (int i = 0; i < times; i++) { - newFileFormatFromFactoryUtil(); - } - return ((double) (System.nanoTime() - start)) / 1000_000 / times; - } - - private int loadFromIdentifier() { - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, FileFormat.class.getClassLoader()); - int i = 0; - for (FileFormatFactory factory : serviceLoader) { - i++; - } - return i; - } - - private FileFormat newFileFormat() { - FileFormat orc = FileFormat.fromIdentifier("orc", new Options()); - return orc; - } - - @Test - public void newFileFormatFromFactoryUtil() { - FileFormatFactory fileFormatFactory = - FactoryUtil.discoverFactory( - FileFormatFactory.class.getClassLoader(), FileFormatFactory.class, "orc"); - } -}