Skip to content

Commit

Permalink
[Fix](Catalog)Standardize the use of authentication attributes in the…
Browse files Browse the repository at this point in the history
… Catalog to avoid creating a large number of Authenticator objects. (#46052)

[Fix](Catalog)Standardize the use of authentication attributes in the Catalog to avoid creating a large number of Authenticator objects.
  • Loading branch information
CalvinKirs authored Dec 27, 2024
1 parent 4389600 commit cddd290
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ public HMSExternalCatalog() {
* Default constructor for HMSExternalCatalog.
*/
public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment) {
String comment) {
super(catalogId, name, InitCatalogLog.Type.HMS, comment);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
}

@Override
Expand Down Expand Up @@ -170,10 +168,11 @@ public void checkProperties() throws DdlException {

@Override
protected void initLocalObjectsImpl() {
preExecutionAuthenticator = new PreExecutionAuthenticator();
if (authenticator == null) {
this.preExecutionAuthenticator = new PreExecutionAuthenticator();
if (this.authenticator == null) {
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
this.preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
}

HiveConf hiveConf = null;
Expand Down Expand Up @@ -263,7 +262,7 @@ public void registerDatabase(long dbId, String dbName) {
LOG.debug("create database [{}]", dbName);
}

ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, null, dbId, logType, false);
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, null, dbId, logType, false);
if (useMetaCache.get()) {
if (isInitialized()) {
metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
Expand All @@ -41,7 +40,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TExprOpcode;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -824,10 +822,6 @@ public static Schema getHudiTableSchema(HMSExternalTable table) {
return hudiSchema;
}

public static <T> T ugiDoAs(long catalogId, PrivilegedExceptionAction<T> action) {
return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(),
action);
}

public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
// if hive config is not ready, then use hadoop kerberos to login
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
Expand Down Expand Up @@ -264,22 +265,24 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
return partitionValues;
}
String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", "");

partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getSnapshotPartitionValues(
hmsTable, hudiClient, queryInstant, useHiveSyncPartition));
try {
partitionValues = hmsTable.getCatalog().getPreExecutionAuthenticator().execute(() ->
processor.getSnapshotPartitionValues(hmsTable, hudiClient, queryInstant, useHiveSyncPartition));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
} else {
HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
return partitionValues;
}
partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition));
try {
partitionValues = hmsTable.getCatalog().getPreExecutionAuthenticator().execute(()
-> processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
}
return partitionValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
Expand All @@ -51,6 +50,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
Expand Down Expand Up @@ -375,9 +375,12 @@ public List<Split> getSplits(int numBackends) throws UserException {
return getIncrementalSplits();
}
if (!partitionInit) {
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient));
try {
prunedPartitions = hmsTable.getCatalog().getPreExecutionAuthenticator().execute(()
-> getPrunedPartitions(hudiClient));
} catch (Exception e) {
throw new UserException(ExceptionUtils.getRootCauseMessage(e), e);
}
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -437,9 +440,12 @@ public boolean isBatchMode() {
}
if (!partitionInit) {
// Non partition table will get one dummy partition
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient));
try {
prunedPartitions = hmsTable.getCatalog().getPreExecutionAuthenticator().execute(()
-> getPrunedPartitions(hudiClient));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.thrift.TIcebergMetadataParams;

import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
Expand Down Expand Up @@ -125,8 +125,13 @@ private Table loadTable(IcebergMetadataCacheKey key) {
} else {
throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
}
return HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
() -> ops.loadTable(key.dbName, key.tableName));
try {
return ((ExternalCatalog) key.catalog).getPreExecutionAuthenticator().execute(()
-> ops.loadTable(key.dbName, key.tableName));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}

}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -581,25 +581,30 @@ public static List<Column> getSchema(ExternalCatalog catalog, String dbName, Str
* Get iceberg schema from catalog and convert them to doris schema
*/
public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name, long schemaId) {
return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
Schema schema;
if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) {
schema = icebergTable.schema();
} else {
schema = icebergTable.schemas().get((int) schemaId);
}
Preconditions.checkNotNull(schema,
"Schema for table " + catalog.getName() + "." + dbName + "." + name + " is null");
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
});
try {
return catalog.getPreExecutionAuthenticator().execute(() -> {
org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
Schema schema;
if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) {
schema = icebergTable.schema();
} else {
schema = icebergTable.schemas().get((int) schemaId);
}
Preconditions.checkNotNull(schema,
"Schema for table " + catalog.getName() + "." + dbName + "." + name + " is null");
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
});
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
Expand All @@ -51,6 +50,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
Expand Down Expand Up @@ -183,8 +183,12 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli

@Override
public List<Split> getSplits(int numBackends) throws UserException {
return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(),
() -> doGetSplits(numBackends));
try {
return source.getCatalog().getPreExecutionAuthenticator().execute(() -> doGetSplits(numBackends));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}

}

private List<Split> doGetSplits(int numBackends) throws UserException {
Expand Down Expand Up @@ -250,7 +254,7 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
}
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) {
TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
Expand Down

0 comments on commit cddd290

Please sign in to comment.