Skip to content

Commit

Permalink
rf prune tpch part
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Sep 12, 2023
1 parent d3f1388 commit 3fe6280
Show file tree
Hide file tree
Showing 23 changed files with 1,758 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* runtime filter context used at post process and translation.
*/
public class RuntimeFilterContext {
public List<RuntimeFilter> prunedRF = Lists.newArrayList();

private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();

Expand Down Expand Up @@ -153,8 +154,11 @@ public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
if (filters != null) {
Iterator<RuntimeFilter> iter = filters.iterator();
while (iter.hasNext()) {
if (iter.next().getBuilderNode().equals(builderNode)) {
RuntimeFilter rf = iter.next();
if (rf.getBuilderNode().equals(builderNode)) {
builderNode.getRuntimeFilters().remove(rf);
iter.remove();
prunedRF.add(rf);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
Expand All @@ -54,17 +53,6 @@
*/
public class RuntimeFilterPruner extends PlanPostProcessor {

// *******************************
// Physical plans
// *******************************
@Override
public PhysicalHashAggregate visitPhysicalHashAggregate(
PhysicalHashAggregate<? extends Plan> agg, CascadesContext context) {
agg.child().accept(this, context);
context.getRuntimeFilterContext().addEffectiveSrcNode(agg);
return agg;
}

@Override
public PhysicalQuickSort visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, CascadesContext context) {
sort.child().accept(this, context);
Expand Down Expand Up @@ -165,7 +153,9 @@ public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows<?
/**
* consider L join R on L.a=R.b
* runtime-filter: L.a<-R.b is effective,
* if R.b.selectivity<1 or b is partly covered by a
* if rf could reduce tuples of L,
* 1. some L.a distinctive value are not covered by R.b, or
* 2. if there is a effective RF applied on R
*
* TODO: min-max
* @param equalTo join condition
Expand Down Expand Up @@ -199,7 +189,7 @@ private boolean isEffectiveRuntimeFilter(EqualTo equalTo, PhysicalHashJoin join)
if (probeColumnStat.isUnKnown || buildColumnStat.isUnKnown) {
return true;
}
return probeColumnStat.notEnclosed(buildColumnStat)
|| buildColumnStat.ndv < probeColumnStat.ndv * 0.95;
double buildNdvInProbeRange = buildColumnStat.ndvIntersection(probeColumnStat);
return probeColumnStat.ndv > buildNdvInProbeRange * (1 + ColumnStatistic.STATS_ERROR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

public class ColumnStatistic {

public static final double STATS_ERROR = 0.1D;

public static final StatsType NDV = StatsType.NDV;
public static final StatsType AVG_SIZE = StatsType.AVG_SIZE;
public static final StatsType MAX_SIZE = StatsType.MAX_SIZE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

suite("rf10") {
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
sql "set runtime_filter_mode='GLOBAL'"

sql 'set exec_mem_limit=21G'
sql 'SET enable_pipeline_engine = true'
sql 'set parallel_pipeline_task_num=8'
sql 'set be_number_for_test=3'
String query = """
explain physical plan
select
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) as revenue,
c_acctbal,
n_name,
c_address,
c_phone,
c_comment
from
customer,
orders,
lineitem,
nation
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate >= date '1993-10-01'
and o_orderdate < date '1993-10-01' + interval '3' month
and l_returnflag = 'R'
and c_nationkey = n_nationkey
group by
c_custkey,
c_name,
c_acctbal,
c_phone,
n_name,
c_address,
c_comment
order by
revenue desc
limit 20;
"""
def getRuntimeFilterCountFromPlan = { plan -> {
int count = 0
plan.eachMatch("RF\\d+\\[") {
ch -> count ++
}
return count
}}
// prune 1 RF
sql "set enable_runtime_filter_prune=false"
String plan1 = sql "${query}"
int count1 = getRuntimeFilterCountFromPlan(plan1)

sql "set enable_runtime_filter_prune=true"
String plan2 = sql "${query}"

log.info("tcph_sf1000 h10 before prune:\n" + plan1)
log.info("tcph_sf1000 h10 after prune:\n" + plan2)

int count2 = getRuntimeFilterCountFromPlan(plan2)

assertEquals(3, count1)
assertEquals(2, count2)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

suite("rf11") {
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
sql "set runtime_filter_mode='GLOBAL'"
sql 'set parallel_pipeline_task_num=8'
sql 'set exec_mem_limit=21G'
sql 'SET enable_pipeline_engine = true'
sql 'set be_number_for_test=3'



def String query = """
explain physical plan
select
ps_partkey,
sum(ps_supplycost * ps_availqty) as value
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
group by
ps_partkey having
sum(ps_supplycost * ps_availqty) > (
select
sum(ps_supplycost * ps_availqty) * 0.000002
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
)
order by
value desc;
"""

def getRuntimeFilterCountFromPlan = { plan -> {
int count = 0
plan.eachMatch("RF\\d+\\[") {
ch -> count ++
}
return count
}}

sql "set enable_runtime_filter_prune=false"
String plan1 = sql "${query}"
int count1 = getRuntimeFilterCountFromPlan(plan1)
sql "set enable_runtime_filter_prune=true"
String plan2 = sql "${query}"
int count2 = getRuntimeFilterCountFromPlan(plan2)

log.info("tcph_sf1000 h11 before prune:\n" + plan1)
log.info("tcph_sf1000 h11 after prune:\n" + plan2)

assertEquals(4, count1)
assertEquals(count1, count2)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

suite("rf12") {
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
sql "set runtime_filter_mode='GLOBAL'"
sql 'set parallel_pipeline_task_num=8'
sql 'set exec_mem_limit=21G'
sql 'SET enable_pipeline_engine = true'
sql 'set be_number_for_test=3'

def query = """
explain physical plan
select
l_shipmode,
sum(case
when o_orderpriority = '1-URGENT'
or o_orderpriority = '2-HIGH'
then 1
else 0
end) as high_line_count,
sum(case
when o_orderpriority <> '1-URGENT'
and o_orderpriority <> '2-HIGH'
then 1
else 0
end) as low_line_count
from
orders,
lineitem
where
o_orderkey = l_orderkey
and l_shipmode in ('MAIL', 'SHIP')
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1994-01-01'
and l_receiptdate < date '1994-01-01' + interval '1' year
group by
l_shipmode
order by
l_shipmode;
"""


def getRuntimeFilterCountFromPlan = { plan -> {
int count = 0
plan.eachMatch("RF\\d+\\[") {
ch -> count ++
}
return count
}}

sql "set enable_runtime_filter_prune=false"
String plan1 = sql "${query}"
int count1 = getRuntimeFilterCountFromPlan(plan1)
sql "set enable_runtime_filter_prune=true"
String plan2 = sql "${query}"

log.info("tcph_sf1000 h12 before prune:\n" + plan1)
log.info("tcph_sf1000 h12 after prune:\n" + plan2)

int count2 = getRuntimeFilterCountFromPlan(plan2)
assertEquals(count1, count2)
assertEquals(1, count1)


}
Loading

0 comments on commit 3fe6280

Please sign in to comment.