Skip to content

Commit

Permalink
[fix](mtmv) Fix get mv read lock too late when rewritten by materiali…
Browse files Browse the repository at this point in the history
…zed view (apache#44164)

Problem Summary:

When materialized view is rewritten, it would use the mv metadata.
Should try to get read lock before use these metadata. or it would cause
error.
Such as mv def is as following

CREATE MATERIALIZED VIEW mv1
        BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES ('replication_num' = '1') 
        AS
          select
              o_orderdate,
              o_shippriority,
              o_comment,
              o.o_code as o_o_code,
              l_orderkey, 
              l_partkey,
              l.o_code as l_o_code
            from
              orders_same_col o left
              join lineitem_same_col l on l_orderkey = o_orderkey
              left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;

When handling transparent rewriting, a MV scan plan is used for the
transparent rewrite. During the initialization of the scan plan, the
partitions of the table are retrieved, so it is necessary to attempt to
acquire a read lock on the table during initialization. If the read lock
is not acquired, subsequent operations may add or delete partitions, and
in the later processing of table partitions, calling get Partition may
not retrieve the corresponding partition, leading to data errors.
  • Loading branch information
seawinde committed Nov 19, 2024
1 parent 4f0bf80 commit 12811f8
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan(queryStructInfo);
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
Expand Down Expand Up @@ -262,12 +262,6 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -379,9 +373,9 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, mtmv.getBaseIndexId(),
mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext),
cascadesContext, structInfo);
super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}

Expand Down Expand Up @@ -110,7 +108,7 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
return Optional.empty();
}
RelationId relationId = null;
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null)
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
Expand All @@ -132,7 +130,13 @@ boolean isFinalChosen(Relation relation) {
}

@Override
public Plan getScanPlan(StructInfo queryInfo) {
public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) {
// If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent,
// try to get lock which should added before RBO
if (!this.isSuccess()) {
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
}
super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,33 +105,20 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;

StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
// Construct materialization struct info, catch exception which may cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null)
: structInfo;
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions();
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
scanPlanOutput);
}
}

Expand Down Expand Up @@ -176,17 +163,19 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) {
* if MaterializationContext is already rewritten successfully, then should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
* This method should be called when query rewrite successfully
*/
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
public void tryGenerateScanPlan(CascadesContext cascadesContext) {
if (!this.isAvailable()) {
return;
}
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Materialization output expression shuttle, this will be used to expression rewrite
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions,
scanPlanOutput);
// This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
Expand All @@ -195,6 +184,17 @@ public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
this.exprToScanExprMapping = regeneratedMapping;
}

/**
* Should clear scan plan after materializationContext is already rewritten successfully,
* Because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
*/
public void clearScanPlan(CascadesContext cascadesContext) {
this.scanPlan = null;
this.shuttledExprToScanExprMapping = null;
this.exprToScanExprMapping = null;
}

public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping);
}
Expand Down Expand Up @@ -275,7 +275,11 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getScanPlan(StructInfo queryStructInfo) {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
|| this.exprToScanExprMapping == null) {
tryGenerateScanPlan(cascadesContext);
}
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
Expand Down Expand Up @@ -55,9 +56,7 @@ public class SyncMaterializationContext extends MaterializationContext {
*/
public SyncMaterializationContext(Plan mvPlan, Plan mvOriginalPlan, OlapTable olapTable,
long indexId, String indexName, CascadesContext cascadesContext, Statistics statistics) {
super(mvPlan, mvOriginalPlan,
MaterializedViewUtils.generateMvScanPlan(olapTable, indexId, olapTable.getPartitionIds(),
PreAggStatus.unset(), cascadesContext), cascadesContext, null);
super(mvPlan, mvOriginalPlan, cascadesContext, null);
this.olapTable = olapTable;
this.indexId = indexId;
this.indexName = indexName;
Expand Down Expand Up @@ -100,7 +99,7 @@ String getStringInfo() {
@Override
Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext) {
RelationId relationId = null;
Optional<LogicalOlapScan> scanObj = this.getScanPlan(null)
Optional<LogicalOlapScan> scanObj = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (scanObj.isPresent()) {
relationId = scanObj.get().getRelationId();
Expand All @@ -109,19 +108,27 @@ Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext
}

@Override
public Plan getScanPlan(StructInfo queryStructInfo) {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
// Already get lock if sync mv, doesn't need to get lock
super.getScanPlan(queryStructInfo, cascadesContext);
if (queryStructInfo == null) {
return scanPlan;
}
if (queryStructInfo.getRelations().size() == 1
&& queryStructInfo.getRelations().get(0) instanceof LogicalOlapScan
&& !((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds().isEmpty()) {
List<CatalogRelation> queryStructInfoRelations = queryStructInfo.getRelations();
if (queryStructInfoRelations.size() == 1
&& queryStructInfoRelations.get(0) instanceof LogicalOlapScan
&& !((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds().isEmpty()) {
// Partition prune if sync materialized view
return scanPlan.accept(new DefaultPlanRewriter<Void>() {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) {
if (!queryStructInfoRelations.get(0).getTable().getFullQualifiers().equals(
olapScan.getTable().getFullQualifiers())) {
// Only the same table, we can do partition prue
return olapScan;
}
return olapScan.withSelectedPartitionIds(
((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds());
((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds());
}
}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean isMVPar
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully, so need tmp store
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0)
.getScanPlan(null).getOutputSet();
.getScanPlan(null, c1).getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,53 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""


def mv12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code <> '91'
union all
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code = '92';
"""

order_qt_query12_0_before "${query12_0}"
async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0")
order_qt_query12_0_after "${query12_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0"""
}

0 comments on commit 12811f8

Please sign in to comment.