Commit

add cache
CalvinKirs committed Jan 24, 2025
1 parent b2b35eb commit e5271e3
Showing 3 changed files with 170 additions and 33 deletions.

HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@
 import org.apache.doris.common.classloader.ThreadClassLoaderContext;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final ClassLoader classLoader;
 
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+
     public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
         this.basePath = params.get("base_path");
         this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
                 LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
             }
         }
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
 
         ZoneId zoneId;
         if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
     @Override
     public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            initRequiredColumnsAndTypes();
-            initTableInfo(requiredTypes, requiredFields, fetchSize);
-            Properties properties = getReaderProperties();
-            initReader(properties);
+            preExecutionAuthenticator.execute(() -> {
+                initRequiredColumnsAndTypes();
+                initTableInfo(requiredTypes, requiredFields, fetchSize);
+                Properties properties = getReaderProperties();
+                initReader(properties);
+                return null;
+            });
+
         } catch (Exception e) {
             close();
             LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public void open() throws IOException {
     @Override
     public int getNext() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-            int numRows = 0;
-            for (; numRows < fetchSize; numRows++) {
-                if (!reader.next(key, value)) {
-                    break;
+            return preExecutionAuthenticator.execute(() -> {
+                NullWritable key = reader.createKey();
+                ArrayWritable value = reader.createValue();
+                int numRows = 0;
+                for (; numRows < fetchSize; numRows++) {
+                    if (!reader.next(key, value)) {
+                        break;
+                    }
+                    Object rowData = deserializer.deserialize(value);
+                    for (int i = 0; i < fields.length; i++) {
+                        Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                        columnValue.setRow(fieldData);
+                        // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                        //     numRows, i, types[i].getName(), types[i].getType().name(),
+                        //     fieldInspectors[i].getTypeName());
+                        columnValue.setField(types[i], fieldInspectors[i]);
+                        appendData(i, columnValue);
+                    }
                 }
-                Object rowData = deserializer.deserialize(value);
-                for (int i = 0; i < fields.length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
-                    columnValue.setRow(fieldData);
-                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
-                    //     numRows, i, types[i].getName(), types[i].getType().name(),
-                    //     fieldInspectors[i].getTypeName());
-                    columnValue.setField(types[i], fieldInspectors[i]);
-                    appendData(i, columnValue);
-                }
-            }
-            return numRows;
+                return numRows;
+            });
         } catch (Exception e) {
             close();
             LOG.warn("failed to get next in hadoop hudi jni scanner", e);
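
Both hunks above route the scanner's Hadoop-facing work through preExecutionAuthenticator.execute(...), so reader setup and batch fetching run under the authenticated (typically Kerberos) identity. The snippet below is only an illustrative sketch of that wrapping pattern, built directly on Hadoop's UserGroupInformation; it is not the actual Doris PreExecutionAuthenticator implementation.

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.security.UserGroupInformation;

// Illustrative only: run a task under a previously logged-in Hadoop identity (UGI).
// Doris' real PreExecutionAuthenticator/HadoopAuthenticator classes may differ in detail.
class UgiExecutor {
    private final UserGroupInformation ugi; // e.g. from UserGroupInformation.loginUserFromKeytabAndReturnUGI

    UgiExecutor(UserGroupInformation ugi) {
        this.ugi = ugi;
    }

    <T> T execute(Callable<T> task) throws Exception {
        // doAs makes Hadoop client calls inside the task use this UGI's credentials.
        return ugi.doAs((PrivilegedExceptionAction<T>) task::call);
    }
}

Because the wrapper returns the callable's result, getNext() can hand numRows straight back through the execute(...) call, as in the hunk above.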

PaimonJniScanner.java
@@ -20,13 +20,11 @@
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.TableSchema;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
@@ -109,14 +107,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
-        Configuration conf = new Configuration();
-        hadoopOptionParams.forEach(conf::set);
-        preExecutionAuthenticator = new PreExecutionAuthenticator();
-        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf,
-                AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
-        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
-        preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override

PreExecutionAuthenticatorCache.java (new file)
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
 * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
 * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
 * the maximum size (MAX_CACHE_SIZE).
 * <p>
 * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
 * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
 * redundant instantiations.
 */
public class PreExecutionAuthenticatorCache {
    private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
    private static final int MAX_CACHE_SIZE = 100;
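    // LinkedHashMap's third constructor argument (accessOrder = true) keeps iteration in access order,
    // so removeEldestEntry below evicts the least recently used entry once the size exceeds MAX_CACHE_SIZE.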
    private static final Map<HadoopConfigWrapper, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
            new LinkedHashMap<HadoopConfigWrapper, PreExecutionAuthenticator>(MAX_CACHE_SIZE, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<HadoopConfigWrapper, PreExecutionAuthenticator> eldest) {
                    return size() > MAX_CACHE_SIZE;
                }
            };

    /**
     * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
     * This method first checks if the configuration is already cached. If not, it computes a new instance and
     * caches it for future use.
     *
     * @param hadoopConfig The Hadoop configuration (key-value pairs)
     * @return A PreExecutionAuthenticator instance for the given configuration
     */
    public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
        HadoopConfigWrapper hadoopConfigWrapper = new HadoopConfigWrapper(hadoopConfig);
        PreExecutionAuthenticator cachedAuthenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper);
        if (cachedAuthenticator != null) {
            return cachedAuthenticator;
        }
        return preExecutionAuthenticatorCache.computeIfAbsent(hadoopConfigWrapper, config -> {
            Configuration conf = new Configuration();
            hadoopConfig.forEach(conf::set);
            PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
            AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf,
                    AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
                    AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
            LOG.info("Created new authenticator for configuration: " + hadoopConfigWrapper);
            return preExecutionAuthenticator;
        });
    }


    /**
     * Hadoop configuration wrapper class that wraps a Map<String, String> configuration.
     * This class overrides the equals() and hashCode() methods to enable comparison of
     * the configurations in the cache, ensuring that identical configurations (with the same key-value pairs)
     * are considered equal and can reuse the same cached PreExecutionAuthenticator instance.
     * <p>
     * The purpose of this class is to ensure that in the cache, if two configurations are identical
     * (i.e., they have the same key-value pairs), only one instance of PreExecutionAuthenticator is created and cached.
     * By implementing custom equals() and hashCode() methods, we ensure that even if different Map instances
     * hold the same configuration data, they are considered equal in the cache.
     */
    private static class HadoopConfigWrapper {
        private final Map<String, String> config;

        /**
         * Constructor that takes a Map<String, String> configuration.
         *
         * @param config The Hadoop configuration, typically a Map<String, String> containing configuration key-value
         *               pairs
         */
        public HadoopConfigWrapper(Map<String, String> config) {
            this.config = new HashMap<>(config);
        }

        /**
         * Checks if two HadoopConfigWrapper objects are equal.
         * Two objects are considered equal if their wrapped Map configurations are identical
         * (i.e., the key-value pairs are the same).
         *
         * @param obj The object to compare with the current object
         * @return true if the two HadoopConfigWrapper objects have the same wrapped configuration; false otherwise
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            HadoopConfigWrapper that = (HadoopConfigWrapper) obj;
            return config.equals(that.config);
        }

        /**
         * Generates a hash code based on the Hadoop configuration.
         * Objects with the same configuration will generate the same hash code, ensuring
         * that they can be correctly matched in a Map.
         *
         * @return The hash code of the Hadoop configuration
         */
        @Override
        public int hashCode() {
            return config.hashCode();
        }
    }
}
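
Taken together, the commit replaces per-scanner authenticator construction with a shared, size-bounded cache keyed by the Hadoop properties. A minimal usage sketch of the new entry point follows; the property keys and values are hypothetical placeholders, while getAuthenticator(...) and the execute(...) call pattern mirror the call sites above.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

public class ScannerAuthExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical properties; the scanners collect theirs from prefix-filtered scan parameters.
        Map<String, String> hadoopProps = new HashMap<>();
        hadoopProps.put("hadoop.security.authentication", "kerberos");
        hadoopProps.put("hadoop.kerberos.principal", "doris@EXAMPLE.COM");
        hadoopProps.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

        // Identical property maps resolve to one cached authenticator, so repeated
        // scanner instances skip the Kerberos login/setup work.
        PreExecutionAuthenticator auth = PreExecutionAuthenticatorCache.getAuthenticator(hadoopProps);

        // Hadoop-touching work runs through execute(), exactly as the two scanners now do.
        int rows = auth.execute(() -> {
            // open readers and fetch a batch here
            return 0;
        });
        System.out.println("rows = " + rows);
    }
}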
