Skip to content

Commit

Permalink
fix wrong data bug
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Dec 19, 2023
1 parent d8ac8ff commit 6240acd
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 66 deletions.
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
// this will be removed in the future when support join derivation
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES, ELIMINATE_OUTER_JOIN");
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public <OUTPUT_TYPE extends Plan> PatternMatcher<INPUT_TYPE, OUTPUT_TYPE> thenAp
return new PatternMatcher<>(pattern, defaultPromise, matchedAction);
}

/**
* Apply rule to return multi result, catch exception to make sure no influence on other rule
*/
public <OUTPUT_TYPE extends Plan> PatternMatcher<INPUT_TYPE, OUTPUT_TYPE> thenApplyMultiNoThrow(
MatchedMultiAction<INPUT_TYPE, OUTPUT_TYPE> matchedMultiAction) {
MatchedMultiAction<INPUT_TYPE, OUTPUT_TYPE> adaptMatchedMultiAction = ctx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
logger.info(currentClassName + " predicate compensate fail so continue");
continue;
}
Plan rewritedPlan;
Plan rewrittenPlan;
Plan mvScan = materializationContext.getMvScanPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewritedPlan = mvScan;
rewrittenPlan = mvScan;
} else {
// Try to rewrite compensate predicates by using mv scan
List<Expression> rewriteCompensatePredicates = rewriteExpression(
Expand All @@ -175,36 +175,48 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
logger.info(currentClassName + " compensate predicate rewrite by view fail so continue");
continue;
}
rewritedPlan = new LogicalFilter<>(Sets.newHashSet(rewriteCompensatePredicates), mvScan);
rewrittenPlan = new LogicalFilter<>(Sets.newHashSet(rewriteCompensatePredicates), mvScan);
}
// Rewrite query by view
rewritedPlan = rewriteQueryByView(matchMode,
rewrittenPlan = rewriteQueryByView(matchMode,
queryStructInfo,
viewStructInfo,
queryToViewSlotMapping,
rewritedPlan,
rewrittenPlan,
materializationContext);
if (rewritedPlan == null) {
if (rewrittenPlan == null) {
logger.info(currentClassName + " rewrite query by view fail so continue");
continue;
}
if (!checkPartitionIsValid(queryStructInfo, materializationContext, cascadesContext)) {
logger.info(currentClassName + " check partition validation fail so continue");
continue;
}
if (!checkOutput(queryPlan, rewrittenPlan)) {
continue;
}
// run rbo job on mv rewritten plan
CascadesContext rewrittenPlanContext =
CascadesContext.initContext(cascadesContext.getStatementContext(), rewritedPlan,
CascadesContext.initContext(cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
Rewriter.getWholeTreeRewriter(cascadesContext).execute();
rewritedPlan = rewrittenPlanContext.getRewritePlan();
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
logger.info(currentClassName + "rewrite by materialized view success");
rewriteResults.add(rewritedPlan);
rewriteResults.add(rewrittenPlan);
}
}
return rewriteResults;
}

protected boolean checkOutput(Plan sourcePlan, Plan rewrittenPlan) {
if (sourcePlan.getGroupExpression().isPresent() && !rewrittenPlan.getLogicalProperties().equals(
sourcePlan.getGroupExpression().get().getOwnerGroup().getLogicalProperties())) {
logger.error("rewrittenPlan output logical properties is not same with target group");
return false;
}
return true;
}

/**
* Partition will be pruned in query then add the record the partitions to select partitions on
* catalog relation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -34,7 +35,7 @@
public class Predicates {

// Predicates that can be pulled up
private final List<Expression> pulledUpPredicates = new ArrayList<>();
private final Set<Expression> pulledUpPredicates = new HashSet<>();

private Predicates() {
}
Expand All @@ -49,7 +50,7 @@ public static Predicates of(List<? extends Expression> pulledUpPredicates) {
return predicates;
}

public List<? extends Expression> getPulledUpPredicates() {
public Set<? extends Expression> getPulledUpPredicates() {
return pulledUpPredicates;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Split the expression to equal, range and residual predicate.
Expand All @@ -39,27 +38,26 @@
*/
public class PredicatesSplitter {

private final List<Expression> equalPredicates = new ArrayList<>();
private final List<Expression> rangePredicates = new ArrayList<>();
private final List<Expression> residualPredicates = new ArrayList<>();
private final Set<Expression> equalPredicates = new HashSet<>();
private final Set<Expression> rangePredicates = new HashSet<>();
private final Set<Expression> residualPredicates = new HashSet<>();
private final List<Expression> conjunctExpressions;

private final PredicateExtract instance = new PredicateExtract();

public PredicatesSplitter(Expression target) {
this.conjunctExpressions = ExpressionUtils.extractConjunction(target);
PredicateExtract instance = new PredicateExtract();
for (Expression expression : conjunctExpressions) {
expression.accept(instance, expression);
expression.accept(instance, null);
}
}

/**
* PredicateExtract
* extract to equal, range, residual predicate set
*/
public class PredicateExtract extends DefaultExpressionVisitor<Void, Expression> {
public class PredicateExtract extends DefaultExpressionVisitor<Void, Void> {

@Override
public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Expression sourceExpression) {
public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Void context) {
Expression leftArg = comparisonPredicate.getArgument(0);
Expression rightArg = comparisonPredicate.getArgument(1);
boolean leftArgOnlyContainsColumnRef = containOnlyColumnRef(leftArg, true);
Expand All @@ -69,7 +67,7 @@ public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Ex
equalPredicates.add(comparisonPredicate);
return null;
} else {
residualPredicates.add(comparisonPredicate);
rangePredicates.add(comparisonPredicate);
}
} else if ((leftArgOnlyContainsColumnRef && rightArg instanceof Literal)
|| (rightArgOnlyContainsColumnRef && leftArg instanceof Literal)) {
Expand All @@ -81,12 +79,9 @@ public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Ex
}

@Override
public Void visitCompoundPredicate(CompoundPredicate compoundPredicate, Expression context) {
if (compoundPredicate instanceof Or) {
residualPredicates.add(compoundPredicate);
return null;
}
return super.visitCompoundPredicate(compoundPredicate, context);
public Void visit(Expression expr, Void context) {
residualPredicates.add(expr);
return null;
}
}

Expand All @@ -98,7 +93,7 @@ public Predicates.SplitPredicate getSplitPredicate() {
}

private static boolean containOnlyColumnRef(Expression expression, boolean allowCast) {
if (expression instanceof SlotReference && ((SlotReference) expression).isColumnFromTable()) {
if (expression instanceof SlotReference && expression.isColumnFromTable()) {
return true;
}
if (allowCast && expression instanceof Cast) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void initPredicates() {
private void predicatesDerive() {
// construct equivalenceClass according to equals predicates
List<Expression> shuttledExpression = ExpressionUtils.shuttleExpressionWithLineage(
this.predicates.getPulledUpPredicates(), originalPlan).stream()
new ArrayList<>(this.predicates.getPulledUpPredicates()), originalPlan).stream()
.map(Expression.class::cast)
.collect(Collectors.toList());
SplitPredicate splitPredicate = Predicates.splitPredicates(ExpressionUtils.and(shuttledExpression));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,20 @@
-- !query1_0_before --

-- !query1_0_after --
1 yy 0 0 77.50 33.50 9.50 5
2 mi 0 0 57.40 56.20 1.20 2
2 mm 0 0 43.20 43.20 43.20 1

-- !query1_1_before --
2023-12-08 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-09 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-10 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-11 2 mm 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-12 2 mi 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

-- !query1_1_after --
2023-12-08 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-09 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-10 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-11 2 mm 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2023-12-12 2 mi 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

-- !query1_2_before --
1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
Expand Down Expand Up @@ -67,7 +78,6 @@
2 3 2023-12-12 57.40 56.20 1.20 2 0
2 4 2023-12-10 46.00 33.50 12.50 2 0
3 3 2023-12-11 43.20 43.20 43.20 1 0
4 3 2023-12-09 11.50 11.50 11.50 1 0

-- !query16_0_before --
2 3 2023-12-08 20.00 10.50 9.50 2 0
Expand All @@ -89,6 +99,20 @@
-- !query17_0_after --
3 3 2023-12-11 43.20 43.20 43.20 1 0

-- !query17_1_before --
1 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
2 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
3 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
4 2 mm 0 0 0 0 0 0 0 0 0 0 0 0 0
5 2 mi 0 0 0 0 0 0 0 0 0 0 0 0 0

-- !query17_1_after --
1 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
2 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
3 1 yy 0 0 0 0 0 0 0 0 0 0 0 0 0
4 2 mm 0 0 0 0 0 0 0 0 0 0 0 0 0
5 2 mi 0 0 0 0 0 0 0 0 0 0 0 0 0

-- !query18_0_before --

-- !query18_0_after --
Expand Down Expand Up @@ -125,3 +149,9 @@
4 2 43.20
6 2 57.40

-- !query20_0_before --
0 0 0 0 0 0 0 0 0 0 0 0

-- !query20_0_after --
0 0 0 0 0 0 0 0 0 0 0 0

14 changes: 12 additions & 2 deletions regression-test/data/nereids_rules_p0/mv/join/inner/inner_join.out
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,6 @@
4
4
4
6
6

-- !query5_0_before --
4
Expand All @@ -261,6 +259,18 @@
6
6

-- !query6_0_before --
2 3 2023-12-08
2 3 2023-12-08

-- !query6_0_after --
2 3 2023-12-08
2 3 2023-12-08

-- !query7_0_before --

-- !query7_0_after --

-- !query10_0_before --

-- !query10_0_after --
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ suite("aggregate_without_roll_up") {
sql """ DROP MATERIALIZED VIEW IF EXISTS mv1_0"""


def mv1_1 = "select O_SHIPPRIORITY, O_COMMENT, " +
def mv1_1 = "select O_SHIPPRIORITY, O_COMMENT, O_ORDERDATE, " +
"count(distinct case when O_SHIPPRIORITY > 1 and O_ORDERKEY IN (1, 3) then O_ORDERSTATUS else null end) as filter_cnt_1, " +
"count(distinct case when O_SHIPPRIORITY > 2 and O_ORDERKEY IN (2) then O_ORDERSTATUS else null end) as filter_cnt_2, " +
"count(distinct case when O_SHIPPRIORITY > 3 and O_ORDERKEY IN (3, 4) then O_ORDERSTATUS else null end) as filter_cnt_3, " +
Expand All @@ -218,9 +218,10 @@ suite("aggregate_without_roll_up") {
"from orders " +
"where O_ORDERDATE < '2023-12-30'" +
"group by " +
"O_ORDERDATE, " +
"O_SHIPPRIORITY, " +
"O_COMMENT "
def query1_1 = "select O_SHIPPRIORITY, O_COMMENT, " +
def query1_1 = "select O_ORDERDATE, O_SHIPPRIORITY, O_COMMENT, " +
"count(distinct case when O_SHIPPRIORITY > 1 and O_ORDERKEY IN (1, 3) then O_ORDERSTATUS else null end) as filter_cnt_1, " +
"count(distinct case when O_SHIPPRIORITY > 2 and O_ORDERKEY IN (2) then O_ORDERSTATUS else null end) as filter_cnt_2, " +
"count(distinct case when O_SHIPPRIORITY > 3 and O_ORDERKEY IN (3, 4) then O_ORDERSTATUS else null end) as filter_cnt_3, " +
Expand All @@ -239,13 +240,13 @@ suite("aggregate_without_roll_up") {
"from orders " +
"where O_ORDERDATE < '2023-12-30' and O_ORDERDATE > '2023-12-01'" +
"group by " +
"O_ORDERDATE, " +
"O_SHIPPRIORITY, " +
"O_COMMENT "
// should support but not, tmp
// order_qt_query1_1_before "${query1_1}"
// check_rewrite(mv1_1, query1_1, "mv1_1")
// order_qt_query1_1_after "${query1_1}"
// sql """ DROP MATERIALIZED VIEW IF EXISTS mv1_1"""
order_qt_query1_1_before "${query1_1}"
check_rewrite(mv1_1, query1_1, "mv1_1")
order_qt_query1_1_after "${query1_1}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv1_1"""


def mv1_2 = "select O_SHIPPRIORITY, O_COMMENT, " +
Expand Down Expand Up @@ -609,11 +610,10 @@ suite("aggregate_without_roll_up") {
"lineitem.L_ORDERKEY, " +
"orders.O_SHIPPRIORITY, " +
"orders.O_COMMENT "
// rewrite success but cbo not chose, tmp
// order_qt_query17_1_before "${query17_1}"
// check_rewrite(mv17_1, query17_1, "mv17_1")
// order_qt_query17_1_after "${query17_1}"
// sql """ DROP MATERIALIZED VIEW IF EXISTS mv17_1"""
order_qt_query17_1_before "${query17_1}"
check_rewrite(mv17_1, query17_1, "mv17_1")
order_qt_query17_1_after "${query17_1}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv17_1"""

// filter outside + left + right
def mv18_0 = "select l_shipdate, l_suppkey, " +
Expand Down Expand Up @@ -760,9 +760,8 @@ suite("aggregate_without_roll_up") {
"orders " +
"on lineitem.L_ORDERKEY = orders.O_ORDERKEY " +
"where orders.O_ORDERDATE < '2023-12-30' and orders.O_ORDERDATE > '2023-12-01' "
// rewrite success but cbo not chose, tmp
// order_qt_query20_0_before "${query20_0}"
// check_rewrite(mv20_0, query20_0, "mv20_0")
// order_qt_query20_0_after "${query20_0}"
// sql """ DROP MATERIALIZED VIEW IF EXISTS mv20_0"""
order_qt_query20_0_before "${query20_0}"
check_rewrite(mv20_0, query20_0, "mv20_0")
order_qt_query20_0_after "${query20_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv20_0"""
}
Loading

0 comments on commit 6240acd

Please sign in to comment.