Skip to content

Commit

Permalink
2 phase agg on union
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Nov 1, 2023
1 parent b3f31f9 commit 1aa0da6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
// the penalty factor is no more than BROADCAST_JOIN_SKEW_PENALTY_LIMIT
static final double BROADCAST_JOIN_SKEW_RATIO = 30.0;
static final double BROADCAST_JOIN_SKEW_PENALTY_LIMIT = 2.0;
static final double RANDOM_SHUFFLE_TO_HASH_SHUFFLE_FACTOR = 0.1;
private final int beNumber;

public CostModelV1() {
Expand Down Expand Up @@ -226,10 +227,11 @@ public Cost visitPhysicalDistribute(
}

// any
// cost of random shuffle is lower than hash shuffle.
return CostV1.of(
intputRowCount,
0,
0);
0,
intputRowCount * childStatistics.dataSizeFactor() * RANDOM_SHUFFLE_TO_HASH_SHUFFLE_FACTOR / beNumber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;

Expand Down Expand Up @@ -115,6 +116,16 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan>
&& children.get(0).getPlan() instanceof PhysicalDistribute) {
return false;
}

// agg(group by x)-union all(A, B)
// no matter x.ndv is high or not, it is not worthwhile to shuffle A and B by x
// and hence we forbid one phase agg
if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
&& requiredProperties.get(0).getDistributionSpec() instanceof DistributionSpecHash
&& children.get(0).getPlan() instanceof PhysicalUnion
&& !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
return false;
}
// forbid multi distinct opt that is worse than the multi-stage version when multi-stage can be executed in one fragment
if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == AggMode.INPUT_TO_RESULT) {
List<MultiDistinction> multiDistinctions = agg.getOutputExpressions().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,8 @@ public List<Slot> computeOutput() {
.map(NamedExpression::toSlot)
.collect(ImmutableList.toImmutableList());
}

/**
 * Reports whether this node's qualifier is {@code Qualifier.DISTINCT}.
 *
 * @return {@code true} when the qualifier equals {@code Qualifier.DISTINCT},
 *         {@code false} for any other qualifier value
 */
public boolean isDistinct() {
    // Enum constants are singletons, so an identity comparison is sufficient.
    final boolean distinct = Qualifier.DISTINCT == qualifier;
    return distinct;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class ColumnStatistic {

public static final double STATS_ERROR = 0.1D;

public static final double ALMOST_UNIQUE_FACTOR = 0.9;
public static final StatsType NDV = StatsType.NDV;
public static final StatsType AVG_SIZE = StatsType.AVG_SIZE;
public static final StatsType MAX_SIZE = StatsType.MAX_SIZE;
Expand Down Expand Up @@ -202,7 +202,7 @@ public static ColumnStatistic fromResultRow(ResultRow row) {
}

public static boolean isAlmostUnique(double ndv, double rowCount) {
return rowCount * 0.9 < ndv && ndv < rowCount * 1.1;
return rowCount * ALMOST_UNIQUE_FACTOR < ndv;
}

public ColumnStatistic updateByLimit(long limit, double rowCount) {
Expand Down

0 comments on commit 1aa0da6

Please sign in to comment.