Skip to content

Commit

Permalink
Enable push partial aggregation though join
Browse files Browse the repository at this point in the history
Make push partial aggregation CBO based.
Enable it for cases where pushed aggregation has same grouping keys.
Additionally, for queries like

select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year

partial aggregation on date_dim.year can be pushed
below join with grouping key of "date_id", which
can greatly reduce number of rows before join operator.
  • Loading branch information
sopel39 authored and raunaqmorarka committed Nov 21, 2024
1 parent dd1711c commit ef267fc
Show file tree
Hide file tree
Showing 59 changed files with 1,770 additions and 1,138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class OptimizerConfig
private boolean preferPartialAggregation = true;
private boolean pushAggregationThroughOuterJoin = true;
private boolean enableIntermediateAggregations;
private boolean pushPartialAggregationThroughJoin;
private boolean pushPartialAggregationThroughJoin = true;
private boolean preAggregateCaseAggregationsEnabled = true;
private boolean enableForcedExchangeBelowGroupId = true;
private boolean optimizeTopNRanking = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,11 +1000,13 @@ public PlanOptimizers(
ruleStats,
statsCalculator,
costCalculator,
ImmutableSet.of(
new PushPartialAggregationThroughJoin(),
new PushPartialAggregationThroughExchange(plannerContext),
new PruneJoinColumns(),
new PruneJoinChildrenColumns())));
ImmutableSet.<Rule<?>>builder()
.addAll(new PushPartialAggregationThroughJoin().rules())
.add(new PushPartialAggregationThroughExchange(plannerContext),
new PruneJoinColumns(),
new PruneJoinChildrenColumns(),
new RemoveRedundantIdentityProjections())
.build()));
builder.add(new IterativeOptimizer(
plannerContext,
ruleStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange,

SymbolMapper symbolMapper = mappingsBuilder.build();
AggregationNode mappedPartial = symbolMapper.map(aggregation, source, context.getIdAllocator().getNextId());
mappedPartial = AggregationNode.builderFrom(mappedPartial)
.setIsInputReducingAggregation(true)
.build();

Assignments.Builder assignments = Assignments.builder();

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testDefaults()
.setEnableForcedExchangeBelowGroupId(true)
.setEnableIntermediateAggregations(false)
.setPushAggregationThroughOuterJoin(true)
.setPushPartialAggregationThroughJoin(false)
.setPushPartialAggregationThroughJoin(true)
.setPreAggregateCaseAggregationsEnabled(true)
.setDistinctAggregationsStrategy(null)
.setPreferPartialAggregation(true)
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-table-write-through-union", "false")
.put("optimizer.dictionary-aggregation", "true")
.put("optimizer.push-aggregation-through-outer-join", "false")
.put("optimizer.push-partial-aggregation-through-join", "true")
.put("optimizer.push-partial-aggregation-through-join", "false")
.put("optimizer.pre-aggregate-case-aggregations.enabled", "false")
.put("optimizer.enable-intermediate-aggregations", "true")
.put("optimizer.force-single-node-output", "true")
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testExplicitPropertyMappings()
.setPushTableWriteThroughUnion(false)
.setDictionaryAggregation(true)
.setPushAggregationThroughOuterJoin(false)
.setPushPartialAggregationThroughJoin(true)
.setPushPartialAggregationThroughJoin(false)
.setPreAggregateCaseAggregationsEnabled(false)
.setEnableIntermediateAggregations(true)
.setDistinctAggregationsStrategy(MARK_DISTINCT)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ public class AggregationBuilder
private Optional<Symbol> hashSymbol = Optional.empty();
private Optional<Symbol> groupIdSymbol = Optional.empty();
private Optional<PlanNodeId> nodeId = Optional.empty();
private Optional<Boolean> exchangeInputAggregation = Optional.empty();

public AggregationBuilder source(PlanNode source)
{
Expand Down Expand Up @@ -501,6 +502,12 @@ public AggregationBuilder nodeId(PlanNodeId nodeId)
return this;
}

public AggregationBuilder exchangeInputAggregation(boolean exchangeInputAggregation)
{
this.exchangeInputAggregation = Optional.of(exchangeInputAggregation);
return this;
}

protected AggregationNode build()
{
checkState(groupingSets != null, "No grouping sets defined; use globalGrouping/groupingKeys method");
Expand All @@ -512,7 +519,8 @@ protected AggregationNode build()
preGroupedSymbols,
step,
hashSymbol,
groupIdSymbol);
groupIdSymbol,
exchangeInputAggregation);
}
}

Expand Down Expand Up @@ -1085,9 +1093,37 @@ public JoinNode join(
Optional<Symbol> rightHashSymbol,
Optional<JoinNode.DistributionType> distributionType,
Map<DynamicFilterId, Symbol> dynamicFilters)
{
return join(idAllocator.getNextId(),
type,
left,
right,
criteria,
leftOutputSymbols,
rightOutputSymbols,
filter,
leftHashSymbol,
rightHashSymbol,
distributionType,
dynamicFilters);
}

public JoinNode join(
PlanNodeId id,
JoinType type,
PlanNode left,
PlanNode right,
List<JoinNode.EquiJoinClause> criteria,
List<Symbol> leftOutputSymbols,
List<Symbol> rightOutputSymbols,
Optional<Expression> filter,
Optional<Symbol> leftHashSymbol,
Optional<Symbol> rightHashSymbol,
Optional<JoinNode.DistributionType> distributionType,
Map<DynamicFilterId, Symbol> dynamicFilters)
{
return new JoinNode(
idAllocator.getNextId(),
id,
type,
left,
right,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ remote exchange (GATHER, SINGLE, [])
final aggregation over (d_day_name, d_week_seq)
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq])
partial aggregation over (d_day_name, d_week_seq)
intermediate aggregation over (d_day_name, d_week_seq)
join (INNER, REPLICATED):
local exchange (REPARTITION, ROUND_ROBIN, [])
dynamic filter (ws_sold_date_sk::EQUAL)
scan web_sales
dynamic filter (cs_sold_date_sk::EQUAL)
scan catalog_sales
partial aggregation over (ws_sold_date_sk)
dynamic filter (ws_sold_date_sk::EQUAL)
scan web_sales
partial aggregation over (cs_sold_date_sk)
dynamic filter (cs_sold_date_sk::EQUAL)
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
dynamic filter (d_week_seq::EQUAL, d_week_seq::EQUAL)
Expand All @@ -29,13 +31,15 @@ remote exchange (GATHER, SINGLE, [])
final aggregation over (d_day_name_134, d_week_seq_124)
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_124])
partial aggregation over (d_day_name_134, d_week_seq_124)
intermediate aggregation over (d_day_name_134, d_week_seq_124)
join (INNER, REPLICATED):
local exchange (REPARTITION, ROUND_ROBIN, [])
dynamic filter (ws_sold_date_sk_81::EQUAL)
scan web_sales
dynamic filter (cs_sold_date_sk_117::EQUAL)
scan catalog_sales
partial aggregation over (ws_sold_date_sk_81)
dynamic filter (ws_sold_date_sk_81::EQUAL)
scan web_sales
partial aggregation over (cs_sold_date_sk_117)
dynamic filter (cs_sold_date_sk_117::EQUAL)
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
dynamic filter (d_week_seq_124::EQUAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ local exchange (GATHER, SINGLE, [])
final aggregation over (d_year, i_brand, i_brand_id)
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_year, i_brand, i_brand_id])
partial aggregation over (d_year, i_brand, i_brand_id)
intermediate aggregation over (d_year, i_brand, i_brand_id)
join (INNER, REPLICATED):
join (INNER, REPLICATED):
dynamic filter (ss_item_sk::EQUAL, ss_sold_date_sk::EQUAL)
scan store_sales
partial aggregation over (ss_item_sk, ss_sold_date_sk)
dynamic filter (ss_item_sk::EQUAL, ss_sold_date_sk::EQUAL)
scan store_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
Expand Down
Loading

0 comments on commit ef267fc

Please sign in to comment.