Skip to content

Commit

Permalink
fix temp table life cycle management
Browse files Browse the repository at this point in the history
  • Loading branch information
Yulei-Yang committed Dec 25, 2024
1 parent 58c5240 commit 7b70bea
Show file tree
Hide file tree
Showing 26 changed files with 543 additions and 69 deletions.
11 changes: 11 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,17 @@ public class Config extends ConfigBase {
// All frontends will get tablet stat from all backends at each interval
@ConfField public static int tablet_stat_update_interval_second = 60; // 1 min

// update interval of alive session
// Only master FE collect this info from all frontends at each interval
@ConfField public static int alive_session_update_interval_second = 5;

@ConfField public static int fe_session_mgr_threads_num = 1;

@ConfField public static int fe_session_mgr_blocking_queue_size = 1024;

@ConfField(mutable = true, masterOnly = true)
public static int loss_conn_fe_temp_table_keep_second = 60;

/**
* Max bytes a broker scanner can process in one broker load job.
* Commonly, each Backends has one broker scanner.
Expand Down
2 changes: 0 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
Env.getCurrentEnv().initialize(args);
Env.getCurrentEnv().waitForReady();

Env.getCurrentEnv().cleanPhantomTempTable();

// init and start:
// 1. HttpServer for HTTP Server
// 2. FeServer for Thrift Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ public void analyze(Analyzer analyzer) throws UserException {
if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
this.properties = maybeRewriteByAutoBucket(distributionDesc, properties);
}
if (isTemp && !engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
throw new AnalysisException("Temporary table should be OLAP table");
}

super.analyze(analyzer);
tableName.analyze(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void analyze(Analyzer analyzer) throws UserException {
.checkTblPriv(ConnectContext.get(), ctlName, dbName, tableName, PrivPredicate.SHOW)) {
if (Util.isTempTable(tableName)) {
if (Util.isTempTableInCurrentSession(tableName)) {
totalRows.add(Arrays.asList(Util.getTempTableOuterName(tableName),
totalRows.add(Arrays.asList(Util.getTempTableDisplayName(tableName),
String.valueOf(queryHit)));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
Expand Down Expand Up @@ -442,14 +443,27 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
if (Config.ignore_backup_not_support_table_type) {
LOG.warn("Table '{}' is a {} table, can not backup and ignore it."
+ "Only OLAP(Doris)/ODBC/VIEW table can be backed up",
tblName, tbl.getType().toString());
tblName, tbl.isTemporary() ? "temporary" : tbl.getType().toString());
tblRefsNotSupport.add(tblRef);
continue;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_NOT_OLAP_TABLE, tblName);
}
}

if (tbl.isTemporary()) {
if (Config.ignore_backup_not_support_table_type) {
LOG.warn("Table '{}' is a temporary table, can not backup and ignore it."
+ "Only OLAP(Doris)/ODBC/VIEW table can be backed up",
Util.getTempTableDisplayName(tblName));
tblRefsNotSupport.add(tblRef);
continue;
} else {
ErrorReport.reportDdlException("Table " + Util.getTempTableDisplayName(tblName)
+ " is a temporary table, do not support backup");
}
}

OlapTable olapTbl = (OlapTable) tbl;
tbl.readLock();
try {
Expand Down
14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,7 @@ public Pair<Boolean, Boolean> createTableWithLock(
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
nameToTable.put(table.getName(), table);
if (table.isTemporary()) {
if (isReplay) {
// add to to-deleted list, and delete it after catalog is ready
Env.getCurrentEnv().addPhantomTempTable(table);
} else {
ConnectContext.get().addTempTableToDB(table.getQualifiedDbName(), table.getName());
}
Env.getCurrentEnv().registerTempTableAndSession(table);
}

if (!isReplay) {
Expand Down Expand Up @@ -481,6 +476,9 @@ public void unregisterTable(String tableName) {
this.nameToTable.remove(tableName);
this.lowerCaseToTableName.remove(tableName.toLowerCase());
this.idToTable.remove(table.getId());
if (table.isTemporary()) {
Env.getCurrentEnv().unregisterTempTable(table);
}
table.markDropped();
}
}
Expand Down Expand Up @@ -1037,8 +1035,8 @@ public boolean updateDbProperties(Map<String, String> properties) throws DdlExce
try {
if (!olapTable.getBinlogConfig().isEnable()) {
String errMsg = String
.format("binlog is not enable in table[%s] in db [%s]", table.getName(),
getFullName());
.format("binlog is not enable in table[%s] in db [%s]",
Util.getTempTableDisplayName(table.getName()), getFullName());
throw new DdlException(errMsg);
}
} finally {
Expand Down
84 changes: 66 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ public class Env {

protected SystemInfoService systemInfo;
private HeartbeatMgr heartbeatMgr;
private FESessionMgr feSessionMgr;
private TemporaryTableMgr temporaryTableMgr;
// alive session of current fe
private Set<Long> aliveSessionSet;
private TabletInvertedIndex tabletInvertedIndex;
private ColocateTableIndex colocateTableIndex;

Expand Down Expand Up @@ -576,8 +580,10 @@ public class Env {

private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids);

// all sessions' last heartbeat time of all fe
private static volatile Map<Long, Long> sessionReportTimeMap = new HashMap<>();

private TokenManager tokenManager;
private List<Table> phantomTempTableList;

// if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
private final Map<String, Supplier<MasterDaemon>> configtoThreads = ImmutableMap
Expand Down Expand Up @@ -732,6 +738,9 @@ public Env(boolean isCheckpointCatalog) {

this.systemInfo = EnvFactory.getInstance().createSystemInfoService();
this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog);
this.feSessionMgr = new FESessionMgr();
this.temporaryTableMgr = new TemporaryTableMgr();
this.aliveSessionSet = new HashSet<>();
this.tabletInvertedIndex = new TabletInvertedIndex();
this.colocateTableIndex = new ColocateTableIndex();
this.recycleBin = new CatalogRecycleBin();
Expand Down Expand Up @@ -827,27 +836,40 @@ public Env(boolean isCheckpointCatalog) {
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
this.tokenManager = new TokenManager();
this.phantomTempTableList = new ArrayList<>();
}

public void addPhantomTempTable(Table table) {
phantomTempTableList.add(table);
public static Map<Long, Long> getSessionReportTimeMap() {
return sessionReportTimeMap;
}

public void removePhantomTempTable(Table table) {
phantomTempTableList.remove(table);
public void registerTempTableAndSession(Table table) {
if (ConnectContext.get() != null) {
ConnectContext.get().addTempTableToDB(table.getQualifiedDbName(), table.getName());
}

refreshSession(Util.getTempTableSessionId(table.getName()));
}

public void cleanPhantomTempTable() {
for (Table table : phantomTempTableList) {
try {
getInternalCatalog().dropTableWithoutCheck(getInternalCatalog().getDb(table.getDBName()).get(),
table, true);
} catch (DdlException e) {
LOG.error("drop temporary table error: db: {}, table: {}", table.getDBName(), table.getName(), e);
}
public void unregisterTempTable(Table table) {
if (ConnectContext.get() != null) {
ConnectContext.get().removeTempTableFromDB(table.getQualifiedDbName(), table.getName());
}
}

private void refreshSession(long sessionId) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}

public void checkAndRefreshSession(long sessionId) {
if (sessionReportTimeMap.containsKey(sessionId)) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}
}

public void refreshAllAliveSession() {
for (long sessionId : sessionReportTimeMap.keySet()) {
refreshSession(sessionId);
}
phantomTempTableList.clear();
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -1294,8 +1316,8 @@ protected void getClusterIdAndRole() throws IOException {
// this loop will not end until we get certain role type and name
while (true) {
if (!getFeNodeTypeAndNameFromHelpers()) {
LOG.warn("current node is not added to the group. please add it first. "
+ "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
LOG.warn("current node {} is not added to the group. please add it first. "
+ "sleep 5 seconds and retry, current helper nodes: {}", selfNode, helperNodes);
try {
Thread.sleep(5000);
continue;
Expand Down Expand Up @@ -1724,6 +1746,11 @@ private void transferToMaster() {

toMasterProgress = "start daemon threads";

// coz current fe was not master fe and didn't get all fes' alive session report before, which cause
// sessionReportTimeMap is not up-to-date.
// reset all session's last heartbeat time. must run before init of TemporaryTableMgr
refreshAllAliveSession();

// start all daemon threads that only running on MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should running on all FE
Expand Down Expand Up @@ -1834,6 +1861,14 @@ protected void startMasterOnlyDaemonThreads() {
// heartbeat mgr
heartbeatMgr.setMaster(clusterId, token, epoch);
heartbeatMgr.start();

// alive session of all fes' mgr
feSessionMgr.setClusterId(clusterId);
feSessionMgr.setToken(token);
feSessionMgr.start();

temporaryTableMgr.start();

// New load scheduler
pendingLoadTaskScheduler.start();
loadingLoadTaskScheduler.start();
Expand Down Expand Up @@ -3650,6 +3685,7 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
if (olapTable.getEnableLightSchemaChange()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE).append("\" = \"");
sb.append(olapTable.getEnableLightSchemaChange()).append("\"");

}

// storage policy
Expand Down Expand Up @@ -3843,7 +3879,7 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis
sb.append("`").append(dbName).append("`.");
}
if (table.isTemporary()) {
sb.append("`").append(Util.getTempTableOuterName(table.getName())).append("`");
sb.append("`").append(Util.getTempTableDisplayName(table.getName())).append("`");
} else {
sb.append("`").append(table.getName()).append("`");
}
Expand Down Expand Up @@ -6806,5 +6842,17 @@ private void replayJournalsAndExit() {

System.exit(0);
}

public void registerSessionInfo(long sessionId) {
this.aliveSessionSet.add(sessionId);
}

public void unregisterSessionInfo(long sessionId) {
this.aliveSessionSet.remove(sessionId);
}

public List<Long> getAllAliveSessionIds() {
return new ArrayList<>(aliveSessionSet);
}
}

Loading

0 comments on commit 7b70bea

Please sign in to comment.