Skip to content

Commit

Permalink
[fix](mtmv) Fix get mv read lock too late when rewritten by materiali…
Browse files Browse the repository at this point in the history
…zed view (apache#44164)

Problem Summary:

When materialized view is rewritten, it would use the mv metadata.
Should try to get read lock before use these metadata. or it would cause
error.
Such as mv def is as following

CREATE MATERIALIZED VIEW mv1
        BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES ('replication_num' = '1')
        AS
          select
              o_orderdate,
              o_shippriority,
              o_comment,
              o.o_code as o_o_code,
              l_orderkey,
              l_partkey,
              l.o_code as l_o_code
            from
              orders_same_col o left
              join lineitem_same_col l on l_orderkey = o_orderkey
              left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;

When handling transparent rewriting, a MV scan plan is used for the
transparent rewrite. During the initialization of the scan plan, the
partitions of the table are retrieved, so it is necessary to attempt to
acquire a read lock on the table during initialization. If the read lock
is not acquired, subsequent operations may add or delete partitions, and
in the later processing of table partitions, calling get Partition may
not retrieve the corresponding partition, leading to data errors.
  • Loading branch information
seawinde committed Nov 21, 2024
1 parent eb1f781 commit 40c2151
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan();
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
Expand All @@ -254,15 +254,14 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
}
boolean checkResult = rewriteQueryByViewPreCheck(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext);
if (!checkResult) {
continue;
}
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -374,9 +373,9 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
Expand Down Expand Up @@ -527,6 +526,16 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
return Pair.of(mvPartitionNeedRemoveNameMap, baseTablePartitionNeedUnionNameMap);
}

/**
* Query rewrite result may output origin plan , this will cause loop.
* if return origin plan, need add check hear.
*/
protected boolean rewriteQueryByViewPreCheck(MatchMode matchMode, StructInfo queryStructInfo,
StructInfo viewStructInfo, SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan,
MaterializationContext materializationContext) {
return true;
}

/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public class AsyncMaterializationContext extends MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, cascadesContext),
cascadesContext, structInfo);
super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}

Expand All @@ -67,7 +66,8 @@ public MTMV getMtmv() {

@Override
Plan doGenerateScanPlan(CascadesContext cascadesContext) {
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, cascadesContext);
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, this.mtmv.getBaseIndexId(),
this.mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext);
}

@Override
Expand Down Expand Up @@ -107,7 +107,8 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
return Optional.empty();
}
RelationId relationId = null;
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan().collectFirst(LogicalOlapScan.class::isInstance);
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
}
Expand All @@ -127,7 +128,14 @@ boolean isFinalChosen(Relation relation) {
);
}

public Plan getScanPlan() {
@Override
public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) {
// If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent,
// try to get lock which should added before RBO
if (!this.isSuccess()) {
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
}
super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,33 +105,20 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;

StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
// Construct materialization struct info, catch exception which may cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null)
: structInfo;
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions();
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
scanPlanOutput);
}
}

Expand Down Expand Up @@ -176,17 +163,19 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) {
* if MaterializationContext is already rewritten successfully, then should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
* This method should be called when query rewrite successfully
*/
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
public void tryGenerateScanPlan(CascadesContext cascadesContext) {
if (!this.isAvailable()) {
return;
}
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Materialization output expression shuttle, this will be used to expression rewrite
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions,
scanPlanOutput);
// This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
Expand All @@ -195,6 +184,17 @@ public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
this.exprToScanExprMapping = regeneratedMapping;
}

/**
* Should clear scan plan after materializationContext is already rewritten successfully,
* Because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
*/
public void clearScanPlan(CascadesContext cascadesContext) {
this.scanPlan = null;
this.shuttledExprToScanExprMapping = null;
this.exprToScanExprMapping = null;
}

public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping);
}
Expand Down Expand Up @@ -275,7 +275,11 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getScanPlan() {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
|| this.exprToScanExprMapping == null) {
tryGenerateScanPlan(cascadesContext);
}
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean isMVPar
.analyze()
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully, so need tmp store
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0).getScanPlan().getOutputSet();
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0)
.getScanPlan(null, c1).getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,53 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""


def mv12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code <> '91'
union all
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code = '92';
"""

order_qt_query12_0_before "${query12_0}"
async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0")
order_qt_query12_0_after "${query12_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0"""
}

0 comments on commit 40c2151

Please sign in to comment.