Skip to content

Commit

Permalink
update and add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Yulei-Yang committed Dec 27, 2024
1 parent 8e98788 commit 9fe43ec
Show file tree
Hide file tree
Showing 20 changed files with 746 additions and 674 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -170,7 +171,7 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt
List<String> row = Lists.newArrayList();
// p data structure is Pair<Pair<IndexName, ColumnName>, ColumnStatistic>
row.add(p.first.second);
row.add(p.first.first);
row.add(Util.getTempTableDisplayName(p.first.first));
row.add(String.valueOf(p.second.count));
row.add(String.valueOf(p.second.ndv));
row.add(String.valueOf(p.second.numNulls));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ public int compare(Table t1, Table t2) {
// admin users can see all temporary tables no matter they are created by which session
if (!isAdmin) {
// non admin user can only see temporary tables in current session
if (table.isTemporary() && Util.getTempTableSessionId(table.getName())
!= ConnectContext.get().getSessionId()) {
if (table.isTemporary() && !Util.isTempTableInCurrentSession(table.getName())) {
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long time
if (tableIds != null) {
for (long tableId : tableIds) {
boolean tableBinlogEnable = binlogConfigCache.isEnableTable(dbId, tableId);
anyEnable = anyEnable || tableBinlogEnable;
if (tableIds.size() > 1) {
anyEnable = anyEnable || tableBinlogEnable;
} else {
anyEnable = anyEnable && tableBinlogEnable;
}
if (anyEnable) {
break;
}
Expand Down
18 changes: 9 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public class Env {
private FESessionMgr feSessionMgr;
private TemporaryTableMgr temporaryTableMgr;
// alive session of current fe
private Set<Long> aliveSessionSet;
private Set<String> aliveSessionSet;
private TabletInvertedIndex tabletInvertedIndex;
private ColocateTableIndex colocateTableIndex;

Expand Down Expand Up @@ -582,7 +582,7 @@ public class Env {
private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids);

// all sessions' last heartbeat time of all fe
private static volatile Map<Long, Long> sessionReportTimeMap = new HashMap<>();
private static volatile Map<String, Long> sessionReportTimeMap = new HashMap<>();

private TokenManager tokenManager;

Expand Down Expand Up @@ -839,7 +839,7 @@ public Env(boolean isCheckpointCatalog) {
this.tokenManager = new TokenManager();
}

public static Map<Long, Long> getSessionReportTimeMap() {
public static Map<String, Long> getSessionReportTimeMap() {
return sessionReportTimeMap;
}

Expand All @@ -857,18 +857,18 @@ public void unregisterTempTable(Table table) {
}
}

private void refreshSession(long sessionId) {
private void refreshSession(String sessionId) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}

public void checkAndRefreshSession(long sessionId) {
public void checkAndRefreshSession(String sessionId) {
if (sessionReportTimeMap.containsKey(sessionId)) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}
}

public void refreshAllAliveSession() {
for (long sessionId : sessionReportTimeMap.keySet()) {
for (String sessionId : sessionReportTimeMap.keySet()) {
refreshSession(sessionId);
}
}
Expand Down Expand Up @@ -6852,15 +6852,15 @@ private void replayJournalsAndExit() {
System.exit(0);
}

public void registerSessionInfo(long sessionId) {
public void registerSessionInfo(String sessionId) {
this.aliveSessionSet.add(sessionId);
}

public void unregisterSessionInfo(long sessionId) {
public void unregisterSessionInfo(String sessionId) {
this.aliveSessionSet.remove(sessionId);
}

public List<Long> getAllAliveSessionIds() {
public List<String> getAllAliveSessionIds() {
return new ArrayList<>(aliveSessionSet);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void run() {
HostInfo selfNode = env.getSelfNode();
if (fe.getHost().equals(selfNode.getHost())) {
if (env.isReady()) {
List<Long> sessionIds = env.getAllAliveSessionIds();
for (Long sessionId : sessionIds) {
List<String> sessionIds = env.getAllAliveSessionIds();
for (String sessionId : sessionIds) {
Env.getCurrentEnv().checkAndRefreshSession(sessionId);
}
} else {
Expand All @@ -113,8 +113,8 @@ private void getAliveSessionAndRefresh() {
TFrontendReportAliveSessionResult result = client.getAliveSessions(request);
ok = true;
if (result.getStatus() == TStatusCode.OK) {
List<Long> sessionIds = result.getSessionIdList();
for (Long sessionId : sessionIds) {
List<String> sessionIds = result.getSessionIdList();
for (String sessionId : sessionIds) {
Env.getCurrentEnv().checkAndRefreshSession(sessionId);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public TemporaryTableMgr() {

@Override
protected void runAfterCatalogReady() {
Map<Long, Long> sessionReportTimeMap = Env.getCurrentEnv().getSessionReportTimeMap();
Map<String, Long> sessionReportTimeMap = Env.getCurrentEnv().getSessionReportTimeMap();
long currentTs = System.currentTimeMillis();
Collection<DatabaseIf<? extends TableIf>> internalDBs = Env.getCurrentEnv().getInternalCatalog().getAllDbs();
for (DatabaseIf<? extends TableIf> db : internalDBs) {
Expand All @@ -50,7 +50,7 @@ protected void runAfterCatalogReady() {
continue;
}

long sessionId = Util.getTempTableSessionId(table.getName());
String sessionId = Util.getTempTableSessionId(table.getName());
boolean needDelete = false;
if (!sessionReportTimeMap.containsKey(sessionId)) {
LOG.info("Cannot find session id for table " + table.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,15 +705,15 @@ public static String getTempTableDisplayName(String tableName) {
return tableName.indexOf("_#TEMP#_") != -1 ? tableName.split("_#TEMP#_")[1] : tableName;
}

public static long getTempTableSessionId(String tableName) {
return tableName.indexOf("_#TEMP#_") != -1 ? new Long(tableName.split("_#TEMP#_")[0]) : -1;
public static String getTempTableSessionId(String tableName) {
return tableName.indexOf("_#TEMP#_") != -1 ? tableName.split("_#TEMP#_")[0] : "";
}

public static boolean isTempTable(String tableName) {
return tableName.indexOf("_#TEMP#_") != -1;
}

public static boolean isTempTableInCurrentSession(String tableName) {
return ConnectContext.get().getSessionId() == getTempTableSessionId(tableName);
return getTempTableSessionId(tableName).equals(ConnectContext.get().getSessionId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2431,7 +2431,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
}
BinlogConfig createTableBinlogConfig = new BinlogConfig(dbBinlogConfig);
createTableBinlogConfig.mergeFromProperties(stmt.getProperties());
if (dbBinlogConfig.isEnable() && !createTableBinlogConfig.isEnable()) {
if (dbBinlogConfig.isEnable() && !createTableBinlogConfig.isEnable() && !stmt.isTemp()) {
throw new DdlException("Cannot create table with binlog disabled when database binlog enable");
}
if (stmt.isTemp() && createTableBinlogConfig.isEnable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ public List<List<Comparable>> getDeleteInfosByDb(long dbId) {

List<Comparable> info = Lists.newArrayList();
if (Util.isTempTable(tableName)) {
info.add(Util.getTempTableDisplayName(tableName));
if (!Util.isTempTableInCurrentSession(tableName)) {
continue;
}
info.add(Util.getTempTableDisplayName(tableName));
} else {
info.add(deleteInfo.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ private void checkPartitions(ConnectContext ctx, TableName tblName) throws Analy
case ODBC:
case JDBC:
case OLAP:
if (table.isTemporary()) {
throw new AnalysisException("Do not support exporting temporary partitions");
}
break;
case VIEW: // We support export view, so we do not need to check partition here.
if (this.partitionsNames.size() > 0) {
Expand Down Expand Up @@ -255,8 +258,7 @@ private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> file
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
if (table.isTemporary()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
+ table.getType() + " type, do not support export.");
throw new AnalysisException("Table[" + tblName.getTbl() + "] is temporary table, do not support export.");
}

exportJob.setDbId(db.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand Down Expand Up @@ -139,7 +140,7 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc
rows.add(Lists.newArrayList(table.getName(), createTableStmt.get(0), "utf8mb4", "utf8mb4_0900_bin"));
return new ShowResultSet(VIEW_META_DATA, rows);
} else {
rows.add(Lists.newArrayList(table.getName(), createTableStmt.get(0)));
rows.add(Lists.newArrayList(Util.getTempTableDisplayName(table.getName()), createTableStmt.get(0)));
return (table.getType() != Table.TableType.MATERIALIZED_VIEW
? new ShowResultSet(META_DATA, rows)
: new ShowResultSet(MATERIALIZED_VIEW_META_DATA, rows));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
Expand Down Expand Up @@ -116,6 +117,12 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc
DynamicPartitionProperty dynamicPartitionProperty
= olapTable.getTableProperty().getDynamicPartitionProperty();
String tableName = olapTable.getName();
if (olapTable.isTemporary()) {
if (!Util.isTempTableInCurrentSession(tableName)) {
continue;
}
tableName = Util.getTempTableDisplayName(tableName);
}
ReplicaAllocation replicaAlloc = dynamicPartitionProperty.getReplicaAllocation();
if (replicaAlloc.isNotSet()) {
replicaAlloc = olapTable.getDefaultReplicaAllocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand All @@ -35,6 +36,8 @@
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -43,6 +46,9 @@
* show partition command
*/
public class ShowPartitionIdCommand extends ShowCommand {

private static final Logger LOG = LogManager.getLogger(ShowPartitionIdCommand.class);

private final long partitionId;

/**
Expand Down Expand Up @@ -84,13 +90,22 @@ private ShowResultSet handleShowPartitionId(ConnectContext ctx, StmtExecutor exe
if (partition != null) {
List<String> row = new ArrayList<>();
row.add(database.getFullName());
row.add(tbl.getName());
if (tbl.isTemporary()) {
if (!Util.isTempTableInCurrentSession(tbl.getName())) {
continue;
}
row.add(Util.getTempTableDisplayName(tbl.getName()));
} else {
row.add(tbl.getName());
}
row.add(partition.getName());
row.add(String.valueOf(database.getId()));
row.add(String.valueOf(tbl.getId()));
rows.add(row);
break;
}
} catch (Exception e) {
LOG.error("failed to get partition info for {}", partitionId, e);
} finally {
tbl.readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -75,7 +76,14 @@ private ShowResultSet handleShowTableId(ConnectContext ctx, StmtExecutor executo
if (table != null) {
List<String> row = new ArrayList<>();
row.add(database.getFullName());
row.add(table.getName());
if (table.isTemporary()) {
if (!Util.isTempTableInCurrentSession(table.getName())) {
continue;
}
row.add(Util.getTempTableDisplayName(table.getName()));
} else {
row.add(table.getName());
}
row.add(String.valueOf(database.getId()));
rows.add(row);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -242,7 +243,7 @@ public enum ConnectType {
private Map<String, Set<String>> dbToTempTableNamesMap = new HashMap<>();

// unique session id in the doris cluster
private long sessionId;
private String sessionId;

// internal call like `insert overwrite` need skipAuth
// For example, `insert overwrite` only requires load permission,
Expand Down Expand Up @@ -377,7 +378,7 @@ public void init() {
sessionVariable.initFuzzyModeVariables();
}

sessionId = Util.sha256long(Env.getCurrentEnv().getNodeName() + System.currentTimeMillis());
sessionId = UUID.randomUUID().toString();
Env.getCurrentEnv().registerSessionInfo(sessionId);
}

Expand All @@ -389,7 +390,7 @@ public ConnectContext(StreamConnection connection) {
this(connection, false);
}

public ConnectContext(StreamConnection connection, boolean isProxy, long sessionId) {
public ConnectContext(StreamConnection connection, boolean isProxy, String sessionId) {
this(connection, isProxy);
// used for binding new created temporary table with its original session
this.sessionId = sessionId;
Expand Down Expand Up @@ -788,7 +789,7 @@ public CatalogIf getCurrentCatalog() {
return getCatalog(defaultCatalog);
}

public long getSessionId() {
public String getSessionId() {
return sessionId;
}

Expand Down
14 changes: 13 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3072,7 +3072,19 @@ private void handleShowAnalyze() {
} else {
row.add("DB may get deleted");
}
row.add(analysisInfo.colName);
StringBuffer sb = new StringBuffer();
String colNames = analysisInfo.colName;
if (colNames != null) {
for (String columnName : colNames.split(",")) {
String[] kv = columnName.split(":");
sb.append(Util.getTempTableDisplayName(kv[0]))
.append(":").append(kv[1]).append(",");
}
}
String newColNames = sb.toString();
newColNames = StringUtils.isEmpty(newColNames) ? ""
: newColNames.substring(0, newColNames.length() - 1);
row.add(newColNames);
row.add(analysisInfo.jobType.toString());
row.add(analysisInfo.analysisType.toString());
row.add(analysisInfo.message);
Expand Down
Loading

0 comments on commit 9fe43ec

Please sign in to comment.