Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk committed Nov 19, 2024
1 parent 11a1435 commit 313cb95
Show file tree
Hide file tree
Showing 44 changed files with 548 additions and 559 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.OperationType;
Expand Down Expand Up @@ -687,7 +688,8 @@ public void registerExternalTableFromEvent(String dbName, String tableName,

db.writeLock();
try {
HMSExternalTable namedTable = new HMSExternalTable(tblId, tableName, dbName, (HMSExternalCatalog) catalog);
HMSExternalTable namedTable = ((HMSExternalDatabase) db)
.buildTableForInit(tableName, tblId, hmsCatalog, (HMSExternalDatabase) db);
namedTable.setUpdateTime(updateTime);
db.registerTable(namedTable);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
Expand Down Expand Up @@ -149,6 +150,7 @@ public abstract class ExternalCatalog

protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected IdentifierMapping identifierMapping;

public ExternalCatalog() {
}
Expand All @@ -171,6 +173,7 @@ public Configuration getConfiguration() {

/**
* List the names of the databases in this catalog.
*
* @return list of database names in this catalog
*/
protected List<String> listDatabaseNames() {
Expand All @@ -182,6 +185,10 @@ protected List<String> listDatabaseNames() {
}
}

/**
 * Converts a database name as reported by the remote system into the name
 * used locally for that database in this catalog.
 *
 * <p>This base implementation performs no conversion and returns the remote
 * name unchanged.
 * NOTE(review): presumably catalog subclasses override this to apply their
 * own identifier mapping; confirm against the subclasses.
 *
 * @param remoteDatabaseName the database name in the remote system
 * @return the local database name; here, the same value as {@code remoteDatabaseName}
 */
public String fromRemoteDatabaseName(String remoteDatabaseName) {
return remoteDatabaseName;
}

// Will be called when creating catalog (as well as when replaying)
// to add some default properties if missing.
public void setDefaultPropsIfMissing(boolean isReplay) {
Expand Down Expand Up @@ -219,6 +226,10 @@ public void checkWhenCreating() throws DdlException {
*/
public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);

/**
 * Converts a table name as reported by the remote system into a table name
 * for use in this catalog.
 *
 * <p>This base implementation performs no conversion: it returns
 * {@code remoteTableName} unchanged and does not use {@code remoteDatabaseName}.
 * NOTE(review): presumably the table-level counterpart of
 * {@code fromRemoteDatabaseName}, meant to be overridden by catalogs that
 * map identifiers; confirm against the subclasses.
 *
 * @param remoteDatabaseName the name of the database that contains the table in the remote system
 * @param remoteTableName the table name in the remote system
 * @return the mapped table name; here, the same value as {@code remoteTableName}
 */
public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) {
return remoteTableName;
}

/**
* init some local objects such as:
* hms client, read properties from hive-site.xml, es client
Expand Down Expand Up @@ -246,15 +257,29 @@ public final synchronized void makeSureInitialized() {
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
List<Pair<String, String>> remoteToLocalPairs = getFilteredDatabaseNames();

metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
dbName -> Optional.ofNullable(
buildDbForInit(dbName, Util.genIdByName(name, dbName), logType, true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
ignored -> remoteToLocalPairs.stream().map(Pair::value).collect(Collectors.toList()),
dbName -> {
Optional<Pair<String, String>> pairOpt = remoteToLocalPairs.stream()
.filter(pair -> pair.value().equals(dbName))
.findFirst();
if (pairOpt.isPresent()) {
Pair<String, String> pair = pairOpt.get();
String remoteDbName = pair.key();
return Optional.ofNullable(
buildDbForInit(remoteDbName, Util.genIdByName(name, dbName), logType,
true));
}
return Optional.empty();
},
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit))
);
}
setLastUpdateTime(System.currentTimeMillis());
} else {
Expand Down Expand Up @@ -350,21 +375,23 @@ private void init() {
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(logType);
List<String> filteredDatabases = getFilteredDatabaseNames();
for (String dbName : filteredDatabases) {
List<Pair<String, String>> remoteToLocalPairs = getFilteredDatabaseNames();
for (Pair<String, String> pair : remoteToLocalPairs) {
String remoteDbName = pair.key();
String localDbName = pair.value();
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
if (dbNameToId != null && dbNameToId.containsKey(localDbName)) {
dbId = dbNameToId.get(localDbName);
tmpDbNameToId.put(localDbName, dbId);
ExternalDatabase<? extends ExternalTable> db = idToDb.get(dbId);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType, false);
tmpDbNameToId.put(localDbName, dbId);
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(remoteDbName, dbId, logType, false);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
initCatalogLog.addCreateDb(dbId, localDbName, remoteDbName);
}
}

Expand All @@ -376,14 +403,18 @@ private void init() {
}

@NotNull
private List<String> getFilteredDatabaseNames() {
private List<Pair<String, String>> getFilteredDatabaseNames() {
List<String> allDatabases = Lists.newArrayList(listDatabaseNames());
allDatabases.remove(InfoSchemaDb.DATABASE_NAME);
allDatabases.add(InfoSchemaDb.DATABASE_NAME);
allDatabases.remove(MysqlDb.DATABASE_NAME);
allDatabases.add(MysqlDb.DATABASE_NAME);

Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();

List<Pair<String, String>> remoteToLocalPairs = Lists.newArrayList();

allDatabases = allDatabases.stream().filter(dbName -> {
if (!dbName.equals(InfoSchemaDb.DATABASE_NAME) && !dbName.equals(MysqlDb.DATABASE_NAME)) {
// Exclude database map take effect with higher priority over include database map
Expand All @@ -396,7 +427,13 @@ private List<String> getFilteredDatabaseNames() {
}
return true;
}).collect(Collectors.toList());
return allDatabases;

for (String remoteDbName : allDatabases) {
String localDbName = fromRemoteDatabaseName(remoteDbName);
remoteToLocalPairs.add(Pair.of(remoteDbName, localDbName));
}

return remoteToLocalPairs;
}

public void onRefresh(boolean invalidCache) {
Expand Down Expand Up @@ -468,6 +505,7 @@ public void setComment(String comment) {
/**
* Different from 'listDatabases()', this method will return dbnames from cache.
* while 'listDatabases()' will return dbnames from remote datasource.
*
* @return names of database in this catalog.
*/
@Override
Expand Down Expand Up @@ -624,8 +662,12 @@ private void removeAccessController() {
}

public void replayInitCatalog(InitCatalogLog log) {
if (log.getRemoteDbNames() == null || log.getRemoteDbNames().isEmpty()) {
initialized = false;
return;
}
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
Optional<ExternalDatabase<? extends ExternalTable>> db = getDbForReplay(log.getRefreshDbIds().get(i));
// Should not return null.
Expand Down Expand Up @@ -669,58 +711,59 @@ public Optional<ExternalDatabase<? extends ExternalTable>> getDbForReplay(long d
* Build a database instance.
* If checkExists is true, it will check if the database exists in the remote system.
*
* @param dbName
* @param remoteDbName
* @param dbId
* @param logType
* @param checkExists
* @return
*/
protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String dbName, long dbId,
protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String remoteDbName, long dbId,
InitCatalogLog.Type logType, boolean checkExists) {
// When running ut, disable this check to make ut pass.
// Because in ut, the database is not created in remote system.
if (checkExists && (!FeConstants.runningUnitTest || this instanceof TestExternalCatalog)) {
try {
List<String> dbNames = getDbNames();
if (!dbNames.contains(dbName)) {
dbNames = getFilteredDatabaseNames();
if (!dbNames.contains(dbName)) {
return null;
}
List<Pair<String, String>> remoteToLocalPairs = getFilteredDatabaseNames();
List<String> remoteDbNames = remoteToLocalPairs.stream()
.map(Pair::key)
.collect(Collectors.toList());
if (!remoteDbNames.contains(remoteDbName)) {
return null;
}
} catch (Throwable t) {
// If connection failed, it will throw exception.
// ignore it and treat it as not exist.
LOG.warn("Failed to check db {} exist in remote system, ignore it.", dbName, t);
LOG.warn("Failed to check db {} exist in remote system, ignore it.", remoteDbName, t);
return null;
}
}

if (dbName.equals(InfoSchemaDb.DATABASE_NAME)) {
if (remoteDbName.equals(InfoSchemaDb.DATABASE_NAME)) {
return new ExternalInfoSchemaDatabase(this, dbId);
}
if (dbName.equals(MysqlDb.DATABASE_NAME)) {
if (remoteDbName.equals(MysqlDb.DATABASE_NAME)) {
return new ExternalMysqlDatabase(this, dbId);
}
String localDbName = fromRemoteDatabaseName(remoteDbName);
switch (logType) {
case HMS:
return new HMSExternalDatabase(this, dbId, dbName);
return new HMSExternalDatabase(this, dbId, localDbName, remoteDbName);
case ES:
return new EsExternalDatabase(this, dbId, dbName);
return new EsExternalDatabase(this, dbId, localDbName, remoteDbName);
case JDBC:
return new JdbcExternalDatabase(this, dbId, dbName);
return new JdbcExternalDatabase(this, dbId, localDbName, remoteDbName);
case ICEBERG:
return new IcebergExternalDatabase(this, dbId, dbName);
return new IcebergExternalDatabase(this, dbId, localDbName, remoteDbName);
case MAX_COMPUTE:
return new MaxComputeExternalDatabase(this, dbId, dbName);
return new MaxComputeExternalDatabase(this, dbId, localDbName, remoteDbName);
case LAKESOUL:
return new LakeSoulExternalDatabase(this, dbId, dbName);
return new LakeSoulExternalDatabase(this, dbId, localDbName, remoteDbName);
case TEST:
return new TestExternalDatabase(this, dbId, dbName);
return new TestExternalDatabase(this, dbId, localDbName, remoteDbName);
case PAIMON:
return new PaimonExternalDatabase(this, dbId, dbName);
return new PaimonExternalDatabase(this, dbId, localDbName, remoteDbName);
case TRINO_CONNECTOR:
return new TrinoConnectorExternalDatabase(this, dbId, dbName);
return new TrinoConnectorExternalDatabase(this, dbId, localDbName, remoteDbName);
default:
break;
}
Expand Down
Loading

0 comments on commit 313cb95

Please sign in to comment.