add LogicalCompatibilityContext
seawinde committed Dec 5, 2023
1 parent 720ea15 commit e2bc938
Showing 5 changed files with 186 additions and 42 deletions.
Changed file 1 of 5
@@ -99,8 +99,11 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
if (queryToViewSlotMapping == null) {
continue;
}
// todo int LogicalCompatibilityContext firstly and outer check join
if (!StructInfo.isGraphLogicalEquals(queryStructInfo.getHyperGraph(), viewStructInfo.getHyperGraph())) {
LogicalCompatibilityContext compatibilityContext =
LogicalCompatibilityContext.from(queryToViewTableMapping, queryToViewSlotMapping,
queryStructInfo, viewStructInfo);
// todo outer join compatibility check
if (!StructInfo.isGraphLogicalEquals(queryStructInfo, viewStructInfo, compatibilityContext)) {
continue;
}
SplitPredicate compensatePredicates = predicatesCompensate(queryStructInfo, viewStructInfo,
Changed file 2 of 5 (LogicalCompatibilityContext)
@@ -17,32 +17,84 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.nereids.jobs.joinorder.hypergraph.Edge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedRelation;
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.util.HashMap;
import java.util.Map;

/**
* For outer join we should check the outer join compatibility between query and view
*/
public class LogicalCompatibilityContext {
private BiMap<StructInfoNode, StructInfoNode> queryToViewNodeMapping;
private BiMap<Expression, Expression> queryToViewEdgeExpressionMapping;

private Map<StructInfoNode, StructInfoNode> queryToViewNodeMapping;
private BiMap<Edge, Edge> queryToViewEdgeMapping;

public LogicalCompatibilityContext(Map<StructInfoNode, StructInfoNode> queryToViewNodeMapping,
BiMap<Edge, Edge> queryToViewEdgeMapping) {
public LogicalCompatibilityContext(BiMap<StructInfoNode, StructInfoNode> queryToViewNodeMapping,
BiMap<Expression, Expression> queryToViewEdgeExpressionMapping) {
this.queryToViewNodeMapping = queryToViewNodeMapping;
this.queryToViewEdgeMapping = queryToViewEdgeMapping;
this.queryToViewEdgeExpressionMapping = queryToViewEdgeExpressionMapping;
}

public Map<StructInfoNode, StructInfoNode> getQueryToViewNodeMapping() {
public BiMap<StructInfoNode, StructInfoNode> getQueryToViewNodeMapping() {
return queryToViewNodeMapping;
}

public BiMap<Edge, Edge> getQueryToViewEdgeMapping() {
return queryToViewEdgeMapping;
public BiMap<Expression, Expression> getQueryToViewEdgeExpressionMapping() {
return queryToViewEdgeExpressionMapping;
}

/**
* generate logical compatibility context
*/
public static LogicalCompatibilityContext from(RelationMapping relationMapping,
SlotMapping slotMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo) {
// init node mapping
BiMap<StructInfoNode, StructInfoNode> queryToViewNodeMapping = HashBiMap.create();
Map<RelationId, StructInfoNode> queryRelationIdStructInfoNodeMap
= queryStructInfo.getRelationIdStructInfoNodeMap();
Map<RelationId, StructInfoNode> viewRelationIdStructInfoNodeMap
= viewStructInfo.getRelationIdStructInfoNodeMap();
for (Map.Entry<MappedRelation, MappedRelation> relationMappingEntry :
relationMapping.getMappedRelationMap().entrySet()) {
StructInfoNode queryStructInfoNode = queryRelationIdStructInfoNodeMap.get(
relationMappingEntry.getKey().getRelationId());
StructInfoNode viewStructInfoNode = viewRelationIdStructInfoNodeMap.get(
relationMappingEntry.getValue().getRelationId());
if (queryStructInfoNode != null && viewStructInfoNode != null) {
queryToViewNodeMapping.put(queryStructInfoNode, viewStructInfoNode);
}
}
// init expression mapping
Map<SlotReference, SlotReference> viewToQuerySlotMapping = slotMapping.inverse().toSlotReferenceMap();
Map<Expression, Expression> queryShuttledExprToExprMap =
queryStructInfo.getShuttledHashConjunctsToConjunctsMap();
Map<Expression, Expression> viewShuttledExprToExprMap =
viewStructInfo.getShuttledHashConjunctsToConjunctsMap();
final Map<Expression, Expression> viewEdgeToConjunctsMapQueryBased = new HashMap<>();
viewShuttledExprToExprMap.forEach((shuttledExpr, expr) -> {
viewEdgeToConjunctsMapQueryBased.put(
ExpressionUtils.replace(shuttledExpr, viewToQuerySlotMapping),
expr);
});
BiMap<Expression, Expression> queryToViewEdgeMapping = HashBiMap.create();
queryShuttledExprToExprMap.forEach((exprSet, edge) -> {
Expression viewExpr = viewEdgeToConjunctsMapQueryBased.get(exprSet);
if (viewExpr != null) {
queryToViewEdgeMapping.put(edge, viewExpr);
}
});
return new LogicalCompatibilityContext(queryToViewNodeMapping, queryToViewEdgeMapping);
}
}
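
A note on what LogicalCompatibilityContext.from computes: it pairs query and view StructInfoNodes through the relation mapping, then rewrites the view's shuttled join conjuncts into the query's slot space so that logically matching conjuncts collide by equality and can be paired in a BiMap. The following self-contained sketch reproduces that rebase-then-match step with plain strings standing in for Doris Expression and SlotReference objects; it only assumes Guava and is an illustration of the idea, not Doris code.

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.util.HashMap;
import java.util.Map;

public class MappingRebaseSketch {
    public static void main(String[] args) {
        // view slot -> query slot (what slotMapping.inverse() provides in the real code)
        Map<String, String> viewToQuerySlot = new HashMap<>();
        viewToQuerySlot.put("v_a", "q_a");
        viewToQuerySlot.put("v_b", "q_b");

        // shuttled conjunct -> original conjunct, one map per side
        Map<String, String> queryShuttledToConjunct = new HashMap<>();
        queryShuttledToConjunct.put("q_a = q_b", "queryJoinConjunct");
        Map<String, String> viewShuttledToConjunct = new HashMap<>();
        viewShuttledToConjunct.put("v_a = v_b", "viewJoinConjunct");

        // step 1: rebase the view keys into the query's slot space
        Map<String, String> viewKeyedByQuerySlots = new HashMap<>();
        viewShuttledToConjunct.forEach((shuttled, conjunct) -> {
            String rebased = shuttled;
            for (Map.Entry<String, String> e : viewToQuerySlot.entrySet()) {
                rebased = rebased.replace(e.getKey(), e.getValue());
            }
            viewKeyedByQuerySlots.put(rebased, conjunct);
        });

        // step 2: keys that collide after the rebase identify matching conjuncts
        BiMap<String, String> queryToViewConjunct = HashBiMap.create();
        queryShuttledToConjunct.forEach((shuttled, conjunct) -> {
            String viewConjunct = viewKeyedByQuerySlots.get(shuttled);
            if (viewConjunct != null) {
                queryToViewConjunct.put(conjunct, viewConjunct);
            }
        });
        System.out.println(queryToViewConjunct); // {queryJoinConjunct=viewJoinConjunct}
    }
}
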
Changed file 3 of 5 (MaterializationContext)
@@ -64,7 +64,8 @@ public MaterializationContext(MTMV mtmv,
MVCache mvCache = mtmv.getMvCache();
// TODO This logic should move to materialized view cache manager
if (mvCache == null) {
mtmv.setMvCache(MVCache.from(mtmv, cascadesContext.getConnectContext()));
mvCache = MVCache.from(mtmv, cascadesContext.getConnectContext());
mtmv.setMvCache(mvCache);
}
List<NamedExpression> mvOutputExpressions = mvCache.getMvOutputExpressions();
// mv output expression shuttle, this will be used to expression rewrite
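
The MaterializationContext hunk fixes a null-handling slip: when the cache was missing, the old code built it and set it on the MTMV but kept using the still-null local variable, so the following mvCache.getMvOutputExpressions() call would throw a NullPointerException. Below is a minimal, self-contained sketch of the lazy-init pattern and the fix; CacheOwner, build, and useCache are hypothetical names, not Doris classes.

class CacheOwner {
    private Object cache;

    Object getCache() {
        return cache;
    }

    void setCache(Object cache) {
        this.cache = cache;
    }
}

public class LazyInitSketch {
    static Object useCache(CacheOwner owner) {
        Object cache = owner.getCache();
        if (cache == null) {
            // buggy shape: owner.setCache(build()); leaves the local 'cache' null
            cache = build();        // fixed shape: assign the local first ...
            owner.setCache(cache);  // ... then publish it on the owner
        }
        return cache;               // now always non-null
    }

    static Object build() {
        return new Object();
    }

    public static void main(String[] args) {
        System.out.println(useCache(new CacheOwner()) != null); // true
    }
}
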
Changed file 4 of 5 (StructInfo)
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
@@ -26,21 +27,25 @@
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -50,10 +55,10 @@
*/
public class StructInfo {
public static final JoinPatternChecker JOIN_PATTERN_CHECKER = new JoinPatternChecker();
// struct info splitter
public static final PlanSplitter PLAN_SPLITTER = new PlanSplitter();
private static final RelationCollector RELATION_COLLECTOR = new RelationCollector();
private static final PredicateCollector PREDICATE_COLLECTOR = new PredicateCollector();
// struct info splitter
private static final PlanSplitter PLAN_SPLITTER = new PlanSplitter();
// source data
private final Plan originalPlan;
private final HyperGraph hyperGraph;
@@ -64,9 +69,13 @@ public class StructInfo {
// bottom plan which top plan only contain join or scan. this is needed by hyper graph
private Plan bottomPlan;
private final List<CatalogRelation> relations = new ArrayList<>();
// this is for LogicalCompatibilityContext later
private final Map<RelationId, StructInfoNode> relationIdStructInfoNodeMap = new HashMap<>();
private Predicates predicates;
private SplitPredicate splitPredicate;
private EquivalenceClass equivalenceClass;
// this is for LogicalCompatibilityContext later
private final Map<Expression, Expression> shuttledHashConjunctsToConjunctsMap = new HashMap<>();

private StructInfo(Plan originalPlan, @Nullable Plan topPlan, @Nullable Plan bottomPlan, HyperGraph hyperGraph) {
this.originalPlan = originalPlan;
@@ -79,16 +88,24 @@ private StructInfo(Plan originalPlan, @Nullable Plan topPlan, @Nullable Plan bot
private void init() {

if (topPlan == null || bottomPlan == null) {
List<Plan> topPlans = new ArrayList<>();
this.bottomPlan = originalPlan.accept(PLAN_SPLITTER, topPlans);
this.topPlan = topPlans.get(0);
PlanSplitContext planSplitContext = new PlanSplitContext(Sets.newHashSet(LogicalJoin.class));
originalPlan.accept(PLAN_SPLITTER, planSplitContext);
this.bottomPlan = planSplitContext.getBottomPlan();
this.topPlan = planSplitContext.getTopPlan();
}

this.predicates = Predicates.of();
// Collect predicate from join condition in hyper graph
this.hyperGraph.getEdges().forEach(edge -> {
List<Expression> hashJoinConjuncts = edge.getHashJoinConjuncts();
hashJoinConjuncts.forEach(this.predicates::addPredicate);
hashJoinConjuncts.forEach(conjunctExpr -> {
predicates.addPredicate(conjunctExpr);
// shuttle expression in edge for LogicalCompatibilityContext later
shuttledHashConjunctsToConjunctsMap.put(
ExpressionUtils.shuttleExpressionWithLineage(
Lists.newArrayList(conjunctExpr), edge.getJoin()).get(0),
conjunctExpr);
});
List<Expression> otherJoinConjuncts = edge.getOtherJoinConjuncts();
if (!otherJoinConjuncts.isEmpty()) {
this.valid = false;
@@ -100,9 +117,14 @@ private void init() {

// Collect predicate from filter node in hyper graph
this.hyperGraph.getNodes().forEach(node -> {
// plan relation collector
// plan relation collector and set to map
Plan nodePlan = node.getPlan();
nodePlan.accept(RELATION_COLLECTOR, this.relations);
List<CatalogRelation> nodeRelations = new ArrayList<>();
nodePlan.accept(RELATION_COLLECTOR, nodeRelations);
this.relations.addAll(nodeRelations);
// every node should only have one relation, this is for LogicalCompatibilityContext
relationIdStructInfoNodeMap.put(nodeRelations.get(0).getRelationId(), (StructInfoNode) node);

// if inner join add where condition
Set<Expression> predicates = new HashSet<>();
nodePlan.accept(PREDICATE_COLLECTOR, predicates);
@@ -142,13 +164,13 @@ private void init() {
public static List<StructInfo> of(Plan originalPlan) {
// TODO only consider the inner join currently, Should support outer join
// Split plan by the boundary which contains multi child
List<Plan> topPlans = new ArrayList<>();
Plan bottomPlan = originalPlan.accept(PLAN_SPLITTER, topPlans);
Plan topPlan = topPlans.get(0);
PlanSplitContext planSplitContext = new PlanSplitContext(Sets.newHashSet(LogicalJoin.class));
originalPlan.accept(PLAN_SPLITTER, planSplitContext);

List<HyperGraph> structInfos = HyperGraph.toStructInfo(bottomPlan);
List<HyperGraph> structInfos = HyperGraph.toStructInfo(planSplitContext.getBottomPlan());
return structInfos.stream()
.map(hyperGraph -> new StructInfo(originalPlan, topPlan, bottomPlan, hyperGraph))
.map(hyperGraph -> new StructInfo(originalPlan, planSplitContext.getTopPlan(),
planSplitContext.getBottomPlan(), hyperGraph))
.collect(Collectors.toList());
}

@@ -189,6 +211,22 @@ public boolean isValid() {
return valid;
}

public Plan getTopPlan() {
return topPlan;
}

public Plan getBottomPlan() {
return bottomPlan;
}

public Map<RelationId, StructInfoNode> getRelationIdStructInfoNodeMap() {
return relationIdStructInfoNodeMap;
}

public Map<Expression, Expression> getShuttledHashConjunctsToConjunctsMap() {
return shuttledHashConjunctsToConjunctsMap;
}

public List<? extends Expression> getExpressions() {
return originalPlan instanceof LogicalProject
? ((LogicalProject<Plan>) originalPlan).getProjects() : originalPlan.getOutput();
@@ -199,7 +237,8 @@ public List<? extends Expression> getExpressions() {
* For inner join should judge only the join tables,
* for other join type should also judge the join direction, it's input filter that can not be pulled up etc.
*/
public static boolean isGraphLogicalEquals(HyperGraph source, HyperGraph target) {
public static boolean isGraphLogicalEquals(StructInfo queryStructInfo, StructInfo viewStructInfo,
LogicalCompatibilityContext compatibilityContext) {
// TODO: if not inner join, should check the join graph logical equivalence
return true;
}
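
isGraphLogicalEquals is still a stub that returns true, as the TODO above notes. Purely as a hedged illustration of how the new parameters could be used (this is not the committed logic and may not match the eventual implementation), a minimal check might require every query relation node and every query hash-join conjunct to have found a partner in the compatibility context:

// Hypothetical sketch only; the committed method simply returns true.
public static boolean isGraphLogicalEquals(StructInfo queryStructInfo, StructInfo viewStructInfo,
        LogicalCompatibilityContext compatibilityContext) {
    boolean allNodesMapped = compatibilityContext.getQueryToViewNodeMapping().size()
            == queryStructInfo.getRelationIdStructInfoNodeMap().size();
    boolean allConjunctsMapped = compatibilityContext.getQueryToViewEdgeExpressionMapping().size()
            == queryStructInfo.getShuttledHashConjunctsToConjunctsMap().size();
    return allNodesMapped && allConjunctsMapped;
}
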
@@ -224,23 +263,62 @@ public Void visit(Plan plan, Set<Expression> predicates) {
}
}

private static class PlanSplitter extends DefaultPlanRewriter<List<Plan>> {

/**
* Split the plan into a bottom part and a top part; the boundary is given by the context,
* and the bottom part contains the boundary.
*/
public static class PlanSplitter extends DefaultPlanVisitor<Void, PlanSplitContext> {
@Override
public Plan visitLogicalRelation(LogicalRelation relation, List<Plan> topPlans) {
return relation;
public Void visit(Plan plan, PlanSplitContext context) {
if (context.getTopPlan() == null) {
context.setTopPlan(plan);
}
if (context.isBoundary(plan)) {
context.setBottomPlan(plan);
return null;
}
return super.visit(plan, context);
}
}

@Override
public Plan visit(Plan plan, List<Plan> topPlans) {
if (plan instanceof Join || plan instanceof SetOperation) {
return plan;
} else {
if (topPlans.isEmpty()) {
topPlans.add(plan);
/**
* Plan split context; it holds the bottom plan, the top plan, and the boundary plan classes
*/
public static class PlanSplitContext {
private Plan bottomPlan;
private Plan topPlan;
private Set<Class<? extends Plan>> boundaryPlanClazzSet;

public PlanSplitContext(Set<Class<? extends Plan>> boundaryPlanClazzSet) {
this.boundaryPlanClazzSet = boundaryPlanClazzSet;
}

public Plan getBottomPlan() {
return bottomPlan;
}

public void setBottomPlan(Plan bottomPlan) {
this.bottomPlan = bottomPlan;
}

public Plan getTopPlan() {
return topPlan;
}

public void setTopPlan(Plan topPlan) {
this.topPlan = topPlan;
}

/**
* isBoundary
*/
public boolean isBoundary(Plan plan) {
for (Class<? extends Plan> boundaryPlanClazz : boundaryPlanClazzSet) {
if (boundaryPlanClazz.isAssignableFrom(plan.getClass())) {
return true;
}
return plan.children().get(0).accept(this, topPlans);
}
return false;
}
}

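
To see the new PlanSplitter and PlanSplitContext pair in action outside of Doris, the toy below reproduces the mechanics with stand-in Node classes (nothing here is a Doris API): the first node visited becomes the top plan, descent stops at the first node whose class matches a boundary class (LogicalJoin in StructInfo), and that node becomes the bottom plan.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

class JoinNode extends Node {
    JoinNode(String name, Node... children) {
        super(name, children);
    }
}

class SplitContext {
    Node topPlan;
    Node bottomPlan;
    final Set<Class<? extends Node>> boundaryClasses;

    SplitContext(Set<Class<? extends Node>> boundaryClasses) {
        this.boundaryClasses = boundaryClasses;
    }

    boolean isBoundary(Node node) {
        for (Class<? extends Node> clazz : boundaryClasses) {
            if (clazz.isAssignableFrom(node.getClass())) {
                return true;
            }
        }
        return false;
    }
}

public class PlanSplitSketch {
    static void split(Node node, SplitContext context) {
        if (context.topPlan == null) {
            context.topPlan = node;    // the first node seen is the top plan
        }
        if (context.isBoundary(node)) {
            context.bottomPlan = node; // the boundary subtree becomes the bottom plan
            return;
        }
        for (Node child : node.children) {
            split(child, context);
        }
    }

    public static void main(String[] args) {
        Node plan = new Node("project", new Node("filter",
                new JoinNode("join", new Node("scan t1"), new Node("scan t2"))));
        Set<Class<? extends Node>> boundaries = new HashSet<>();
        boundaries.add(JoinNode.class);
        SplitContext context = new SplitContext(boundaries);
        split(plan, context);
        System.out.println(context.topPlan.name + " / " + context.bottomPlan.name); // project / join
    }
}
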
Changed file 5 of 5 (RelationId)
@@ -46,4 +46,14 @@ public RelationId getNextId() {
public String toString() {
return "RelationId#" + id;
}

@Override
public boolean equals(Object obj) {
return super.equals(obj);
}

@Override
public int hashCode() {
return super.hashCode();
}
}
