Skip to content

Commit

Permalink
enable rf prune
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Sep 21, 2023
1 parent 0d20a61 commit 09a1271
Show file tree
Hide file tree
Showing 129 changed files with 299 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ public PhysicalProject visitPhysicalProject(PhysicalProject<? extends Plan> proj
Plan newChild = child.accept(this, ctx);
if (newChild instanceof PhysicalProject) {
List<NamedExpression> projections = project.mergeProjections((PhysicalProject) newChild);
return project.withProjectionsAndChild(projections, newChild.child(0));
return (PhysicalProject) project
.withProjectionsAndChild(projections, newChild.child(0))
.copyStatsAndGroupIdFrom(project);
}
return child != newChild ? project.withChildren(Lists.newArrayList(newChild)) : project;
return child != newChild
? (PhysicalProject) project.withChildren(Lists.newArrayList(newChild)).copyStatsAndGroupIdFrom(project)
: project;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
Expand All @@ -31,14 +32,20 @@ public class PushdownFilterThroughProject extends PlanPostProcessor {
public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, CascadesContext context) {
Plan child = filter.child();
if (!(child instanceof PhysicalProject)) {
return filter.withChildren(child.accept(this, context));
Plan newChild = child.accept(this, context);
if (newChild == child) {
return filter;
} else {
return ((AbstractPhysicalPlan) filter.withChildren(child.accept(this, context)))
.copyStatsAndGroupIdFrom(filter);
}
}

PhysicalProject<? extends Plan> project = (PhysicalProject<? extends Plan>) child;
PhysicalFilter<? extends Plan> newFilter = filter.withConjunctsAndChild(
ExpressionUtils.replace(filter.getConjuncts(), project.getAliasToProducer()),
project.child());

return project.withChildren(newFilter.accept(this, context));
return ((PhysicalProject) project.withChildren(newFilter.accept(this, context)))
.copyStatsAndGroupIdFrom(project);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@
package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;

/**
* merge consecutive projects
*/
public class RecomputeLogicalPropertiesProcessor extends PlanPostProcessor {
@Override
public Plan visit(Plan plan, CascadesContext ctx) {
PhysicalPlan physicalPlan = (PhysicalPlan) visitChildren(this, plan, ctx);
physicalPlan = physicalPlan.resetLogicalProperties();
physicalPlan = physicalPlan.withPhysicalPropertiesAndStats(physicalPlan.getPhysicalProperties(),
((AbstractPlan) plan).getStats());
physicalPlan.setMutableState(MutableState.KEY_GROUP, plan.getGroupIdAsString());
return physicalPlan;
AbstractPhysicalPlan newPlan = (AbstractPhysicalPlan) visitChildren(this, plan, ctx);
return ((AbstractPhysicalPlan) newPlan.resetLogicalProperties())
.copyStatsAndGroupIdFrom((AbstractPhysicalPlan) plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -136,4 +137,11 @@ public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<Runtim
public Plan getExplainPlan(ConnectContext ctx) {
return this;
}

public <T extends AbstractPhysicalPlan> AbstractPhysicalPlan copyStatsAndGroupIdFrom(T from) {
AbstractPhysicalPlan newPlan = (AbstractPhysicalPlan) withPhysicalPropertiesAndStats(
from.getPhysicalProperties(), from.getStats());
newPlan.setMutableState(MutableState.KEY_GROUP, from.getGroupIdAsString());
return newPlan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {
private double broadcastHashtableMemLimitPercentage = 0.2;

@VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE, needForward = true)
public boolean enableRuntimeFilterPrune = false;
public boolean enableRuntimeFilterPrune = true;

/**
* The client can pass some special information by setting this session variable in the format: "k1:v1;k2:v2".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void runBeforeAll() throws Exception {
super.runBeforeAll();
connectContext.getSessionVariable().setRuntimeFilterMode("Global");
connectContext.getSessionVariable().setRuntimeFilterType(8);
connectContext.getSessionVariable().setEnableRuntimeFilterPrune(false);
connectContext.getSessionVariable().expandRuntimeFilterByInnerJoin = false;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ suite("ds_rf{--}") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
String stmt = '''
explain physical plan
{query}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf1") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with customer_total_return as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf10") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf11") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with year_total as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf12") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf13") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select avg(ss_quantity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf14") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with cross_items as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf15") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select ca_zip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf16") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf17") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf18") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf19") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf2") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with wscs as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf20") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf21") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf22") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_product_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf23") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with frequent_ss_items as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf24") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with ssales as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf25") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf26") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf27") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf28") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf29") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf3") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select dt.d_year
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf30") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with customer_total_return as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf31") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with ss as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf32") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select sum(cs_ext_discount_amt) as "excess discount amount"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf33") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
with ss as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf34") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select c_last_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf35") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf36") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ suite("ds_rf37") {
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'set enable_pipeline_engine=true'
sql 'set enable_runtime_filter_prune=false'
sql 'set expand_runtime_filter_by_inner_join=false'
String stmt = '''
explain physical plan
select i_item_id
Expand Down
Loading

0 comments on commit 09a1271

Please sign in to comment.