Skip to content

Commit

Permalink
enable rf prune
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Sep 21, 2023
1 parent 0d20a61 commit bd9b058
Show file tree
Hide file tree
Showing 131 changed files with 304 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ public PhysicalProject visitPhysicalProject(PhysicalProject<? extends Plan> proj
Plan newChild = child.accept(this, ctx);
if (newChild instanceof PhysicalProject) {
List<NamedExpression> projections = project.mergeProjections((PhysicalProject) newChild);
return project.withProjectionsAndChild(projections, newChild.child(0));
return (PhysicalProject) project
.withProjectionsAndChild(projections, newChild.child(0))
.copyStatsAndGroupIdFrom(project);
}
return child != newChild ? project.withChildren(Lists.newArrayList(newChild)) : project;
return child != newChild
? (PhysicalProject) project.withChildren(Lists.newArrayList(newChild)).copyStatsAndGroupIdFrom(project)
: project;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public List<PlanPostProcessor> getProcessors() {
builder.add(new PushdownFilterThroughProject());
builder.add(new MergeProjectPostProcessor());
builder.add(new RecomputeLogicalPropertiesProcessor());
builder.add(new TopNScanOpt());
// after generate rf, DO NOT replace PLAN NODE
builder.add(new FragmentProcessor());
if (!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
Expand All @@ -71,7 +72,6 @@ public List<PlanPostProcessor> getProcessors() {
}
}
builder.add(new Validator());
builder.add(new TopNScanOpt());
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
Expand All @@ -31,14 +32,20 @@ public class PushdownFilterThroughProject extends PlanPostProcessor {
public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, CascadesContext context) {
Plan child = filter.child();
if (!(child instanceof PhysicalProject)) {
return filter.withChildren(child.accept(this, context));
Plan newChild = child.accept(this, context);
if (newChild == child) {
return filter;
} else {
return ((AbstractPhysicalPlan) filter.withChildren(child.accept(this, context)))
.copyStatsAndGroupIdFrom(filter);
}
}

PhysicalProject<? extends Plan> project = (PhysicalProject<? extends Plan>) child;
PhysicalFilter<? extends Plan> newFilter = filter.withConjunctsAndChild(
ExpressionUtils.replace(filter.getConjuncts(), project.getAliasToProducer()),
project.child());

return project.withChildren(newFilter.accept(this, context));
return ((PhysicalProject) project.withChildren(newFilter.accept(this, context)))
.copyStatsAndGroupIdFrom(project);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@
package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;

/**
* merge consecutive projects
*/
public class RecomputeLogicalPropertiesProcessor extends PlanPostProcessor {
@Override
public Plan visit(Plan plan, CascadesContext ctx) {
PhysicalPlan physicalPlan = (PhysicalPlan) visitChildren(this, plan, ctx);
physicalPlan = physicalPlan.resetLogicalProperties();
physicalPlan = physicalPlan.withPhysicalPropertiesAndStats(physicalPlan.getPhysicalProperties(),
((AbstractPlan) plan).getStats());
physicalPlan.setMutableState(MutableState.KEY_GROUP, plan.getGroupIdAsString());
return physicalPlan;
AbstractPhysicalPlan newPlan = (AbstractPhysicalPlan) visitChildren(this, plan, ctx);
return ((AbstractPhysicalPlan) newPlan.resetLogicalProperties())
.copyStatsAndGroupIdFrom((AbstractPhysicalPlan) plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Pla
Plan child = topN.child().accept(this, ctx);
topN = rewriteTopN(topN);
if (child != topN.child()) {
topN.withChildren(child);
topN = ((PhysicalTopN) topN.withChildren(child)).copyStatsAndGroupIdFrom(topN);
}
return topN;
}
Expand All @@ -54,11 +54,11 @@ public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? ext
CascadesContext context) {
Plan child = topN.child().accept(this, context);
if (child != topN.child()) {
topN = topN.withChildren(ImmutableList.of(child));
topN = topN.withChildren(ImmutableList.of(child)).copyStatsAndGroupIdFrom(topN);
}
PhysicalTopN<? extends Plan> rewrittenTopN = rewriteTopN(topN.getPhysicalTopN());
if (topN.getPhysicalTopN() != rewrittenTopN) {
topN = topN.withPhysicalTopN(rewrittenTopN);
topN = topN.withPhysicalTopN(rewrittenTopN).copyStatsAndGroupIdFrom(topN);
}
return topN;
}
Expand Down Expand Up @@ -103,7 +103,7 @@ private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends Plan> to
olapScan = (OlapScan) child;

if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
return topN.withEnableRuntimeFilter(true);
return topN.withEnableRuntimeFilter(true).copyStatsAndGroupIdFrom(topN);
}

return topN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -136,4 +137,11 @@ public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<Runtim
public Plan getExplainPlan(ConnectContext ctx) {
return this;
}

public <T extends AbstractPhysicalPlan> T copyStatsAndGroupIdFrom(T from) {
T newPlan = (T) withPhysicalPropertiesAndStats(
from.getPhysicalProperties(), from.getStats());
newPlan.setMutableState(MutableState.KEY_GROUP, from.getGroupIdAsString());
return newPlan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {
private double broadcastHashtableMemLimitPercentage = 0.2;

@VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE, needForward = true)
public boolean enableRuntimeFilterPrune = false;
public boolean enableRuntimeFilterPrune = true;

/**
* The client can pass some special information by setting this session variable in the format: "k1:v1;k2:v2".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void runBeforeAll() throws Exception {
super.runBeforeAll();
connectContext.getSessionVariable().setRuntimeFilterMode("Global");
connectContext.getSessionVariable().setRuntimeFilterType(8);
connectContext.getSessionVariable().setEnableRuntimeFilterPrune(false);
connectContext.getSessionVariable().expandRuntimeFilterByInnerJoin = false;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ suite("ds_rf{--}") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
String stmt = '''
explain physical plan
{query}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf1") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with customer_total_return as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf10") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf11") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with year_total as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf12") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf13") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select avg(ss_quantity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf14") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with cross_items as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf15") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select ca_zip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf16") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf17") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf18") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf19") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf2") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with wscs as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf20") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf21") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf22") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_product_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf23") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with frequent_ss_items as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf24") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with ssales as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf25") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf26") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf27") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf28") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf29") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf3") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select dt.d_year
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf30") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with customer_total_return as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf31") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with ss as
Expand Down
Loading

0 comments on commit bd9b058

Please sign in to comment.