Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](Nereids) lock table in ascending order of table IDs (#45045) #45679

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -868,10 +868,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -320,7 +320,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down Expand Up @@ -385,7 +385,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down
20 changes: 8 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3051,23 +3051,19 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisExce
"get table read lock timeout, database=" + getQualifiedDbName() + ",table=" + getName());
}
try {
return getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
if (partition != null) {
res.put(partition.getName(), entry.getValue());
}
}
return res;
} finally {
readUnlock();
}
}

public Map<String, PartitionItem> getAndCopyPartitionItemsWithoutLock() throws AnalysisException {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
if (partition != null) {
res.put(partition.getName(), entry.getValue());
}
}
return res;
}

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
Expand Down
134 changes: 48 additions & 86 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,55 +210,43 @@ default Map<String, Constraint> getConstraintsMapUnsafe() {
}

default Set<ForeignKeyConstraint> getForeignKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(ForeignKeyConstraint.class::isInstance)
.map(ForeignKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Map<String, Constraint> getConstraintsMap() {
readLock();
try {
return ImmutableMap.copyOf(getConstraintsMapUnsafe());
} catch (Exception ignored) {
return ImmutableMap.of();
} finally {
readUnlock();
}
}

default Set<PrimaryKeyConstraint> getPrimaryKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(PrimaryKeyConstraint.class::isInstance)
.map(PrimaryKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Set<UniqueConstraint> getUniqueConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(UniqueConstraint.class::isInstance)
.map(UniqueConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

Expand All @@ -277,34 +265,24 @@ default void checkConstraintNotExistenceUnsafe(String name, Constraint primaryKe
}

default void addUniqueConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
}

default void addPrimaryKeyConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
}

Expand All @@ -323,26 +301,19 @@ default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(

default void addForeignConstraint(String name, ImmutableList<String> columns,
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
writeLock();
referencedTable.writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
} finally {
referencedTable.writeUnlock();
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
}

Expand Down Expand Up @@ -378,40 +349,31 @@ default void replayDropConstraint(String name) {
}

default void dropConstraint(String name, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
}

default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);
} finally {
writeUnlock();
}
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
}

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down Expand Up @@ -377,16 +381,38 @@ private boolean dataMaskPoliciesChanged(
return false;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
/**
* Execute table locking operations in ascending order of table IDs.
*
* @return true if obtain all tables lock.
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
for (FullTableName fullTableName : sqlCacheContext.getUsedViews().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
currentStatementContext.lock();
return true;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
for (Entry<FullTableName, Set<String>> kv : sqlCacheContext.getCheckPrivilegeTablesOrViews().entrySet()) {
Set<String> usedColumns = kv.getValue();
TableIf tableIf = findTableIf(env, kv.getKey());
if (tableIf == null) {
return true;
}
// release when close statementContext
currentStatementContext.addTableReadLock(tableIf);
try {
UserAuthentication.checkPermission(tableIf, connectContext, usedColumns);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.doris.common.lock;

import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A monitored version of ReentrantReadWriteLock that provides additional
* monitoring capabilities for read and write locks.
*/
public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock {

private static final Logger LOG = LogManager.getLogger(MonitoredReentrantReadWriteLock.class);
// Monitored read and write lock instances
private final ReadLock readLock = new ReadLock(this);
private final WriteLock writeLock = new WriteLock(this);
Expand Down Expand Up @@ -97,6 +105,11 @@ protected WriteLock(ReentrantReadWriteLock lock) {
public void lock() {
super.lock();
monitor.afterLock();
if (isFair() && getReadHoldCount() > 0) {
LOG.warn(" read lock count is {}, write lock count is {}, stack is {}, query id is {}",
getReadHoldCount(), getWriteHoldCount(), Thread.currentThread().getStackTrace(),
ConnectContext.get() == null ? "" : DebugUtil.printId(ConnectContext.get().queryId()));
}
}

/**
Expand Down
Loading
Loading