Skip to content

Commit

Permalink
agg on union all
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Nov 9, 2023
1 parent 0d83327 commit d77250b
Show file tree
Hide file tree
Showing 11 changed files with 437 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
// the penalty factor is no more than BROADCAST_JOIN_SKEW_PENALTY_LIMIT
static final double BROADCAST_JOIN_SKEW_RATIO = 30.0;
static final double BROADCAST_JOIN_SKEW_PENALTY_LIMIT = 2.0;
private int beNumber = 1;
static final double RANDOM_SHUFFLE_TO_HASH_SHUFFLE_FACTOR = 0.1;
private final int beNumber;

public CostModelV1() {
if (ConnectContext.get().getSessionVariable().isPlayNereidsDump()) {
Expand Down Expand Up @@ -236,9 +237,9 @@ public Cost visitPhysicalDistribute(

// any
return CostV1.of(
intputRowCount,
0,
0);
0,
intputRowCount * childStatistics.dataSizeFactor() * RANDOM_SHUFFLE_TO_HASH_SHUFFLE_FACTOR / beNumber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;

Expand Down Expand Up @@ -114,6 +115,15 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan>
&& children.get(0).getPlan() instanceof PhysicalDistribute) {
return false;
}

// agg(group by x)-union all(A, B)
// no matter x.ndv is high or not, it is not worthwhile to shuffle A and B by x
// and hence we forbid one phase agg
if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
&& children.get(0).getPlan() instanceof PhysicalUnion
&& !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
return false;
}
// forbid multi distinct opt that bad than multi-stage version when multi-stage can be executed in one fragment
if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == AggMode.INPUT_TO_RESULT) {
List<MultiDistinction> multiDistinctions = agg.getOutputExpressions().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,8 @@ public List<Slot> computeOutput() {
.map(NamedExpression::toSlot)
.collect(ImmutableList.toImmutableList());
}

public boolean isDistinct() {
return qualifier == Qualifier.DISTINCT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class ColumnStatistic {

public static final double STATS_ERROR = 0.1D;

public static final double ALMOST_UNIQUE_FACTOR = 0.9;
public static final StatsType NDV = StatsType.NDV;
public static final StatsType AVG_SIZE = StatsType.AVG_SIZE;
public static final StatsType MAX_SIZE = StatsType.MAX_SIZE;
Expand Down Expand Up @@ -211,7 +211,7 @@ public static ColumnStatistic fromResultRow(ResultRow row) {
}

public static boolean isAlmostUnique(double ndv, double rowCount) {
return rowCount * 0.9 < ndv && ndv < rowCount * 1.1;
return rowCount * ALMOST_UNIQUE_FACTOR < ndv;
}

public ColumnStatistic updateByLimit(long limit, double rowCount) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_1 --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----PhysicalProject
------hashAgg[GLOBAL]
--------PhysicalDistribute
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN] hashCondition=((store_returns.sr_returned_date_sk = date_dim.d_date_sk))otherCondition=()
----------------PhysicalProject
------------------PhysicalOlapScan[store_returns]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------filter((date_dim.d_year = 2000))
----------------------PhysicalOlapScan[date_dim]
--PhysicalResultSink
----PhysicalTopN
------PhysicalDistribute
--------PhysicalTopN
----------PhysicalProject
------------hashJoin[INNER_JOIN] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk))otherCondition=()
--------------PhysicalDistribute
----------------PhysicalProject
------------------PhysicalOlapScan[customer]
--------------PhysicalDistribute
----------------hashJoin[INNER_JOIN] hashCondition=((ctr1.ctr_store_sk = ctr2.ctr_store_sk))otherCondition=((cast(ctr_total_return as DOUBLE) > cast((avg(cast(ctr_total_return as DECIMALV3(38, 4))) * 1.2) as DOUBLE)))
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((store.s_store_sk = ctr1.ctr_store_sk))otherCondition=()
----------------------PhysicalDistribute
------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------filter((store.s_state = 'TN'))
----------------------------PhysicalOlapScan[store]
------------------PhysicalDistribute
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute
------------------------hashAgg[LOCAL]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_49 --
PhysicalResultSink
--PhysicalTopN
----PhysicalDistribute
------PhysicalTopN
--------hashAgg[GLOBAL]
----------PhysicalDistribute
------------hashAgg[LOCAL]
--------------PhysicalUnion
----------------PhysicalDistribute
------------------PhysicalProject
--------------------filter(((return_rank <= 10) OR (currency_rank <= 10)))
----------------------PhysicalWindow
------------------------PhysicalQuickSort
--------------------------PhysicalWindow
----------------------------PhysicalQuickSort
------------------------------PhysicalDistribute
--------------------------------PhysicalQuickSort
----------------------------------PhysicalProject
------------------------------------hashAgg[GLOBAL]
--------------------------------------PhysicalDistribute
----------------------------------------hashAgg[LOCAL]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((ws.ws_item_sk = wr.wr_item_sk) and (ws.ws_order_number = wr.wr_order_number))otherCondition=()
----------------------------------------------PhysicalProject
------------------------------------------------filter((wr.wr_return_amt > 10000.00))
--------------------------------------------------PhysicalOlapScan[web_returns]
----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((ws.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=()
------------------------------------------------PhysicalProject
--------------------------------------------------filter((ws.ws_net_paid > 0.00) and (ws.ws_net_profit > 1.00) and (ws.ws_quantity > 0))
----------------------------------------------------PhysicalOlapScan[web_sales]
------------------------------------------------PhysicalDistribute
--------------------------------------------------PhysicalProject
----------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1998))
------------------------------------------------------PhysicalOlapScan[date_dim]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------filter(((return_rank <= 10) OR (currency_rank <= 10)))
----------------------PhysicalWindow
------------------------PhysicalQuickSort
--------------------------PhysicalWindow
----------------------------PhysicalQuickSort
------------------------------PhysicalDistribute
--------------------------------PhysicalQuickSort
----------------------------------PhysicalProject
------------------------------------hashAgg[GLOBAL]
--------------------------------------PhysicalDistribute
----------------------------------------hashAgg[LOCAL]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((cs.cs_item_sk = cr.cr_item_sk) and (cs.cs_order_number = cr.cr_order_number))otherCondition=()
----------------------------------------------PhysicalProject
------------------------------------------------filter((cr.cr_return_amount > 10000.00))
--------------------------------------------------PhysicalOlapScan[catalog_returns]
----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((cs.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=()
------------------------------------------------PhysicalProject
--------------------------------------------------filter((cs.cs_net_paid > 0.00) and (cs.cs_net_profit > 1.00) and (cs.cs_quantity > 0))
----------------------------------------------------PhysicalOlapScan[catalog_sales]
------------------------------------------------PhysicalDistribute
--------------------------------------------------PhysicalProject
----------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1998))
------------------------------------------------------PhysicalOlapScan[date_dim]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------filter(((return_rank <= 10) OR (currency_rank <= 10)))
----------------------PhysicalWindow
------------------------PhysicalQuickSort
--------------------------PhysicalWindow
----------------------------PhysicalQuickSort
------------------------------PhysicalDistribute
--------------------------------PhysicalQuickSort
----------------------------------PhysicalProject
------------------------------------hashAgg[GLOBAL]
--------------------------------------PhysicalDistribute
----------------------------------------hashAgg[LOCAL]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((sts.ss_item_sk = sr.sr_item_sk) and (sts.ss_ticket_number = sr.sr_ticket_number))otherCondition=()
----------------------------------------------PhysicalProject
------------------------------------------------filter((sr.sr_return_amt > 10000.00))
--------------------------------------------------PhysicalOlapScan[store_returns]
----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((sts.ss_sold_date_sk = date_dim.d_date_sk))otherCondition=()
------------------------------------------------PhysicalProject
--------------------------------------------------filter((sts.ss_net_paid > 0.00) and (sts.ss_net_profit > 1.00) and (sts.ss_quantity > 0))
----------------------------------------------------PhysicalOlapScan[store_sales]
------------------------------------------------PhysicalDistribute
--------------------------------------------------PhysicalProject
----------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1998))
------------------------------------------------------PhysicalOlapScan[date_dim]

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_75 --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----hashAgg[GLOBAL]
------PhysicalDistribute
--------hashAgg[LOCAL]
----------hashAgg[GLOBAL]
------------PhysicalDistribute
--------------hashAgg[LOCAL]
----------------PhysicalUnion
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number))otherCondition=()
------------------------PhysicalProject
--------------------------PhysicalOlapScan[catalog_returns]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = catalog_sales.cs_sold_date_sk))otherCondition=()
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((item.i_item_sk = catalog_sales.cs_item_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------filter((item.i_category = 'Sports'))
--------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter(d_year IN (2001, 2002))
----------------------------------PhysicalOlapScan[date_dim]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number))otherCondition=()
------------------------PhysicalProject
--------------------------PhysicalOlapScan[store_returns]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = store_sales.ss_sold_date_sk))otherCondition=()
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((item.i_item_sk = store_sales.ss_item_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[store_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------filter((item.i_category = 'Sports'))
--------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter(d_year IN (2001, 2002))
----------------------------------PhysicalOlapScan[date_dim]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((web_sales.ws_item_sk = web_returns.wr_item_sk) and (web_sales.ws_order_number = web_returns.wr_order_number))otherCondition=()
------------------------PhysicalProject
--------------------------PhysicalOlapScan[web_returns]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = web_sales.ws_sold_date_sk))otherCondition=()
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((item.i_item_sk = web_sales.ws_item_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------filter((item.i_category = 'Sports'))
--------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter(d_year IN (2001, 2002))
----------------------------------PhysicalOlapScan[date_dim]
--PhysicalResultSink
----PhysicalTopN
------PhysicalDistribute
--------PhysicalTopN
----------PhysicalProject
------------hashJoin[INNER_JOIN] hashCondition=((curr_yr.i_brand_id = prev_yr.i_brand_id) and (curr_yr.i_category_id = prev_yr.i_category_id) and (curr_yr.i_class_id = prev_yr.i_class_id) and (curr_yr.i_manufact_id = prev_yr.i_manufact_id))otherCondition=(((cast(cast(sales_cnt as DECIMALV3(17, 2)) as DECIMALV3(23, 8)) / cast(sales_cnt as DECIMALV3(17, 2))) < 0.900000))
--------------PhysicalDistribute
----------------filter((curr_yr.d_year = 2002))
------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------PhysicalDistribute
----------------filter((prev_yr.d_year = 2001))
------------------PhysicalCteConsumer ( cteId=CTEId#0 )

Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------------PhysicalProject
--------------------------filter((store.s_state = 'SD'))
----------------------------PhysicalOlapScan[store]
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute
----------------------hashAgg[LOCAL]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
------------------PhysicalDistribute
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute
------------------------hashAgg[LOCAL]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

Loading

0 comments on commit d77250b

Please sign in to comment.