Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Nov 18, 2024
1 parent d58b144 commit 2775797
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public List<Rule> buildRules() {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
return !funcs.isEmpty() && funcs.stream()
.allMatch(f -> (f instanceof Min || f instanceof Max || f instanceof Sum
|| (f instanceof Count && !((Count) f).isCountStar())) && !f.isDistinct()
&& f.child(0) instanceof Slot);
|| f instanceof Count && !f.isDistinct()
&& f.child(0) instanceof Slot));
})
.thenApply(ctx -> {
Set<Integer> enableNereidsRules = ctx.cascadesContext.getConnectContext()
Expand Down Expand Up @@ -135,21 +135,15 @@ public static LogicalAggregate<Plan> pushMinMaxSumCount(LogicalAggregate<? exten
} else {
for (NamedExpression proj : projects) {
if (proj instanceof Alias && proj.toSlot().equals(slot)) {
Set<Slot> inputForAlias = proj.getInputSlots();
if (leftOutput.containsAll(inputForAlias)) {
leftGroupBy.addAll(inputForAlias);
} else if (rightOutput.containsAll(inputForAlias)) {
rightGroupBy.addAll(inputForAlias);
} else {
/*
groupBy(X)
+---> project( a + b as X)
--> join(output: T1.a, T2.b)
--> T1(a)
--> T2(b)
X can not be pushed
*/
return null;
Set<Slot> inputForAliasSet = proj.getInputSlots();
for (Slot aliasInputSlot : inputForAliasSet) {
if (leftOutput.contains(aliasInputSlot)) {
leftGroupBy.add(aliasInputSlot);
} else if (rightOutput.contains(aliasInputSlot)) {
rightGroupBy.add(aliasInputSlot);
} else {
return null;
}
}
break;
}
Expand All @@ -161,6 +155,7 @@ public static LogicalAggregate<Plan> pushMinMaxSumCount(LogicalAggregate<? exten
List<AggregateFunction> leftFuncs = new ArrayList<>();
List<AggregateFunction> rightFuncs = new ArrayList<>();
Count countStar = null;
Count rewrittenCountStar = null;
for (AggregateFunction func : agg.getAggregateFunctions()) {
if (func instanceof Count && ((Count) func).isCountStar()) {
countStar = (Count) func;
Expand All @@ -175,14 +170,14 @@ public static LogicalAggregate<Plan> pushMinMaxSumCount(LogicalAggregate<? exten
}
}
}
// determine count(*)
// rewrite count(*) to count(A), where A is slot from left/right group by key
if (countStar != null) {
if (!leftGroupBy.isEmpty()) {
countStar = (Count) countStar.withChildren(leftGroupBy.iterator().next());
leftFuncs.add(countStar);
rewrittenCountStar = (Count) countStar.withChildren(leftGroupBy.iterator().next());
leftFuncs.add(rewrittenCountStar);
} else if (!rightGroupBy.isEmpty()) {
countStar = (Count) countStar.withChildren(rightGroupBy.iterator().next());
rightFuncs.add(countStar);
rewrittenCountStar = (Count) countStar.withChildren(rightGroupBy.iterator().next());
rightFuncs.add(rewrittenCountStar);
} else {
return null;
}
Expand Down Expand Up @@ -241,7 +236,7 @@ public static LogicalAggregate<Plan> pushMinMaxSumCount(LogicalAggregate<? exten
AggregateFunction func = (AggregateFunction) ((Alias) ne).child();
if (func instanceof Count && ((Count) func).isCountStar()) {
// countStar is already rewritten as count(left_slot) or count(right_slot)
func = countStar;
func = rewrittenCountStar;
}
Slot slot = (Slot) func.child(0);
if (leftSlotToOutput.containsKey(slot)) {
Expand All @@ -268,7 +263,6 @@ public static LogicalAggregate<Plan> pushMinMaxSumCount(LogicalAggregate<? exten
Set<NamedExpression> rightDifference = new HashSet<NamedExpression>(right.getOutput());
rightDifference.removeAll(project.getProjects());
newProjections.addAll(rightDifference);

newAggChild = ((LogicalProject) agg.child()).withProjectsAndChild(newProjections, newJoin);
}
// TODO: column prune project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,14 +1041,16 @@ PhysicalResultSink
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
----------hashJoin[INNER_JOIN] hashCondition=((dwd_tracking_sensor_init_tmp_ymd.dt = dw_user_b2c_tracking_info_tmp_ymd.dt) and (dwd_tracking_sensor_init_tmp_ymd.guid = dw_user_b2c_tracking_info_tmp_ymd.guid)) otherCondition=((dwd_tracking_sensor_init_tmp_ymd.dt >= substring(first_visit_time, 1, 10)))
------------filter((dwd_tracking_sensor_init_tmp_ymd.dt = '2024-08-19') and (dwd_tracking_sensor_init_tmp_ymd.tracking_type = 'click'))
--------------PhysicalOlapScan[dwd_tracking_sensor_init_tmp_ymd]
------------hashAgg[GLOBAL]
--------------hashAgg[LOCAL]
----------------filter((dwd_tracking_sensor_init_tmp_ymd.dt = '2024-08-19') and (dwd_tracking_sensor_init_tmp_ymd.tracking_type = 'click'))
------------------PhysicalOlapScan[dwd_tracking_sensor_init_tmp_ymd]
------------filter((dw_user_b2c_tracking_info_tmp_ymd.dt = '2024-08-19'))
--------------PhysicalOlapScan[dw_user_b2c_tracking_info_tmp_ymd]

Hint log:
Used:
UnUsed: use_PUSH_DOWN_AGG_THROUGH_JOIN_ONE_SIDE
Used: use_PUSH_DOWN_AGG_THROUGH_JOIN_ONE_SIDE
UnUsed:
SyntaxError:

-- !agg_pushed --
Expand Down

0 comments on commit 2775797

Please sign in to comment.