Skip to content

Commit

Permalink
[feature](nereids) Support inner join query rewrite by materialized view
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Dec 5, 2023
1 parent 3595f21 commit f0075e6
Show file tree
Hide file tree
Showing 35 changed files with 1,611 additions and 208 deletions.
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MVCache;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class MTMV extends OlapTable {
private Map<String, String> mvProperties;
@SerializedName("r")
private MTMVRelation relation;
// Should update after every fresh
private MVCache mvCache;

// For deserialization
public MTMV() {
Expand Down Expand Up @@ -116,6 +119,14 @@ public MTMVRelation getRelation() {
return relation;
}

public MVCache getMvCache() {
return mvCache;
}

public void setMvCache(MVCache mvCache) {
this.mvCache = mvCache;
}

public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
return refreshInfo.updateNotNull(newRefreshInfo);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

/**
* MaterializedViewException
*/
public class MaterializedViewException extends UserException {

public MaterializedViewException(String msg, Throwable cause) {
super(msg, cause);
}

public MaterializedViewException(Throwable cause) {
super(cause);
}

public MaterializedViewException(String msg, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(msg, cause, enableSuppression, writableStackTrace);
}

public MaterializedViewException(String msg) {
super(msg);
}

public MaterializedViewException(InternalErrorCode errCode, String msg) {
super(errCode, msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -70,6 +71,11 @@ public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) {
return tableMTMVs.get(table);
}

// TODO Implement the method which getting materialized view by tables
public List<MTMV> getAvailableMaterializedView(List<BaseTableInfo> tables) {
return ImmutableList.of();
}

public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws AnalysisException, DdlException {
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMvRewrite()) {
Expand Down
80 changes: 80 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;

import java.util.List;
import java.util.stream.Collectors;

/**The cache for materialized view cache */
public class MVCache {

// the materialized view plan which should be optimized by the same rules to query
private final Plan logicalPlan;
// this should be shuttle expression with lineage
private final List<NamedExpression> mvOutputExpressions;
// the context when parse, analyze, optimize the mv logical plan

public MVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}

public Plan getLogicalPlan() {
return logicalPlan;
}

public List<NamedExpression> getMvOutputExpressions() {
return mvOutputExpressions;
}

public MVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}

public static MVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);

planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Plan mvAnalyzedPlan = planner.getAnalyzedPlan();
Plan mvRewrittenPlan = planner.getRewrittenPlan();
Plan mvPlan = mvRewrittenPlan instanceof LogicalResultSink
? (Plan) ((LogicalResultSink) mvRewrittenPlan).child() : mvRewrittenPlan;
List<NamedExpression> mvOutputExpressions = mvAnalyzedPlan.getExpressions().stream()
.map(NamedExpression.class::cast)
.collect(Collectors.toList());
return new MVCache(mvPlan, mvOutputExpressions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class NereidsPlanner extends Planner {
private PhysicalPlan physicalPlan;
// The cost of optimized plan
private double cost = 0;
private List<PlannerHook> hooks = new ArrayList<>();

public NereidsPlanner(StatementContext statementContext) {
this.statementContext = statementContext;
Expand Down Expand Up @@ -260,15 +261,12 @@ private void initCascadesContext(LogicalPlan plan, PhysicalProperties requirePro
if (statementContext.getConnectContext().getTables() != null) {
cascadesContext.setTables(statementContext.getConnectContext().getTables());
}
if (statementContext.getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
// TODO Pre handle materialized view to materializationContext and
// call cascadesContext.addMaterializationContext() to add it
}
}

private void analyze() {
LOG.debug("Start analyze plan");
cascadesContext.newAnalyzer().analyze();
getHooks().forEach(hook -> hook.afterAnalyze(this));
NereidsTracer.logImportantTime("EndAnalyzePlan");
LOG.debug("End analyze plan");
}
Expand Down Expand Up @@ -525,4 +523,12 @@ public Plan getOptimizedPlan() {
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public List<PlannerHook> getHooks() {
return hooks;
}

public void addHook(PlannerHook hook) {
this.hooks.add(hook);
}
}
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids;

/**
* optimize plan process has some phase, such as analyze, rewrite, optimize, generate physical plan
* and so on, this hook give a chance to do something in the planning process.
* For example: after analyze plan when query or explain, we should generate materialization context.
*/
public interface PlannerHook {
default void beforeAnalyze(NereidsPlanner planner) {
}

default void afterAnalyze(NereidsPlanner planner) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.qe.ConnectContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -46,7 +48,9 @@ public void execute() {
countJobExecutionTimesOfGroupExpressions(groupExpression);
List<Rule> implementationRules = getRuleSet().getImplementationRules();
List<Rule> explorationRules = getExplorationRules();
if (context.getCascadesContext().getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
ConnectContext connectContext = context.getCascadesContext().getConnectContext();
if (connectContext.getSessionVariable().isEnableMaterializedViewRewrite()) {
explorationRules = new ArrayList<>(explorationRules);
explorationRules.addAll(getRuleSet().getMaterializedViewRules());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class Group {

private int chosenGroupExpressionId = -1;

private Optional<StructInfo> structInfo = Optional.empty();
private List<StructInfo> structInfos = new ArrayList<>();

/**
* Constructor for Group.
Expand Down Expand Up @@ -541,11 +541,15 @@ public String treeString() {
return TreeStringUtils.treeString(this, toString, getChildren, getExtraPlans, displayExtraPlan);
}

public Optional<StructInfo> getStructInfo() {
return structInfo;
public List<StructInfo> getStructInfos() {
return structInfos;
}

public void setStructInfo(StructInfo structInfo) {
this.structInfo = Optional.ofNullable(structInfo);
public void addStructInfo(StructInfo structInfo) {
this.structInfos.add(structInfo);
}

public void addStructInfo(List<StructInfo> structInfos) {
this.structInfos.addAll(structInfos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.Edge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.AbstractNode;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.Sets;

import java.util.HashSet;
import java.util.List;

/**
Expand All @@ -34,21 +44,28 @@ public abstract class AbstractMaterializedViewJoinRule extends AbstractMateriali
protected Plan rewriteQueryByView(MatchMode matchMode,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
RelationMapping queryToViewTableMappings,
Plan tempRewritedPlan) {
SlotMapping queryToViewSlotMappings,
Plan tempRewritedPlan,
MaterializationContext materializationContext) {

List<? extends Expression> queryShuttleExpression = ExpressionUtils.shuttleExpressionWithLineage(
queryStructInfo.getExpressions(),
queryStructInfo.getOriginalPlan());
// Rewrite top projects, represent the query projects by view
List<NamedExpression> expressions = rewriteExpression(
queryStructInfo.getExpressions(),
queryStructInfo,
viewStructInfo,
queryToViewTableMappings,
queryShuttleExpression,
materializationContext.getViewExpressionIndexMapping(),
queryToViewSlotMappings,
tempRewritedPlan
);
// Can not rewrite, bail out
if (expressions == null) {
return null;
}
if (queryStructInfo.getOriginalPlan().getGroupExpression().isPresent()) {
materializationContext.addMatchedGroup(
queryStructInfo.getOriginalPlan().getGroupExpression().get().getOwnerGroup().getGroupId());
}
return new LogicalProject<>(expressions, tempRewritedPlan);
}

Expand All @@ -57,7 +74,20 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
// join condition should be slot reference equals currently
@Override
protected boolean checkPattern(StructInfo structInfo) {
// TODO Should get struct info from hyper graph and check
return false;
HyperGraph hyperGraph = structInfo.getHyperGraph();
HashSet<JoinType> requiredJoinType = Sets.newHashSet(JoinType.INNER_JOIN, JoinType.LEFT_OUTER_JOIN);
for (AbstractNode node : hyperGraph.getNodes()) {
StructInfoNode structInfoNode = (StructInfoNode) node;
if (!structInfoNode.getPlan().accept(StructInfo.JOIN_PATTERN_CHECKER,
requiredJoinType)) {
return false;
}
for (Edge edge : hyperGraph.getEdges()) {
if (!edge.getJoin().accept(StructInfo.JOIN_PATTERN_CHECKER, requiredJoinType)) {
return false;
}
}
}
return true;
}
}
Loading

0 comments on commit f0075e6

Please sign in to comment.