Skip to content

Commit

Permalink
Revert "[opt](nereids) adjust broadcast/shuffle join (apache#37823)"
Browse files Browse the repository at this point in the history
This reverts commit aaab2e5.
  • Loading branch information
englefly committed Aug 8, 2024
1 parent b7fa6c1 commit ad93d77
Show file tree
Hide file tree
Showing 1,285 changed files with 3,696 additions and 5,617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,10 @@ public Cost visitPhysicalDistribute(
// shuffle
if (spec instanceof DistributionSpecHash) {
return CostV1.of(context.getSessionVariable(),
intputRowCount / beNumber,
0,
0,
intputRowCount * childStatistics.dataSizeFactor(
distribute.child().getOutput()) / beNumber
);
distribute.child().getOutput()) / beNumber);
}

// replicate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ public double computeSize(List<Slot> slots) {
}

/**
 * Returns a size factor derived from {@code computeTupleSize(slots)}.
 *
 * NOTE(review): this span comes from a diff rendering where removed and added lines are
 * interleaved without +/- markers. Going by the hunk header (7 lines before, 9 lines after),
 * the first return looks like the removed line and the three statements after it like its
 * replacement -- confirm against the repository before relying on either form.
 *
 * @param slots the slots passed through to computeTupleSize
 * @return the computed factor
 */
public double dataSizeFactor(List<Slot> slots) {
// Presumably the removed side: a fixed 0.05 multiple of the tuple size, with no bounds.
return 0.05 * computeTupleSize(slots);
// Presumably the added side: tuple size divided by K_BYTES, clamped to [lowerBound, upperBound].
double lowerBound = 0.03;
double upperBound = 0.07;
// max() enforces the lower bound first, then min() enforces the upper bound.
return Math.min(Math.max(computeTupleSize(slots) / K_BYTES, lowerBound), upperBound);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,47 @@

package org.apache.doris.nereids.memo;

// import org.apache.doris.nereids.CascadesContext;
// import org.apache.doris.nereids.trees.plans.JoinType;
// import org.apache.doris.nereids.trees.plans.Plan;
// import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
// import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
// import org.apache.doris.nereids.util.HyperGraphBuilder;
// import org.apache.doris.nereids.util.MemoTestUtils;
// import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.util.HyperGraphBuilder;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
//
// import com.google.common.collect.Sets;
// import org.junit.jupiter.api.Assertions;
// import org.junit.jupiter.api.Test;
//
// import java.util.HashSet;
// import java.util.Set;

import com.google.common.collect.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.Set;

/**
 * Exercises Memo rank/unrank on a small join plan: every rank is unranked into a plan, the
 * distinct plan shapes are counted, and rank 1 is compared with the optimizer's best plan.
 */
class RankTest extends TestWithFeService {

// Disabled copy of test(): the commented lines below match the live method that follows,
// statement for statement.
// @Test
// void test() throws Exception {
// createDatabase("test");
// HyperGraphBuilder hyperGraphBuilder = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN));
// hyperGraphBuilder.init(0, 1, 2);
// Plan plan = hyperGraphBuilder
// .addEdge(JoinType.INNER_JOIN, 0, 1)
// .addEdge(JoinType.INNER_JOIN, 1, 2)
// .buildPlan();
// plan = new LogicalProject(plan.getOutput(), plan);
// CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(connectContext, plan);
// hyperGraphBuilder.initStats("test", cascadesContext);
// PhysicalPlan bestPlan = PlanChecker.from(cascadesContext)
// .optimize()
// .getBestPlanTree();
// Memo memo = cascadesContext.getMemo();
// Set<String> shape = new HashSet<>();
// for (int i = 0; i < memo.getRankSize(); i++) {
// shape.add(memo.unrank(memo.rank(i + 1).first).shape(""));
// }
// System.out.println(shape);
// Assertions.assertEquals(2, shape.size());
// Assertions.assertEquals(bestPlan.shape(""), memo.unrank(memo.rank(1).first).shape(""));
// }
/**
 * Builds a plan with inner-join edges 0-1 and 1-2, optimizes it, and checks that the memo's
 * ranks produce exactly two distinct plan shapes, with rank 1 matching the best plan's shape.
 *
 * @throws Exception propagated from database creation or from planning
 */
@Test
void test() throws Exception {
createDatabase("test");
// The builder is restricted to INNER_JOIN; init(0, 1, 2) presumably registers three
// tables by index -- TODO confirm against HyperGraphBuilder.
HyperGraphBuilder hyperGraphBuilder = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN));
hyperGraphBuilder.init(0, 1, 2);
Plan plan = hyperGraphBuilder
.addEdge(JoinType.INNER_JOIN, 0, 1)
.addEdge(JoinType.INNER_JOIN, 1, 2)
.buildPlan();
// Wrap the built plan in a project that forwards the plan's own output unchanged.
plan = new LogicalProject(plan.getOutput(), plan);
CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(connectContext, plan);
// Stats are initialized on the context before optimize() is invoked below.
hyperGraphBuilder.initStats("test", cascadesContext);
PhysicalPlan bestPlan = PlanChecker.from(cascadesContext)
.optimize()
.getBestPlanTree();
Memo memo = cascadesContext.getMemo();
// Collect the shape string of every ranked plan; the set removes duplicate shapes.
Set<String> shape = new HashSet<>();
for (int i = 0; i < memo.getRankSize(); i++) {
// rank() is called with 1-based positions (i + 1); its 'first' member is what unrank() takes.
shape.add(memo.unrank(memo.rank(i + 1).first).shape(""));
}
System.out.println(shape);
// Exactly two distinct shapes are expected across all ranks.
Assertions.assertEquals(2, shape.size());
// The plan at rank 1 must have the same shape as the best plan chosen by the optimizer.
Assertions.assertEquals(bestPlan.shape(""), memo.unrank(memo.rank(1).first).shape(""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------PhysicalDistribute[DistributionSpecGather]
--------PhysicalTopN[LOCAL_SORT]
----------PhysicalProject
------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk)) otherCondition=()
------------hashJoin[INNER_JOIN shuffle] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk)) otherCondition=()
--------------PhysicalProject
----------------PhysicalOlapScan[customer]
--------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ PhysicalResultSink
------------------PhysicalDistribute[DistributionSpecHash]
--------------------hashAgg[LOCAL]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF1 i_item_sk->[ws_item_sk]
------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF1 i_item_sk->[ws_item_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
------------------------------PhysicalProject
Expand Down
4 changes: 2 additions & 2 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query13.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------PhysicalProject
----------hashJoin[INNER_JOIN broadcast] hashCondition=((store.s_store_sk = store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 s_store_sk->[ss_store_sk]
----------hashJoin[INNER_JOIN shuffle] hashCondition=((store.s_store_sk = store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 s_store_sk->[ss_store_sk]
------------PhysicalProject
--------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer_demographics.cd_demo_sk = store_sales.ss_cdemo_sk)) otherCondition=((((household_demographics.hd_dep_count = 1) AND ((((customer_demographics.cd_marital_status = 'D') AND (customer_demographics.cd_education_status = 'Primary')) AND ((store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 100.00))) OR (((customer_demographics.cd_marital_status = 'W') AND (customer_demographics.cd_education_status = '2 yr Degree')) AND ((store_sales.ss_sales_price >= 150.00) AND (store_sales.ss_sales_price <= 200.00))))) OR ((((customer_demographics.cd_marital_status = 'M') AND (customer_demographics.cd_education_status = 'College')) AND ((store_sales.ss_sales_price >= 100.00) AND (store_sales.ss_sales_price <= 150.00))) AND (household_demographics.hd_dep_count = 3)))) build RFs:RF3 ss_cdemo_sk->[cd_demo_sk]
----------------PhysicalProject
Expand All @@ -16,7 +16,7 @@ PhysicalResultSink
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ss_sold_date_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=((((ca_state IN ('IL', 'TN', 'TX') AND ((store_sales.ss_net_profit >= 100.00) AND (store_sales.ss_net_profit <= 200.00))) OR (ca_state IN ('ID', 'OH', 'WY') AND ((store_sales.ss_net_profit >= 150.00) AND (store_sales.ss_net_profit <= 300.00)))) OR (ca_state IN ('IA', 'MS', 'SC') AND ((store_sales.ss_net_profit >= 50.00) AND (store_sales.ss_net_profit <= 250.00))))) build RFs:RF0 ca_address_sk->[ss_addr_sk]
--------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=((((ca_state IN ('IL', 'TN', 'TX') AND ((store_sales.ss_net_profit >= 100.00) AND (store_sales.ss_net_profit <= 200.00))) OR (ca_state IN ('ID', 'OH', 'WY') AND ((store_sales.ss_net_profit >= 150.00) AND (store_sales.ss_net_profit <= 300.00)))) OR (ca_state IN ('IA', 'MS', 'SC') AND ((store_sales.ss_net_profit >= 50.00) AND (store_sales.ss_net_profit <= 250.00))))) build RFs:RF0 ca_address_sk->[ss_addr_sk]
----------------------------PhysicalProject
------------------------------filter((store_sales.ss_net_profit <= 300.00) and (store_sales.ss_net_profit >= 50.00) and (store_sales.ss_sales_price <= 200.00) and (store_sales.ss_sales_price >= 50.00))
--------------------------------PhysicalOlapScan[store_sales] apply RFs: RF0 RF1 RF2 RF4
Expand Down
12 changes: 6 additions & 6 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query14.out
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF13 i_item_sk->[ss_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF13 i_item_sk->[ss_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN shuffle] hashCondition=((store_sales.ss_item_sk = cross_items.ss_item_sk)) otherCondition=()
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF12 d_date_sk->[ss_sold_date_sk]
----------------------------------------------PhysicalProject
Expand All @@ -119,8 +119,8 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF15 i_item_sk->[cs_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF15 i_item_sk->[cs_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN shuffle] hashCondition=((catalog_sales.cs_item_sk = cross_items.ss_item_sk)) otherCondition=()
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF14 d_date_sk->[cs_sold_date_sk]
----------------------------------------------PhysicalProject
Expand All @@ -142,8 +142,8 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF17 i_item_sk->[ss_item_sk,ws_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF17 i_item_sk->[ss_item_sk,ws_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=()
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF16 d_date_sk->[ws_sold_date_sk]
----------------------------------------------PhysicalProject
Expand Down
38 changes: 19 additions & 19 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query16.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ PhysicalResultSink
----------hashAgg[GLOBAL]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF4 cs_order_number->[cs_order_number]
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF3 cs_order_number->[cs_order_number]
------------------PhysicalProject
--------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4
------------------hashJoin[RIGHT_ANTI_JOIN shuffle] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=() build RFs:RF3 cs_order_number->[cr_order_number]
--------------------PhysicalProject
----------------------PhysicalOlapScan[catalog_returns] apply RFs: RF3
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF2 cc_call_center_sk->[cs_call_center_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
--------------------PhysicalOlapScan[catalog_sales] apply RFs: RF3
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF2 cc_call_center_sk->[cs_call_center_sk]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
------------------------------hashJoin[LEFT_ANTI_JOIN shuffle] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF0 RF1 RF2
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_state = 'PA'))
------------------------------------PhysicalOlapScan[customer_address]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
--------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalProject
--------------------------filter((call_center.cc_county = 'Williamson County'))
----------------------------PhysicalOlapScan[call_center]
----------------------------------PhysicalOlapScan[catalog_returns]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_state = 'PA'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalProject
----------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalProject
------------------------filter((call_center.cc_county = 'Williamson County'))
--------------------------PhysicalOlapScan[call_center]

Hint log:
Used: leading(catalog_sales { cs1 customer_address date_dim call_center } )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ PhysicalResultSink
--------------------------filter(d_quarter_name IN ('2001Q1', '2001Q2', '2001Q3'))
----------------------------PhysicalOlapScan[date_dim]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((item.i_item_sk = store_sales.ss_item_sk)) otherCondition=() build RFs:RF6 i_item_sk->[sr_item_sk,ss_item_sk]
----------------------hashJoin[INNER_JOIN shuffle] hashCondition=((item.i_item_sk = store_sales.ss_item_sk)) otherCondition=() build RFs:RF6 i_item_sk->[sr_item_sk,ss_item_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store.s_store_sk = store_sales.ss_store_sk)) otherCondition=() build RFs:RF5 s_store_sk->[ss_store_sk]
----------------------------PhysicalProject
Expand Down
6 changes: 3 additions & 3 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query18.out
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ PhysicalResultSink
--------------hashAgg[LOCAL]
----------------PhysicalRepeat
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF5 i_item_sk->[cs_item_sk]
--------------------hashJoin[INNER_JOIN shuffle] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF5 i_item_sk->[cs_item_sk]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF4 d_date_sk->[cs_sold_date_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
----------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_bill_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF2 cd_demo_sk->[cs_bill_cdemo_sk]
----------------------------------PhysicalProject
Expand All @@ -27,7 +27,7 @@ PhysicalResultSink
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF1
----------------------------------PhysicalProject
------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[c_current_addr_sk]
------------------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[c_current_addr_sk]
--------------------------------------PhysicalProject
----------------------------------------filter(c_birth_month IN (1, 10, 11, 3, 4, 7))
------------------------------------------PhysicalOlapScan[customer] apply RFs: RF0
Expand Down
Loading

0 comments on commit ad93d77

Please sign in to comment.