diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
index f2b38815a366fe..f163be11aa203f 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@
 import org.apache.doris.common.classloader.ThreadClassLoaderContext;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final ClassLoader classLoader;
 
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+
     public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
         this.basePath = params.get("base_path");
         this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
                 LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
             }
         }
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
 
         ZoneId zoneId;
         if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
     @Override
     public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            initRequiredColumnsAndTypes();
-            initTableInfo(requiredTypes, requiredFields, fetchSize);
-            Properties properties = getReaderProperties();
-            initReader(properties);
+            preExecutionAuthenticator.execute(() -> {
+                initRequiredColumnsAndTypes();
+                initTableInfo(requiredTypes, requiredFields, fetchSize);
+                Properties properties = getReaderProperties();
+                initReader(properties);
+                return null;
+            });
+
         } catch (Exception e) {
             close();
             LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public void open() throws IOException {
     @Override
     public int getNext() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-            int numRows = 0;
-            for (; numRows < fetchSize; numRows++) {
-                if (!reader.next(key, value)) {
-                    break;
+            return preExecutionAuthenticator.execute(() -> {
+                NullWritable key = reader.createKey();
+                ArrayWritable value = reader.createValue();
+                int numRows = 0;
+                for (; numRows < fetchSize; numRows++) {
+                    if (!reader.next(key, value)) {
+                        break;
+                    }
+                    Object rowData = deserializer.deserialize(value);
+                    for (int i = 0; i < fields.length; i++) {
+                        Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                        columnValue.setRow(fieldData);
+                        // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                        //         numRows, i, types[i].getName(), types[i].getType().name(),
+                        //         fieldInspectors[i].getTypeName());
+                        columnValue.setField(types[i], fieldInspectors[i]);
+                        appendData(i, columnValue);
+                    }
                 }
-                Object rowData = deserializer.deserialize(value);
-                for (int i = 0; i < fields.length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
-                    columnValue.setRow(fieldData);
-                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
-                    //         numRows, i, types[i].getName(), types[i].getType().name(),
-                    //         fieldInspectors[i].getTypeName());
-                    columnValue.setField(types[i], fieldInspectors[i]);
-                    appendData(i, columnValue);
-                }
-            }
-            return numRows;
+                return numRows;
+            });
         } catch (Exception e) {
             close();
             LOG.warn("failed to get next in hadoop hudi jni scanner", e);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index c33e35cc6c908a..2c8d669452c4e8 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -20,13 +20,11 @@
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.TableSchema;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
@@ -109,14 +107,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
-        Configuration conf = new Configuration();
-        hadoopOptionParams.forEach(conf::set);
-        preExecutionAuthenticator = new PreExecutionAuthenticator();
-        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf,
-                AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
-        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
-        preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
new file mode 100644
index 00000000000000..b33c4766ba7555
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A cache for storing and retrieving PreExecutionAuthenticator instances keyed by Hadoop configuration.
+ * Caching avoids recreating a PreExecutionAuthenticator for the same Hadoop configuration. Entries are
+ * kept in a Least Recently Used (LRU) cache: when the cache exceeds the maximum size (MAX_CACHE_SIZE),
+ * the least recently used entry is evicted.
+ * <p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
+ * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
+ * redundant instantiations.
+ */
+public class PreExecutionAuthenticatorCache {
+    private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
+    private static final int MAX_CACHE_SIZE = 100;
+
+    // Access-ordered LinkedHashMap used as an LRU cache; evicts the eldest entry once the size
+    // passes MAX_CACHE_SIZE. It is not thread-safe by itself (even get() mutates an
+    // access-ordered map), so all access goes through the synchronized getAuthenticator method.
+    private static final Map<HadoopConfigWrapper, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
+            new LinkedHashMap<HadoopConfigWrapper, PreExecutionAuthenticator>(MAX_CACHE_SIZE, 0.75f, true) {
+                @Override
+                protected boolean removeEldestEntry(
+                        Map.Entry<HadoopConfigWrapper, PreExecutionAuthenticator> eldest) {
+                    return size() > MAX_CACHE_SIZE;
+                }
+            };
+
+    /**
+     * Retrieves a PreExecutionAuthenticator instance from the cache, creating and caching a new one
+     * if none exists for the given configuration. Synchronized because scanners may be constructed
+     * from multiple threads while the underlying LRU map is not thread-safe.
+     *
+     * @param hadoopConfig The Hadoop configuration (key-value pairs)
+     * @return A PreExecutionAuthenticator instance for the given configuration
+     */
+    public static synchronized PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
+        HadoopConfigWrapper hadoopConfigWrapper = new HadoopConfigWrapper(hadoopConfig);
+        PreExecutionAuthenticator cachedAuthenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper);
+        if (cachedAuthenticator != null) {
+            return cachedAuthenticator;
+        }
+        return preExecutionAuthenticatorCache.computeIfAbsent(hadoopConfigWrapper, config -> {
+            Configuration conf = new Configuration();
+            hadoopConfig.forEach(conf::set);
+            PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
+            AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf,
+                    AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+                    AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
+            LOG.info("Created new PreExecutionAuthenticator for Hadoop configuration");
+            return preExecutionAuthenticator;
+        });
+    }
+
+    /**
+     * Wraps a {@code Map<String, String>} Hadoop configuration so it can serve as a cache key.
+     * equals() and hashCode() are overridden so that configurations with the same key-value pairs
+     * compare equal, even when they arrive as different Map instances, and therefore reuse the
+     * same cached PreExecutionAuthenticator instance.
+     */
+    private static class HadoopConfigWrapper {
+        private final Map<String, String> config;
+
+        /**
+         * @param config The Hadoop configuration as key-value pairs; copied defensively so later
+         *               mutation of the caller's map cannot change the cache key
+         */
+        public HadoopConfigWrapper(Map<String, String> config) {
+            this.config = new HashMap<>(config);
+        }
+
+        /**
+         * Two HadoopConfigWrapper objects are equal if their wrapped configurations contain
+         * the same key-value pairs.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            HadoopConfigWrapper that = (HadoopConfigWrapper) obj;
+            return config.equals(that.config);
+        }
+
+        /**
+         * Hash code derived from the wrapped configuration; equal configurations produce equal
+         * hash codes so cache lookups match correctly.
+         */
+        @Override
+        public int hashCode() {
+            return config.hashCode();
+        }
+    }
+}
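A minimal caller-side sketch of how the new cache is exercised by the scanners above. The flow itself (build the Hadoop key-value map, fetch the shared authenticator, wrap privileged work in execute()) comes from the patch; the DemoScan class and the concrete property keys are illustrative assumptions only, since the real keys are defined by AuthenticationConfig.

```java
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.HashMap;
import java.util.Map;

public class DemoScan {
    public static void main(String[] args) throws Exception {
        // Hypothetical Kerberos settings; the actual key names come from AuthenticationConfig.
        Map<String, String> hadoopConfig = new HashMap<>();
        hadoopConfig.put("hadoop.security.authentication", "kerberos");
        hadoopConfig.put("hadoop.kerberos.principal", "doris/host@EXAMPLE.COM");
        hadoopConfig.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

        // Maps with equal key-value pairs resolve to the same cached instance,
        // so Kerberos login is not redone for every scanner that is constructed.
        PreExecutionAuthenticator auth = PreExecutionAuthenticatorCache.getAuthenticator(hadoopConfig);
        PreExecutionAuthenticator same = PreExecutionAuthenticatorCache.getAuthenticator(new HashMap<>(hadoopConfig));
        System.out.println(auth == same); // true

        // Privileged work runs inside execute(), mirroring how open()/getNext()
        // wrap reader setup and batch fetching in the scanners above.
        int numRows = auth.execute(() -> {
            // ... open a reader and fetch a batch under the authenticated context ...
            return 0;
        });
        System.out.println("rows read: " + numRows);
    }
}
```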