Skip to content

Commit

Permalink
[core] Optimize fileFormat discovery and avoid creating fileFormat (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Dec 26, 2024
1 parent 8be4b1f commit 2e57c59
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,10 @@ public FileFormat fileFormat() {
return createFileFormat(options, FILE_FORMAT);
}

public String fileFormatString() {
return normalizeFileFormat(options.get(FILE_FORMAT));
}

public FileFormat manifestFormat() {
return createFileFormat(options, MANIFEST_FORMAT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,21 @@ public static <T extends Factory> List<String> discoverIdentifiers(
}

private static List<Factory> getFactories(ClassLoader classLoader) {
return FACTORIES.get(classLoader, FactoryUtil::discoverFactories);
return FACTORIES.get(classLoader, s -> discoverFactories(classLoader, Factory.class));
}

private static List<Factory> discoverFactories(ClassLoader classLoader) {
final Iterator<Factory> serviceLoaderIterator =
ServiceLoader.load(Factory.class, classLoader).iterator();

final List<Factory> loadResults = new ArrayList<>();
/**
* Discover factories.
*
* @param classLoader the class loader
* @param klass the klass
* @param <T> the type of the factory
* @return the list of factories
*/
public static <T> List<T> discoverFactories(ClassLoader classLoader, Class<T> klass) {
final Iterator<T> serviceLoaderIterator = ServiceLoader.load(klass, classLoader).iterator();

final List<T> loadResults = new ArrayList<>();
while (true) {
try {
// error handling should also be applied to the hasNext() call because service
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.factories;

import org.apache.paimon.format.FileFormatFactory;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.factories.FactoryUtil.discoverFactories;

/** Utility for working with {@link FileFormatFactory}s. */
public class FormatFactoryUtil {

private static final Cache<ClassLoader, List<FileFormatFactory>> FACTORIES =
Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build();

/** Discovers a file format factory. */
@SuppressWarnings("unchecked")
public static <T extends FileFormatFactory> T discoverFactory(
ClassLoader classLoader, String identifier) {
final List<FileFormatFactory> foundFactories = getFactories(classLoader);

final List<FileFormatFactory> matchingFactories =
foundFactories.stream()
.filter(f -> f.identifier().equals(identifier))
.collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new FactoryException(
String.format(
"Could not find any factory for identifier '%s' that implements FileFormatFactory in the classpath.\n\n"
+ "Available factory identifiers are:\n\n"
+ "%s",
identifier,
foundFactories.stream()
.map(FileFormatFactory::identifier)
.collect(Collectors.joining("\n"))));
}

return (T) matchingFactories.get(0);
}

private static List<FileFormatFactory> getFactories(ClassLoader classLoader) {
return FACTORIES.get(
classLoader, s -> discoverFactories(classLoader, FileFormatFactory.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.factories.FormatFactoryUtil;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
Expand All @@ -32,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/**
* Factory class which creates reader and writer factories for specific file format.
Expand Down Expand Up @@ -88,26 +88,9 @@ public static FileFormat fromIdentifier(String identifier, Options options) {

/** Create a {@link FileFormat} from format identifier and format options. */
public static FileFormat fromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormat.class.getClassLoader())
.orElseThrow(
() ->
new RuntimeException(
String.format(
"Could not find a FileFormatFactory implementation class for %s format",
identifier)));
}

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {
ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
}

return Optional.empty();
return FormatFactoryUtil.discoverFactory(
FileFormat.class.getClassLoader(), identifier.toLowerCase())
.create(context);
}

protected Options getIdentifierPrefixOptions(Options options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected AbstractFileStore(

@Override
public FileStorePathFactory pathFactory() {
return pathFactory(options.fileFormat().getFormatIdentifier());
return pathFactory(options.fileFormatString());
}

protected FileStorePathFactory pathFactory(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
private Map<String, FileStorePathFactory> format2PathFactory() {
Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
Set<String> formats = new HashSet<>(options.fileFormatPerLevel().values());
formats.add(options.fileFormat().getFormatIdentifier());
formats.add(options.fileFormatString());
formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format)));
return pathFactoryMap;
}
Expand Down

0 comments on commit 2e57c59

Please sign in to comment.