Skip to content

Commit

Permalink
query rewrite support filter valid partition filter
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Dec 15, 2023
1 parent ad916cd commit 768736f
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 98 deletions.
4 changes: 0 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ public MTMVRelation getRelation() {
return relation;
}

/**
 * Returns the value currently held in the {@code cache} field.
 *
 * @return the stored {@link MTMVCache}
 */
public MTMVCache getCache() {
return cache;
}

/**
 * Stores the given cache in the {@code cache} field, replacing the previous value.
 *
 * @param cache the {@link MTMVCache} to store
 */
public void setCache(MTMVCache cache) {
this.cache = cache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
// view should also disable the predicate infer and join eliminate.
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -49,7 +50,8 @@ public class MTMVRelationManager implements MTMVHookService {
private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs = Maps.newConcurrentMap();

public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) {
return tableMTMVs.get(table);
Set<BaseTableInfo> baseTableInfos = tableMTMVs.get(table);
return baseTableInfos == null? ImmutableSet.of() : baseTableInfos;
}

public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMvRewrite()) {
if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
return res;
}
MTMVRelation mtmvRelation = mtmv.getRelation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ public void setOuterScope(@Nullable Scope outerScope) {
}

public List<MaterializationContext> getMaterializationContexts() {
return materializationContexts;
return materializationContexts.stream()
.filter(MaterializationContext::isAvailable)
.collect(Collectors.toList());
}

public void addMaterializationContext(MaterializationContext materializationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@
* @see PlaceholderCollector
*/
public class PlaceholderExpression extends Expression implements AlwaysNotNullable {

protected boolean distinct;
private final Class<? extends Expression> delegateClazz;
/**
* 1 based
*/
private final int position;
protected boolean distinct;

public PlaceholderExpression(List<Expression> children, Class<? extends Expression> delegateClazz, int position) {
super(children);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private boolean isAggregateFunctionEquivalent(Function queryFunction, Function v
PlaceholderExpression equivalentFunction = AGGREGATE_ROLL_UP_EQUIVALENT_FUNCTION_MAP.get(
PlaceholderExpression.of(queryFunction.getClass(), 0, isDistinct));
// check is have equivalent function or not
if (equivalentFunction == null){
if (equivalentFunction == null) {
return false;
}
// current compare
Expand All @@ -365,7 +365,7 @@ private boolean isAggregateFunctionEquivalent(Function queryFunction, Function v
}
for (int i = 0; i < viewFunction.children().size(); i++) {
if (!viewFunction.child(i).getClass().equals(
((PlaceholderExpression)equivalentFunctions.get(i)).getDelegateClazz())) {
((PlaceholderExpression) equivalentFunctions.get(i)).getDelegateClazz())) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,27 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
Expand All @@ -35,13 +49,19 @@
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -53,9 +73,9 @@
* The abstract class for all materialized view rules
*/
public abstract class AbstractMaterializedViewRule {

public static final HashSet<JoinType> SUPPORTED_JOIN_TYPE_SET =
Sets.newHashSet(JoinType.INNER_JOIN, JoinType.LEFT_OUTER_JOIN);
private final Logger logger = LogManager.getLogger(this.getClass());

/**
* The abstract template method for query rewrite, it contains the main logic and different query
Expand All @@ -82,7 +102,10 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) {
continue;
}
Plan mvPlan = materializationContext.getMtmv().getCache().getLogicalPlan();
Plan mvPlan = handleValidPartition(materializationContext.getMtmv(), cascadesContext);
if (mvPlan == null) {
continue;
}
List<StructInfo> viewStructInfos = extractStructInfo(mvPlan, cascadesContext);
if (viewStructInfos.size() > 1) {
// view struct info should only have one
Expand Down Expand Up @@ -159,6 +182,72 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
return rewriteResults;
}

/**
 * Obtains the cache of the given materialized view through {@code getOrGenerateCache()}.
 *
 * @param mtmv the materialized view whose cache is requested
 * @return the cache, or {@code null} when an {@link AnalysisException} is thrown while obtaining it
 */
private MTMVCache getCacheFromMTMV(MTMV mtmv) {
    try {
        return mtmv.getOrGenerateCache();
    } catch (AnalysisException e) {
        // the failure is logged and reported to the caller as a null cache
        logger.warn("get mtmv cache analysisException", e);
        return null;
    }
}

/**
 * Produces the materialized view plan to match the query against, limited to the partitions
 * returned by {@code MTMVUtil.getMTMVCanRewritePartitions}.
 *
 * <p>The cached logical plan is returned unchanged when the materialized view is unpartitioned, or when
 * the number of usable partitions equals the number of all partitions. Otherwise the cached plan is
 * wrapped in a {@link LogicalFilter} built from the disjunction of the usable partitions' predicates.
 *
 * @param mtmv the materialized view to build the plan for
 * @param cascadesContext supplies the connect context and the expression rewrite context
 * @return the plan to use, or {@code null} when the cache cannot be obtained, when partitions exist but
 *         none is usable, when the partition column is missing from the plan output, or when a usable
 *         partition is not among the known partitions
 */
private Plan handleValidPartition(MTMV mtmv, CascadesContext cascadesContext) {
    PartitionInfo partitionInfo = mtmv.getPartitionInfo();
    PartitionType partitionType = partitionInfo.getType();
    MTMVCache mtmvCache = getCacheFromMTMV(mtmv);
    if (mtmvCache == null) {
        return null;
    }
    if (partitionType == PartitionType.UNPARTITIONED) {
        // an unpartitioned materialized view needs no partition filter
        return mtmvCache.getLogicalPlan();
    }

    Map<Long, PartitionItem> partitionItemsById = partitionInfo.getAllPartitions();
    Collection<Partition> usablePartitions = MTMVUtil.getMTMVCanRewritePartitions(mtmv,
            cascadesContext.getConnectContext());
    if (!partitionItemsById.isEmpty() && usablePartitions.isEmpty()) {
        // partitions exist but none of them can be used
        return null;
    }
    if (partitionItemsById.size() == usablePartitions.size()) {
        // equal counts are treated as "every partition is usable", so the plan is returned as is
        // TODO: replace the size comparison with a deep equality check
        return mtmvCache.getLogicalPlan();
    }

    // only some partitions are usable: find the partition column among the plan's output slots
    Plan mvPlan = mtmvCache.getLogicalPlan();
    Map<String, Slot> outputSlotsByName = new HashMap<>();
    for (Slot outputSlot : mvPlan.getOutput()) {
        // the first slot wins when several output slots share a name
        outputSlotsByName.putIfAbsent(outputSlot.getName(), outputSlot);
    }
    MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
    Slot partitionColumnSlot = outputSlotsByName.get(mvPartitionInfo.getPartitionCol());
    if (partitionColumnSlot == null) {
        return null;
    }

    // one predicate per usable partition; they are OR-ed together below
    Set<Expression> partitionPredicates = new HashSet<>();
    for (Partition partition : usablePartitions) {
        long partitionId = partition.getId();
        if (!partitionItemsById.containsKey(partitionId)) {
            return null;
        }
        partitionPredicates.add(UpdateMvByPartitionCommand.convertPartitionItemToPredicate(
                partitionItemsById.get(partitionId), partitionColumnSlot));
    }

    // run the combined predicate through optimization, then normalization
    ExpressionOptimization optimization = new ExpressionOptimization();
    ExpressionNormalization normalization = new ExpressionNormalization();
    ExpressionRewriteContext rewriteContext = new ExpressionRewriteContext(cascadesContext);
    Expression partitionFilter = optimization.rewrite(ExpressionUtils.or(partitionPredicates), rewriteContext);
    partitionFilter = normalization.rewrite(partitionFilter, rewriteContext);
    return new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet(partitionFilter), mvPlan);
}

/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.PlannerHook;
Expand All @@ -42,13 +38,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** If enable query rewrite with mv, should init materialization context after analyze*/
/**
* If enable query rewrite with mv, should init materialization context after analyze
*/
public class InitMaterializationContextHook implements PlannerHook {

public static final Logger LOG = LogManager.getLogger(InitMaterializationContextHook.class);
Expand All @@ -60,7 +57,6 @@ public void afterAnalyze(NereidsPlanner planner) {
}

private void initMaterializationContext(CascadesContext cascadesContext) {

if (!cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
return;
}
Expand All @@ -71,48 +67,30 @@ private void initMaterializationContext(CascadesContext cascadesContext) {
if (collectedTables.isEmpty()) {
return;
}
List<BaseTableInfo> baseTableUsed =
List<BaseTableInfo> usedBaseTables =
collectedTables.stream().map(BaseTableInfo::new).collect(Collectors.toList());
// TODO the logic should be move to MTMVRelationManager later when getAvailableMaterializedView is ready in
// MV Cache manager
Env env = cascadesContext.getConnectContext().getEnv();
MTMVRelationManager cacheManager = env.getMtmvService().getRelationManager();
Set<BaseTableInfo> materializedViews = new HashSet<>();
for (BaseTableInfo baseTableInfo : baseTableUsed) {
Set<BaseTableInfo> mtmvsByBaseTable = cacheManager.getMtmvsByBaseTable(baseTableInfo);
if (mtmvsByBaseTable == null || mtmvsByBaseTable.isEmpty()) {
continue;
}
materializedViews.addAll(mtmvsByBaseTable);
}
if (materializedViews.isEmpty()) {
Set<MTMV> availableMTMVs = Env.getCurrentEnv().getMtmvService().getRelationManager()
.getAvailableMTMVs(usedBaseTables);
if (availableMTMVs.isEmpty()) {
return;
}
materializedViews.forEach(mvBaseTableInfo -> {
try {
MTMV materializedView = (MTMV) Env.getCurrentInternalCatalog()
.getDbOrMetaException(mvBaseTableInfo.getDbId())
.getTableOrMetaException(mvBaseTableInfo.getTableId(), TableType.MATERIALIZED_VIEW);

// generate outside, maybe add partition filter in the future
LogicalOlapScan mvScan = new LogicalOlapScan(
cascadesContext.getStatementContext().getNextRelationId(),
(OlapTable) materializedView,
ImmutableList.of(materializedView.getQualifiedDbName()),
// this must be empty, or it will be used to sample
Lists.newArrayList(),
Lists.newArrayList(),
Optional.empty());
mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId());
List<NamedExpression> mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast)
.collect(Collectors.toList());
// todo should force keep consistency to mv sql plan output
Plan projectScan = new LogicalProject<Plan>(mvProjects, mvScan);
cascadesContext.addMaterializationContext(
MaterializationContext.fromMaterializedView(materializedView, projectScan, cascadesContext));
} catch (MetaNotFoundException metaNotFoundException) {
LOG.error(mvBaseTableInfo.toString() + " can not find corresponding materialized view.");
}
availableMTMVs.forEach(materializedView -> {
// generate outside, maybe add partition filter in the future
LogicalOlapScan mvScan = new LogicalOlapScan(
cascadesContext.getStatementContext().getNextRelationId(),
materializedView,
ImmutableList.of(materializedView.getQualifiedDbName()),
// this must be empty, or it will be used to sample
Lists.newArrayList(),
Lists.newArrayList(),
Optional.empty());
mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId());
List<NamedExpression> mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast)
.collect(Collectors.toList());
// todo should force keep consistency to mv sql plan output
Plan projectScan = new LogicalProject<Plan>(mvProjects, mvScan);
cascadesContext.addMaterializationContext(
MaterializationContext.fromMaterializedView(materializedView, projectScan, cascadesContext));
});
}
}
Loading

0 comments on commit 768736f

Please sign in to comment.