diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7128bfed900b16b..ce2d4f064f578d3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -941,6 +941,17 @@ public class Config extends ConfigBase { // All frontends will get tablet stat from all backends at each interval @ConfField public static int tablet_stat_update_interval_second = 60; // 1 min + // update interval of alive session + // Only master FE collect this info from all frontends at each interval + @ConfField public static int alive_session_update_interval_second = 5; + + @ConfField public static int fe_session_mgr_threads_num = 1; + + @ConfField public static int fe_session_mgr_blocking_queue_size = 1024; + + @ConfField(mutable = true, masterOnly = true) + public static int loss_conn_fe_temp_table_keep_second = 60; + /** * Max bytes a broker scanner can process in one broker load job. * Commonly, each Backends has one broker scanner. diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index f69bc06ebbf3f38..d3539e96f8162b4 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -190,8 +190,6 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star Env.getCurrentEnv().initialize(args); Env.getCurrentEnv().waitForReady(); - Env.getCurrentEnv().cleanPhantomTempTable(); - // init and start: // 1. HttpServer for HTTP Server // 2. FeServer for Thrift Server diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 909caabbfdfec52..1db8f370b0d0f80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -308,6 +308,9 @@ public void analyze(Analyzer analyzer) throws UserException { if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) { this.properties = maybeRewriteByAutoBucket(distributionDesc, properties); } + if (isTemp && !engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) { + throw new AnalysisException("Temporary table should be OLAP table"); + } super.analyze(analyzer); tableName.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryStatsStmt.java index a4bd934eab0694e..c3d5e0b0312e913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryStatsStmt.java @@ -140,7 +140,7 @@ public void analyze(Analyzer analyzer) throws UserException { .checkTblPriv(ConnectContext.get(), ctlName, dbName, tableName, PrivPredicate.SHOW)) { if (Util.isTempTable(tableName)) { if (Util.isTempTableInCurrentSession(tableName)) { - totalRows.add(Arrays.asList(Util.getTempTableOuterName(tableName), + totalRows.add(Arrays.asList(Util.getTempTableDisplayName(tableName), String.valueOf(queryHit))); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 363d39067e398ac..4200024d89eadaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -49,6 +49,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.AzureFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -442,7 +443,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws if (Config.ignore_backup_not_support_table_type) { LOG.warn("Table '{}' is a {} table, can not backup and ignore it." + "Only OLAP(Doris)/ODBC/VIEW table can be backed up", - tblName, tbl.getType().toString()); + tblName, tbl.isTemporary() ? "temporary" : tbl.getType().toString()); tblRefsNotSupport.add(tblRef); continue; } else { @@ -450,6 +451,19 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws } } + if (tbl.isTemporary()) { + if (Config.ignore_backup_not_support_table_type) { + LOG.warn("Table '{}' is a temporary table, can not backup and ignore it." + + "Only OLAP(Doris)/ODBC/VIEW table can be backed up", + Util.getTempTableDisplayName(tblName)); + tblRefsNotSupport.add(tblRef); + continue; + } else { + ErrorReport.reportDdlException("Table " + Util.getTempTableDisplayName(tblName) + + " is a temporary table, do not support backup"); + } + } + OlapTable olapTbl = (OlapTable) tbl; tbl.readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index fccd53407c19197..fd3cf5ef22ba52b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -427,12 +427,7 @@ public Pair createTableWithLock( lowerCaseToTableName.put(tableName.toLowerCase(), tableName); nameToTable.put(table.getName(), table); if (table.isTemporary()) { - if (isReplay) { - // add to to-deleted list, and delete it after catalog is ready - Env.getCurrentEnv().addPhantomTempTable(table); - } else { - ConnectContext.get().addTempTableToDB(table.getQualifiedDbName(), table.getName()); - } + Env.getCurrentEnv().registerTempTableAndSession(table); } if (!isReplay) { @@ -481,6 +476,9 @@ public void unregisterTable(String tableName) { this.nameToTable.remove(tableName); this.lowerCaseToTableName.remove(tableName.toLowerCase()); this.idToTable.remove(table.getId()); + if (table.isTemporary()) { + Env.getCurrentEnv().unregisterTempTable(table); + } table.markDropped(); } } @@ -1037,8 +1035,8 @@ public boolean updateDbProperties(Map properties) throws DdlExce try { if (!olapTable.getBinlogConfig().isEnable()) { String errMsg = String - .format("binlog is not enable in table[%s] in db [%s]", table.getName(), - getFullName()); + .format("binlog is not enable in table[%s] in db [%s]", + Util.getTempTableDisplayName(table.getName()), getFullName()); throw new DdlException(errMsg); } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e32aeac3a661c98..48f953ca765adf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -457,6 +457,10 @@ public class Env { protected SystemInfoService systemInfo; private HeartbeatMgr heartbeatMgr; + private FESessionMgr feSessionMgr; + private TemporaryTableMgr temporaryTableMgr; + // alive session of current fe + private Set aliveSessionSet; private TabletInvertedIndex tabletInvertedIndex; private ColocateTableIndex colocateTableIndex; @@ -576,8 +580,10 @@ public class Env { private final List forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids); + // all sessions' last heartbeat time of all fe + private static volatile Map sessionReportTimeMap = new HashMap<>(); + private TokenManager tokenManager; - private List phantomTempTableList; // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it. private final Map> configtoThreads = ImmutableMap @@ -732,6 +738,9 @@ public Env(boolean isCheckpointCatalog) { this.systemInfo = EnvFactory.getInstance().createSystemInfoService(); this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog); + this.feSessionMgr = new FESessionMgr(); + this.temporaryTableMgr = new TemporaryTableMgr(); + this.aliveSessionSet = new HashSet<>(); this.tabletInvertedIndex = new TabletInvertedIndex(); this.colocateTableIndex = new ColocateTableIndex(); this.recycleBin = new CatalogRecycleBin(); @@ -827,27 +836,40 @@ public Env(boolean isCheckpointCatalog) { this.splitSourceManager = new SplitSourceManager(); this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr(); this.tokenManager = new TokenManager(); - this.phantomTempTableList = new ArrayList<>(); } - public void addPhantomTempTable(Table table) { - phantomTempTableList.add(table); + public static Map getSessionReportTimeMap() { + return sessionReportTimeMap; } - public void removePhantomTempTable(Table table) { - phantomTempTableList.remove(table); + public void registerTempTableAndSession(Table table) { + if (ConnectContext.get() != null) { + ConnectContext.get().addTempTableToDB(table.getQualifiedDbName(), table.getName()); + } + + refreshSession(Util.getTempTableSessionId(table.getName())); } - public void cleanPhantomTempTable() { - for (Table table : phantomTempTableList) { - try { - getInternalCatalog().dropTableWithoutCheck(getInternalCatalog().getDb(table.getDBName()).get(), - table, true); - } catch (DdlException e) { - LOG.error("drop temporary table error: db: {}, table: {}", table.getDBName(), table.getName(), e); - } + public void unregisterTempTable(Table table) { + if (ConnectContext.get() != null) { + ConnectContext.get().removeTempTableFromDB(table.getQualifiedDbName(), table.getName()); + } + } + + private void refreshSession(long sessionId) { + sessionReportTimeMap.put(sessionId, System.currentTimeMillis()); + } + + public void checkAndRefreshSession(long sessionId) { + if (sessionReportTimeMap.containsKey(sessionId)) { + sessionReportTimeMap.put(sessionId, System.currentTimeMillis()); + } + } + + public void refreshAllAliveSession() { + for (long sessionId : sessionReportTimeMap.keySet()) { + refreshSession(sessionId); } - phantomTempTableList.clear(); } public static void destroyCheckpoint() { @@ -1294,8 +1316,8 @@ protected void getClusterIdAndRole() throws IOException { // this loop will not end until we get certain role type and name while (true) { if (!getFeNodeTypeAndNameFromHelpers()) { - LOG.warn("current node is not added to the group. please add it first. " - + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes); + LOG.warn("current node {} is not added to the group. please add it first. " + + "sleep 5 seconds and retry, current helper nodes: {}", selfNode, helperNodes); try { Thread.sleep(5000); continue; @@ -1724,6 +1746,11 @@ private void transferToMaster() { toMasterProgress = "start daemon threads"; + // coz current fe was not master fe and didn't get all fes' alive session report before, which cause + // sessionReportTimeMap is not up-to-date. + // reset all session's last heartbeat time. must run before init of TemporaryTableMgr + refreshAllAliveSession(); + // start all daemon threads that only running on MASTER FE startMasterOnlyDaemonThreads(); // start other daemon threads that should running on all FE @@ -1834,6 +1861,14 @@ protected void startMasterOnlyDaemonThreads() { // heartbeat mgr heartbeatMgr.setMaster(clusterId, token, epoch); heartbeatMgr.start(); + + // alive session of all fes' mgr + feSessionMgr.setClusterId(clusterId); + feSessionMgr.setToken(token); + feSessionMgr.start(); + + temporaryTableMgr.start(); + // New load scheduler pendingLoadTaskScheduler.start(); loadingLoadTaskScheduler.start(); @@ -3650,6 +3685,7 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder if (olapTable.getEnableLightSchemaChange()) { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE).append("\" = \""); sb.append(olapTable.getEnableLightSchemaChange()).append("\""); + } // storage policy @@ -3843,7 +3879,7 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis sb.append("`").append(dbName).append("`."); } if (table.isTemporary()) { - sb.append("`").append(Util.getTempTableOuterName(table.getName())).append("`"); + sb.append("`").append(Util.getTempTableDisplayName(table.getName())).append("`"); } else { sb.append("`").append(table.getName()).append("`"); } @@ -6806,5 +6842,17 @@ private void replayJournalsAndExit() { System.exit(0); } + + public void registerSessionInfo(long sessionId) { + this.aliveSessionSet.add(sessionId); + } + + public void unregisterSessionInfo(long sessionId) { + this.aliveSessionSet.remove(sessionId); + } + + public List getAllAliveSessionIds() { + return new ArrayList<>(aliveSessionSet); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FESessionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FESessionMgr.java new file mode 100644 index 000000000000000..a2fb871d28b2a5f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FESessionMgr.java @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService.HostInfo; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TFrontendReportAliveSessionRequest; +import org.apache.doris.thrift.TFrontendReportAliveSessionResult; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +/* + * FESessionMgr is for collecting alive sessions from frontends. + * Only run on master FE. + */ +public class FESessionMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(FESessionMgr.class); + + private final ExecutorService executor; + + private int clusterId; + + private String token; + + public FESessionMgr() { + super("fe-session-mgr", Config.alive_session_update_interval_second * 1000); + this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.fe_session_mgr_threads_num, + Config.fe_session_mgr_blocking_queue_size, "all-fe-session-mgr-pool", false); + } + + @Override + protected void runAfterCatalogReady() { + List frontends = Env.getCurrentEnv().getFrontends(null); + for (Frontend frontend : frontends) { + if (!frontend.isAlive()) { + continue; + } + FEAliveSessionHandler handler = new FEAliveSessionHandler(frontend, clusterId, token); + executor.submit(handler); + } + } + + public void setClusterId(int clusterId) { + this.clusterId = clusterId; + } + + public void setToken(String token) { + this.token = token; + } + + private class FEAliveSessionHandler implements Runnable { + private final Frontend fe; + private final int clusterId; + private final String token; + + public FEAliveSessionHandler(Frontend fe, int clusterId, String token) { + this.fe = fe; + this.clusterId = clusterId; + this.token = token; + } + + @Override + public void run() { + Env env = Env.getCurrentEnv(); + HostInfo selfNode = env.getSelfNode(); + if (fe.getHost().equals(selfNode.getHost())) { + if (env.isReady()) { + List sessionIds = env.getAllAliveSessionIds(); + for (Long sessionId : sessionIds) { + Env.getCurrentEnv().checkAndRefreshSession(sessionId); + } + } else { + LOG.info("Master FE is not ready"); + } + } else { + getAliveSessionAndRefresh(); + } + } + + private void getAliveSessionAndRefresh() { + FrontendService.Client client = null; + TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort()); + boolean ok = false; + try { + client = ClientPool.frontendPool.borrowObject(addr); + TFrontendReportAliveSessionRequest request = new TFrontendReportAliveSessionRequest(clusterId, token); + TFrontendReportAliveSessionResult result = client.getAliveSessions(request); + ok = true; + if (result.getStatus() == TStatusCode.OK) { + List sessionIds = result.getSessionIdList(); + for (Long sessionId : sessionIds) { + Env.getCurrentEnv().checkAndRefreshSession(sessionId); + } + } else { + LOG.warn("Error occurred when get alive session from " + fe.getHost() + + ", msg = " + result.getMsg()); + } + } catch (Exception e) { + LOG.warn("Error occurred when get alive session from " + fe.getHost() + + ", msg = " + e.getMessage()); + } finally { + if (ok) { + ClientPool.frontendPool.returnObject(addr, client); + } else { + ClientPool.frontendPool.invalidateObject(addr, client); + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 386f0094ff117cd..586c1fcf75ba232 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -132,7 +132,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso private Map readLockThreads = null; @SerializedName(value = "isTemporary") - protected boolean isTemporary = false; + private boolean isTemporary = false; public Table(TableType type) { this.type = type; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TemporaryTableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TemporaryTableMgr.java new file mode 100644 index 000000000000000..ad46399559faaad --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TemporaryTableMgr.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.Util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Date; +import java.util.Map; + +/* + * Delete temporary table when its creating session is gone + */ +public class TemporaryTableMgr extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(TemporaryTableMgr.class); + + public TemporaryTableMgr() { + super("temporary-table-mgr"); + } + + @Override + protected void runAfterCatalogReady() { + Map sessionReportTimeMap = Env.getCurrentEnv().getSessionReportTimeMap(); + long currentTs = System.currentTimeMillis(); + Collection> internalDBs = Env.getCurrentEnv().getInternalCatalog().getAllDbs(); + for (DatabaseIf db : internalDBs) { + for (TableIf table : db.getTables()) { + if (!table.isTemporary()) { + continue; + } + + long sessionId = Util.getTempTableSessionId(table.getName()); + boolean needDelete = false; + if (!sessionReportTimeMap.containsKey(sessionId)) { + LOG.info("Cannot find session id for table " + table.getName()); + needDelete = true; + } else if (currentTs > sessionReportTimeMap.get(sessionId) + + Config.loss_conn_fe_temp_table_keep_second * 1000) { + LOG.info("Temporary table " + table.getName() + " is out of time: " + + new Date(sessionReportTimeMap.get(sessionId)) + ", current: " + new Date(currentTs)); + needDelete = true; + } + + if (needDelete) { + LOG.info("Drop temporary table " + table); + try { + Env.getCurrentEnv().getInternalCatalog() + .dropTableWithoutCheck((Database) db, (Table) table, false, true); + } catch (Exception e) { + LOG.error("Drop temporary table error: db: {}, table: {}", + db.getFullName(), table.getName(), e); + } + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index e7ffc0c8e65c779..3519fcfd54e011a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -691,19 +691,22 @@ public static long genIdByName(String... names) { return Math.abs(sha256long(String.join(".", names))); } - public static String generateTempTableInnerName(String tableName) { + if (tableName.indexOf("_#TEMP#_") != -1) { + return tableName; + } + ConnectContext ctx = ConnectContext.get(); // when replay edit log, no need to generate temp table name - return ctx == null ? tableName : ctx.getConnectionId() + "_#TEMP#_" + tableName; + return ctx == null ? tableName : ctx.getSessionId() + "_#TEMP#_" + tableName; } - public static String getTempTableOuterName(String tableName) { + public static String getTempTableDisplayName(String tableName) { return tableName.indexOf("_#TEMP#_") != -1 ? tableName.split("_#TEMP#_")[1] : tableName; } - public static int getTempTableConnectionId(String tableName) { - return tableName.indexOf("_#TEMP#_") != -1 ? new Integer(tableName.split("_#TEMP#_")[0]) : -1; + public static long getTempTableSessionId(String tableName) { + return tableName.indexOf("_#TEMP#_") != -1 ? new Long(tableName.split("_#TEMP#_")[0]) : -1; } public static boolean isTempTable(String tableName) { @@ -711,6 +714,6 @@ public static boolean isTempTable(String tableName) { } public static boolean isTempTableInCurrentSession(String tableName) { - return getTempTableConnectionId(tableName) == ConnectContext.get().getConnectionId(); + return ConnectContext.get().getSessionId() == getTempTableSessionId(tableName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index faece3af003ffe3..2e9ad92d40f8f19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1023,9 +1023,6 @@ private void dropTableInternal(Database db, Table table, boolean isView, boolean DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, isView, forceDrop, recycleTime); Env.getCurrentEnv().getEditLog().logDropTable(info); Env.getCurrentEnv().getMtmvService().dropTable(table); - if (table.isTemporary() && ConnectContext.get() != null) { - ConnectContext.get().removeTempTableFromDB(db.getFullName(), tableName); - } } private static String genDropHint(String dbName, TableIf table) { @@ -1049,9 +1046,6 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table); } - if (isReplay && table.isTemporary()) { - Env.getCurrentEnv().removePhantomTempTable(table); - } Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId()); db.unregisterTable(table.getName()); StopWatch watch = StopWatch.createStarted(); @@ -2426,6 +2420,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx if (stmt.isTemp()) { tableName = Util.generateTempTableInnerName(tableName); } + boolean tableHasExist = false; BinlogConfig dbBinlogConfig; db.readLock(); @@ -2439,6 +2434,9 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx if (dbBinlogConfig.isEnable() && !createTableBinlogConfig.isEnable()) { throw new DdlException("Cannot create table with binlog disabled when database binlog enable"); } + if (stmt.isTemp() && createTableBinlogConfig.isEnable()) { + throw new DdlException("Cannot create temporary table with binlog enable"); + } stmt.getProperties().putAll(createTableBinlogConfig.toProperties()); // get keys type @@ -2491,7 +2489,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx long partitionId = idGeneratorBuffer.getNextId(); // use table name as single partition name if (stmt.isTemp()) { - partitionNameToId.put(Util.getTempTableOuterName(tableName), partitionId); + partitionNameToId.put(Util.getTempTableDisplayName(tableName), partitionId); } else { partitionNameToId.put(tableName, partitionId); } @@ -2538,6 +2536,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // this should be done before create partition. Map properties = stmt.getProperties(); + if (stmt.isTemp()) { + properties.put("binlog.enable", "false"); + } + short minLoadReplicaNum = -1; try { minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties); @@ -2948,7 +2950,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // use table name as this single partition name long partitionId = -1; if (stmt.isTemp()) { - partitionId = partitionNameToId.get(Util.getTempTableOuterName(tableName)); + partitionId = partitionNameToId.get(Util.getTempTableDisplayName(tableName)); } else { partitionId = partitionNameToId.get(tableName); } @@ -3115,7 +3117,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema); String partitionName = tableName; if (stmt.isTemp()) { - partitionName = Util.getTempTableOuterName(tableName); + partitionName = Util.getTempTableDisplayName(tableName); } long partitionId = partitionNameToId.get(partitionName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index 61c8ec3be771f0d..190e5ad0d8a6db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -274,7 +274,7 @@ public List> getDeleteInfosByDb(long dbId) { List info = Lists.newArrayList(); if (Util.isTempTable(tableName)) { - info.add(Util.getTempTableOuterName(tableName)); + info.add(Util.getTempTableDisplayName(tableName)); if (!Util.isTempTableInCurrentSession(tableName)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 51216e94f0c6ff0..da0459625e24086 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -621,7 +621,7 @@ private List getSelectedPartitions(OlapTable olapTable, Collection nameParts = visitMultipartIdentifier(ctx.name); // TODO: support catalog if (nameParts.size() == 1) { + // dbName should be set + dbName = ConnectContext.get().getDatabase(); tableName = nameParts.get(0); } else if (nameParts.size() == 2) { dbName = nameParts.get(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 9148dacda7c9e3a..1ccbcf9c7a9e182 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -43,6 +43,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.es.EsUtil; @@ -280,7 +281,8 @@ public void validate(ConnectContext ctx) { } try { - FeNameFormat.checkTableName(tableName); + // check display name for temporary table, its inner name cannot pass validation + FeNameFormat.checkTableName(isTemp ? Util.getTempTableDisplayName(tableName) : tableName); } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); } @@ -750,6 +752,13 @@ private void checkEngineName() { } } + if (isTemp && !engineName.equals(ENGINE_OLAP)) { + throw new AnalysisException("Do not support temporary table with engine name = " + engineName); + } + if (isTemp && !rollups.isEmpty()) { + throw new AnalysisException("Do not support temporary table with rollup "); + } + if (!Config.enable_odbc_mysql_broker_table && (engineName.equals(ENGINE_ODBC) || engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_BROKER))) { throw new AnalysisException("odbc, mysql and broker table is no longer supported." @@ -1148,5 +1157,9 @@ public PartitionTableInfo getPartitionTableInfo() { public DistributionDescriptor getDistribution() { return distribution; } + + public boolean isTemp() { + return isTemp; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index 686ae38c0fb323c..af34f3e99a8371d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -196,7 +196,7 @@ public void dropPolicy(DropPolicyLog dropPolicyLog, boolean ifExists) throws Ddl OlapTable olapTable = (OlapTable) table; String tableName = table.getName(); if (table.isTemporary()) { - tableName = Util.getTempTableOuterName(tableName); + tableName = Util.getTempTableDisplayName(tableName); } PartitionInfo partitionInfo = olapTable.getPartitionInfo(); for (Long partitionId : olapTable.getPartitionIds()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index c4c928c92654b0c..54b32a610eaaada 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.ResourceTypeEnum; import org.apache.doris.analysis.SetVar; import org.apache.doris.analysis.StringLiteral; @@ -240,6 +241,9 @@ public enum ConnectType { private Map> dbToTempTableNamesMap = new HashMap<>(); + // unique session id in the doris cluster + private long sessionId; + // internal call like `insert overwrite` need skipAuth // For example, `insert overwrite` only requires load permission, // but the internal implementation will call the logic of `AlterTable`. @@ -308,7 +312,7 @@ public void setOrUpdateInsertResult(long txnId, String label, String db, String if (isTxnModel() && insertResult != null) { insertResult.updateResult(txnStatus, loadedRows, filteredRows); } else { - insertResult = new InsertResult(txnId, label, db, Util.getTempTableOuterName(tbl), + insertResult = new InsertResult(txnId, label, db, Util.getTempTableDisplayName(tbl), txnStatus, loadedRows, filteredRows); } } @@ -372,6 +376,9 @@ public void init() { if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); } + + sessionId = Util.sha256long(Env.getCurrentEnv().getNodeName() + System.currentTimeMillis()); + Env.getCurrentEnv().registerSessionInfo(sessionId); } public ConnectContext() { @@ -382,6 +389,12 @@ public ConnectContext(StreamConnection connection) { this(connection, false); } + public ConnectContext(StreamConnection connection, boolean isProxy, long sessionId) { + this(connection, isProxy); + // used for binding new created temporary table with its original session + this.sessionId = sessionId; + } + public ConnectContext(StreamConnection connection, boolean isProxy) { connectType = ConnectType.MYSQL; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; @@ -775,6 +788,10 @@ public CatalogIf getCurrentCatalog() { return getCatalog(defaultCatalog); } + public long getSessionId() { + return sessionId; + } + /** * Maybe return when catalogName is not exist. So need to check nullable. */ @@ -840,17 +857,46 @@ public void cleanup() { threadLocalInfo.remove(); returnRows = 0; deleteTempTable(); + Env.getCurrentEnv().unregisterSessionInfo(this.sessionId); } protected void deleteTempTable() { - for (String dbName : dbToTempTableNamesMap.keySet()) { - Database db = Env.getCurrentEnv().getInternalCatalog().getDb(dbName).get(); - for (String tableName : dbToTempTableNamesMap.get(dbName)) { - try { - Env.getCurrentEnv().getInternalCatalog() - .dropTableWithoutCheck(db, db.getTable(tableName).get(), true); - } catch (DdlException e) { - LOG.error("drop temporary table error: db: {}, table: {}", dbName, tableName, e); + // only delete temporary table in its creating session, not proxy session in master fe + if (isProxy) { + return; + } + + // if current fe is master, delete temporary table directly + if (Env.getCurrentEnv().isMaster()) { + for (String dbName : dbToTempTableNamesMap.keySet()) { + Database db = Env.getCurrentEnv().getInternalCatalog().getDb(dbName).get(); + for (String tableName : dbToTempTableNamesMap.get(dbName)) { + LOG.info("try to drop temporary table: {}.{}", dbName, tableName); + try { + Env.getCurrentEnv().getInternalCatalog() + .dropTableWithoutCheck(db, db.getTable(tableName).get(), false, true); + } catch (DdlException e) { + LOG.error("drop temporary table error: {}.{}", dbName, tableName, e); + } + } + } + } else { + // forward to master fe to drop table + RedirectStatus redirectStatus = new RedirectStatus(true, false); + for (String dbName : dbToTempTableNamesMap.keySet()) { + for (String tableName : dbToTempTableNamesMap.get(dbName)) { + LOG.info("request to delete temporary table: {}.{}", dbName, tableName); + String dropTableSql = String.format("drop table `%s`", tableName); + OriginStatement originStmt = new OriginStatement(dropTableSql, 0); + MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, this, redirectStatus, false); + if (LOG.isDebugEnabled()) { + LOG.debug("need to transfer to Master. stmt: {}", this.getStmtId()); + } + try { + masterOpExecutor.execute(); + } catch (Exception e) { + LOG.error("master FE drop temporary table error: db: {}, table: {}", dbName, tableName, e); + } } } } @@ -1381,10 +1427,10 @@ public void addTempTableToDB(String database, String tableName) { Set tableNameSet = dbToTempTableNamesMap.get(database); if (tableNameSet == null) { tableNameSet = new HashSet<>(); + dbToTempTableNamesMap.put(database, tableNameSet); } tableNameSet.add(tableName); - dbToTempTableNamesMap.put(database, tableNameSet); } public void removeTempTableFromDB(String database, String tableName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 1f7d87bdfe35b3d..e1c38752165bef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -211,6 +211,7 @@ private TMasterOpRequest buildStmtForwardParams() throws AnalysisException { params.setUserIp(ctx.getRemoteIP()); params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + params.setSessionId(ctx.getSessionId()); if (Config.isCloudMode()) { String cluster = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index dd92cd8791df737..af5c496d390cf41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -876,7 +876,7 @@ private void handleShowTableId() throws AnalysisException { if (!Util.isTempTableInCurrentSession(table.getName())) { continue; } - row.add(Util.getTempTableOuterName(table.getName())); + row.add(Util.getTempTableDisplayName(table.getName())); } else { row.add(table.getName()); } @@ -913,7 +913,7 @@ private void handleShowPartitionId() throws AnalysisException { if (!Util.isTempTableInCurrentSession(tbl.getName())) { continue; } - row.add(Util.getTempTableOuterName(tbl.getName())); + row.add(Util.getTempTableDisplayName(tbl.getName())); } else { row.add(tbl.getName()); } @@ -1052,7 +1052,7 @@ private void handleShowTableStatus() throws AnalysisException { if (!Util.isTempTableInCurrentSession(table.getName())) { continue; } - row.add(Util.getTempTableOuterName(table.getName())); + row.add(Util.getTempTableDisplayName(table.getName())); } else { row.add(table.getName()); } @@ -1192,7 +1192,7 @@ private void handleShowCreateTable() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_OBJECT, showStmt.getDb(), showStmt.getTable(), "VIEW", "Use 'SHOW CREATE TABLE '" + table.getName()); } - rows.add(Lists.newArrayList(Util.getTempTableOuterName(table.getName()), createTableStmt.get(0))); + rows.add(Lists.newArrayList(Util.getTempTableDisplayName(table.getName()), createTableStmt.get(0))); resultSet = table.getType() != TableType.MATERIALIZED_VIEW ? new ShowResultSet(showStmt.getMetaData(), rows) : new ShowResultSet(ShowCreateTableStmt.getMaterializedViewMetaData(), rows); @@ -2557,7 +2557,7 @@ private void handleShowDynamicPartition() throws AnalysisException { if (!Util.isTempTableInCurrentSession(tableName)) { continue; } - tableName = Util.getTempTableOuterName(tableName); + tableName = Util.getTempTableDisplayName(tableName); } ReplicaAllocation replicaAlloc = dynamicPartitionProperty.getReplicaAllocation(); if (replicaAlloc.isNotSet()) { @@ -3062,7 +3062,7 @@ private void handleShowAnalyze() { row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted"); if (databaseIf.isPresent()) { Optional table = databaseIf.get().getTable(analysisInfo.tblId); - row.add(table.isPresent() ? Util.getTempTableOuterName(table.get().getName()) + row.add(table.isPresent() ? Util.getTempTableDisplayName(table.get().getName()) : "Table may get deleted"); } else { row.add("DB may get deleted"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8323540f5d1021c..f93c9a81c2f9d1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -157,6 +157,8 @@ import org.apache.doris.thrift.TFrontendPingFrontendRequest; import org.apache.doris.thrift.TFrontendPingFrontendResult; import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; +import org.apache.doris.thrift.TFrontendReportAliveSessionRequest; +import org.apache.doris.thrift.TFrontendReportAliveSessionResult; import org.apache.doris.thrift.TGetBackendMetaRequest; import org.apache.doris.thrift.TGetBackendMetaResult; import org.apache.doris.thrift.TGetBinlogLagResult; @@ -1106,7 +1108,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { if (LOG.isDebugEnabled()) { LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); } - ConnectContext context = new ConnectContext(null, true); + ConnectContext context = new ConnectContext(null, true, params.getSessionId()); // Set current connected FE to the client address, so that we can know where // this request come from. context.setCurrentConnectedFEIp(params.getClientNodeHost()); @@ -2254,6 +2256,34 @@ public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws return new TStatus(TStatusCode.CANCELLED); } + @Override + public TFrontendReportAliveSessionResult getAliveSessions(TFrontendReportAliveSessionRequest request) + throws TException { + TFrontendReportAliveSessionResult result = new TFrontendReportAliveSessionResult(); + result.setStatus(TStatusCode.OK); + if (LOG.isDebugEnabled()) { + LOG.debug("receive get alive sessions request: {}", request); + } + + Env env = Env.getCurrentEnv(); + if (env.isReady()) { + if (request.getClusterId() != env.getClusterId()) { + result.setStatus(TStatusCode.INVALID_ARGUMENT); + result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId()); + } else if (!request.getToken().equals(env.getToken())) { + result.setStatus(TStatusCode.INVALID_ARGUMENT); + result.setMsg("invalid token: " + Env.getCurrentEnv().getToken()); + } else { + result.setMsg("success"); + result.setSessionIdList(env.getAllAliveSessionIds()); + } + } else { + result.setStatus(TStatusCode.ILLEGAL_STATE); + result.setMsg("not ready"); + } + return result; + } + @Override public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException { boolean isReady = Env.getCurrentEnv().isReady(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7d1bf264f2f631..4c4b56027c1b87a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -600,6 +600,9 @@ struct TMasterOpRequest { // selectdb cloud 1000: optional string cloud_cluster 1001: optional bool noAuth; + + // temporary table + 1002: optional i64 sessionId } struct TColumnDefinition { @@ -955,6 +958,17 @@ struct TFrontendPingFrontendRequest { 3: optional string deployMode } +struct TFrontendReportAliveSessionRequest { + 1: required i32 clusterId + 2: required string token +} + +struct TFrontendReportAliveSessionResult { + 1: required Status.TStatusCode status + 2: required string msg + 3: required list sessionIdList +} + struct TDiskInfo { 1: required string dirType 2: required string dir @@ -1757,6 +1771,8 @@ service FrontendService { Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request) + TFrontendReportAliveSessionResult getAliveSessions(1: TFrontendReportAliveSessionRequest request) + TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) TInitExternalCtlMetaResult initExternalCtlMeta(1: TInitExternalCtlMetaRequest request) diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy index ed605f15d08dcec..f5e7e2a4e4646fb 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy @@ -683,6 +683,17 @@ suite("test_hive_ddl", "p0,external,hive,external_docker,external_docker_hive") exception "Floating point type column can not be partition column" } + try { + sql """ + CREATE TEMPORARY TABLE test_hive_db_temp_tbl ( + `col` STRING COMMENT 'col' + ) ENGINE=hive + """ + throw new IllegalStateException("Should throw error") + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Do not support temporary table"), ex.getMessage()) + } + sql """ drop database if exists `test_hive_db_tbl` """; } diff --git a/regression-test/suites/temp_table_p0/load.groovy b/regression-test/suites/temp_table_p0/load.groovy index f8854d3e3aec4ff..87082154e349980 100644 --- a/regression-test/suites/temp_table_p0/load.groovy +++ b/regression-test/suites/temp_table_p0/load.groovy @@ -170,6 +170,39 @@ suite('load', 'p0,restart_fe,docker') { select_result3 = sql "select * from t_test_temp_table3" assertEquals(select_result3.size(), 0) + try { + sql """ + CREATE TEMPORARY TABLE `t_test_temp_table_with_rollup` ( + `id` INT, + `name` varchar(32), + `gender` boolean + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + ROLLUP (r1(name, id)) + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + throw new IllegalStateException("Should throw error") + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Do not support temporary table with rollup"), ex.getMessage()) + } + + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "snst_" + UUID.randomUUID().toString().replace("-", "") + def syncer = getSyncer() + syncer.createS3Repository(repoName) + try { + sql """ + BACKUP SNAPSHOT regression_test_temp_table_p0.${snapshotName} + to ${repoName} + """ + throw new IllegalStateException("Should throw error") + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("is a temporary table, do not support backup"), ex.getMessage()) + } + def show_data = sql "show data" def containTempTable = false for(int i = 0; i < show_data.size(); i++) { @@ -222,6 +255,37 @@ suite('load', 'p0,restart_fe,docker') { sql "show data skew from t_test_temp_table2" + try { + sql """ + CREATE TEMPORARY TABLE `t_test_temp_table_with_binlog` ( + `id` INT, + `name` varchar(32), + `gender` boolean + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ); + """ + throw new IllegalStateException("Should throw error") + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Cannot create temporary table with binlog enable"), ex.getMessage()) + } + try { + sql """ + CREATE TEMPORARY TABLE `t_test_temp_table_with_binlog2` + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) as select * from t_test_table_with_data; + """ + throw new IllegalStateException("Should throw error") + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Cannot create temporary table with binlog enable"), ex.getMessage()) + } + def show_result = sql "show create table t_test_temp_table1" assertEquals(show_result[0][0], "t_test_temp_table1") assertTrue(show_result[0][1].contains("CREATE TEMPORARY TABLE"))