Skip to content

Commit

Permalink
[fix](nereids) Fix query mv rewrite fail when mv cache build quickly (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde authored Dec 24, 2023
1 parent 5505fa3 commit 6ea6ff5
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
Expand Down Expand Up @@ -104,13 +103,8 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
logger.debug(currentClassName + " this group is already rewritten so skip");
continue;
}
MTMV mtmv = materializationContext.getMTMV();
MTMVCache mtmvCache = getCacheFromMTMV(mtmv);
if (mtmvCache == null) {
logger.warn(currentClassName + " mv cache is null so return");
return rewriteResults;
}
List<StructInfo> viewStructInfos = extractStructInfo(mtmvCache.getLogicalPlan(), cascadesContext);
List<StructInfo> viewStructInfos = extractStructInfo(materializationContext.getMvPlan(),
cascadesContext);
if (viewStructInfos.size() > 1) {
// there should be exactly one view struct info
logger.warn(currentClassName + " the num of view struct info is more then one so return");
Expand Down Expand Up @@ -200,7 +194,7 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
CascadesContext rewrittenPlanContext =
CascadesContext.initContext(cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
Rewriter.getWholeTreeRewriter(cascadesContext).execute();
Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute();
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
logger.debug(currentClassName + "rewrite by materialized view success");
rewriteResults.add(rewrittenPlan);
Expand Down Expand Up @@ -289,17 +283,6 @@ protected boolean checkPartitionIsValid(
&& relatedTalbeValidSet.containsAll(relatedTableSelectedPartitionToCheck);
}

/**
 * Obtains the cache of the given materialized view by calling {@code mtmv.getOrGenerateCache()}.
 *
 * @param mtmv the materialized view whose cache is requested
 * @return the cache returned by the MTMV, or {@code null} when the call throws
 *         {@link AnalysisException} (the exception is logged as a warning)
 */
private MTMVCache getCacheFromMTMV(MTMV mtmv) {
    try {
        return mtmv.getOrGenerateCache();
    } catch (AnalysisException e) {
        // Callers are expected to handle a null cache, so log and fall back to null.
        logger.warn(getClass().getSimpleName() + " get mtmv cache analysisException", e);
        return null;
    }
}

/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class MaterializationContext {
// generated from the mv scan plan
private ExpressionMapping mvExprToMvScanExprMapping;
private boolean available = true;
// The mv plan taken from the cache at construction time; recorded so that query rewrite by mv stays correct if the cache changes.
private Plan mvPlan;

/**
* MaterializationContext, this contains necessary info for query rewriting by mv
Expand Down Expand Up @@ -81,6 +83,8 @@ public MaterializationContext(MTMV mtmv,
mtmvCache.getMvOutputExpressions(),
mtmvCache.getLogicalPlan()),
mvScanPlan.getExpressions());
// keep a reference to the plan currently in the cache, because the plan in the cache may change later
this.mvPlan = mtmvCache.getLogicalPlan();
}

public Set<GroupId> getMatchedGroups() {
Expand Down Expand Up @@ -119,6 +123,10 @@ public boolean isAvailable() {
return available;
}

/**
 * Returns the materialized view plan recorded by this context.
 *
 * @return the plan stored in the {@code mvPlan} field
 */
public Plan getMvPlan() {
    return this.mvPlan;
}

/**
* MaterializationContext fromMaterializedView
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ suite("aggregate_with_roll_up") {
O_COMMENT VARCHAR(79) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
PARTITION BY RANGE(o_orderdate) (PARTITION `day_2` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(o_orderdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand Down Expand Up @@ -73,7 +77,10 @@ suite("aggregate_with_roll_up") {
l_comment VARCHAR(44) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
PARTITION BY RANGE(l_shipdate) (PARTITION `day_1` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(l_shipdate) (
PARTITION `day_1` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_2` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_3` VALUES LESS THAN ("2023-12-30"))
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand Down Expand Up @@ -144,6 +151,26 @@ suite("aggregate_with_roll_up") {
}
}

// Helper: (re)creates a partitioned materialized view and checks that a query is rewritten to use it.
//   mv_sql           - the SELECT statement that defines the materialized view
//   query_sql        - the query that is explained after the materialized view is built
//   mv_name          - name of the materialized view to drop, create and look for in the explain output
//   partition_column - column placed in the materialized view's PARTITION BY clause
def check_rewrite_with_mv_partition = { mv_sql, query_sql, mv_name, partition_column ->

// Start from a clean state so the CREATE below does not collide with an existing view.
sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
sql"""
CREATE MATERIALIZED VIEW ${mv_name}
BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
PARTITION BY (${partition_column})
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS ${mv_sql}
"""

// Look up the job for this view and wait for its task before explaining the query.
def job_name = getJobName(db, mv_name);
waitingMTMVTaskFinished(job_name)
// Presumably asserts that the explain output contains "(<mv_name>)", i.e. the plan scans the materialized view.
explain {
sql("${query_sql}")
contains "(${mv_name})"
}
}

def check_rewrite_with_force_analyze = { mv_sql, query_sql, mv_name ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
Expand Down Expand Up @@ -283,7 +310,7 @@ suite("aggregate_with_roll_up") {
"l_partkey, " +
"l_suppkey"
order_qt_query15_0_before "${query15_0}"
check_rewrite(mv15_0, query15_0, "mv15_0")
check_rewrite_with_mv_partition(mv15_0, query15_0, "mv15_0", "l_shipdate")
order_qt_query15_0_after "${query15_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv15_0"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ suite("aggregate_without_roll_up") {
o_comment VARCHAR(79) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
PARTITION BY RANGE(o_orderdate) (PARTITION `day_2` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(o_orderdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
)
);
"""

sql """
Expand All @@ -74,7 +78,11 @@ suite("aggregate_without_roll_up") {
l_comment VARCHAR(44) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
PARTITION BY RANGE(l_shipdate) (PARTITION `day_1` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(l_shipdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand All @@ -100,7 +108,8 @@ suite("aggregate_without_roll_up") {
)
"""

sql """ insert into lineitem values
sql """
insert into lineitem values
(1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
(2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
(3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
Expand Down

0 comments on commit 6ea6ff5

Please sign in to comment.