Skip to content

Commit

Permalink
compressed materialize for group by
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Oct 24, 2024
1 parent 45a2b15 commit 583c8b4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsBigInt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsInt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsLargeInt;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* select A from T group by A
Expand Down Expand Up @@ -81,67 +80,45 @@ private Expression getEncodeExpression(Expression expression) {
[not support] select substring(k, 1,2), sum(v) from t group by k
[support] select A as B from T group by A
*/
private Map<Expression, Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> aggregate) {
Map<Expression, Expression> encodableGroupbyExpressions = Maps.newHashMap();
Set<Slot> slotShouldNotEncode = Sets.newHashSet();
for (NamedExpression ne : aggregate.getOutputExpressions()) {
if (ne instanceof Alias) {
Expression child = ((Alias) ne).child();
//support: select A as B from T group by A
if (!(child instanceof SlotReference)) {
slotShouldNotEncode.addAll(child.getInputSlots());
}
}
}
private Map<Expression, Expression> getEncodeGroupByExpressions(LogicalAggregate<Plan> aggregate) {
Map<Expression, Expression> encodeGroupbyExpressions = Maps.newHashMap();
for (Expression gb : aggregate.getGroupByExpressions()) {
Expression encodeExpr = getEncodeExpression(gb);
if (encodeExpr != null) {
boolean encodable = true;
for (Slot gbs : gb.getInputSlots()) {
if (slotShouldNotEncode.contains(gbs)) {
encodable = false;
break;
}
}
if (encodable) {
encodableGroupbyExpressions.put(gb, encodeExpr);
}
encodeGroupbyExpressions.put(gb, encodeExpr);
}
}
return encodableGroupbyExpressions;
return encodeGroupbyExpressions;
}

private LogicalAggregate<Plan> compressedMaterialize(LogicalAggregate<Plan> aggregate) {
List<Alias> encodedExpressions = Lists.newArrayList();
Map<Expression, Expression> encodableGroupByExpressions = getEncodableGroupByExpressions(aggregate);
if (!encodableGroupByExpressions.isEmpty()) {
Map<Expression, Expression> encodeGroupByExpressions = getEncodeGroupByExpressions(aggregate);
if (!encodeGroupByExpressions.isEmpty()) {
List<Expression> newGroupByExpressions = Lists.newArrayList();
for (Expression gp : aggregate.getGroupByExpressions()) {
if (encodableGroupByExpressions.containsKey(gp)) {
Alias alias = new Alias(encodableGroupByExpressions.get(gp));
newGroupByExpressions.add(alias);
encodedExpressions.add(alias);
} else {
newGroupByExpressions.add(gp);
}
newGroupByExpressions.add(encodeGroupByExpressions.getOrDefault(gp, gp));
}
List<NamedExpression> newOutput = Lists.newArrayList();
for (NamedExpression ne : aggregate.getOutputExpressions()) {
// output A => output Any_value(A)
if (ne instanceof SlotReference && encodableGroupByExpressions.containsKey(ne)) {
newOutput.add(new Alias(ne.getExprId(), new AnyValue(ne), ne.getName()));
} else if (ne instanceof Alias && encodableGroupByExpressions.containsKey(((Alias) ne).child())) {
Expression child = ((Alias) ne).child();
Preconditions.checkArgument(child instanceof SlotReference,
"encode %s failed, not a slot", child);
newOutput.add(new Alias(((SlotReference) child).getExprId(), new AnyValue(child),
"any_value(" + child + ")"));
List<NamedExpression> newOutputs = Lists.newArrayList();
Map<Expression, Expression> decodeMap = new HashMap<>();
for (Expression gp : encodeGroupByExpressions.keySet()) {
decodeMap.put(gp, new DecodeAsVarchar(encodeGroupByExpressions.get(gp)));
}
for (NamedExpression out : aggregate.getOutputExpressions()) {
Expression replaced = ExpressionUtils.replace(out, decodeMap);
if (out != replaced) {
if (out instanceof SlotReference) {
newOutputs.add(new Alias(out.getExprId(), replaced, out.getName()));
} else if (out instanceof Alias) {
newOutputs.add(((Alias) out).withChildren(replaced.children()));
} else {
// should not reach here
Preconditions.checkArgument(false, "output abnormal: " + aggregate);
}
} else {
newOutput.add(ne);
newOutputs.add(out);
}
}
newOutput.addAll(encodedExpressions);
aggregate = aggregate.withGroupByAndOutput(newGroupByExpressions, newOutput);
aggregate = aggregate.withGroupByAndOutput(newGroupByExpressions, newOutputs);
}
return aggregate;
}
Expand Down
19 changes: 4 additions & 15 deletions regression-test/data/nereids_p0/compress_materialize.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,15 @@
aaaaa
bbbbb

-- !not_support --
aaa
bbb
-- !output_contains_gpk --
aaaaa aaaaa
bbbbb bbbbb

-- !not_support --
-- !expr --
aaa
bbb

-- !encodeexpr --
12
3

-- !join --
PhysicalResultSink
--hashJoin[INNER_JOIN broadcast] hashCondition=((T.k = cmt2.k2)) otherCondition=() build RFs:RF0 k2->[k]
----PhysicalProject
------hashAgg[GLOBAL]
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------PhysicalOlapScan[compress] apply RFs: RF0
----PhysicalOlapScan[cmt2]

61 changes: 23 additions & 38 deletions regression-test/suites/nereids_p0/compress_materialize.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -47,44 +47,28 @@ suite("compress_materialize") {
insert into cmt2 values("123456", 123456);
"""

// expected explain contains partial_any_value(k)
// | 1:VAGGREGATE (update serialize)(162) |
// | | STREAMING |
// | | output: partial_any_value(k[#3])[#5] |
// | | group by: encode_as_bigint(k)[#2] |
// | | sortByGroupKey:false |
// | | cardinality=1 |
// | | distribute expr lists: k[#3]
explain{
sql ("""
select k from compress group by k;
""")
contains("any_value(partial_any_value(k)")
contains("encode_as_bigint")
}
order_qt_agg_exec "select k from compress group by k;"

// 'substring(k, 1)' is in select list, not supported
explain{
sql ("""
select k, substring(k, 1) from compress group by k;
select k, substring(k, 1), sum(v) from compress group by k;
""")
notContains("any_value")
notContains("encode_as_bigint")
contains("encode_as_bigint(k)")
}
order_qt_output_contains_gpk "select k, substring(k, 1) from compress group by k;"

order_qt_expr """ select substring(k,1,3) from compress group by substring(k,1,3);"""
explain{
sql ("""
select k, substring(k, 1) from compress group by k;
""")
notContains("any_value")
notContains("encode_as_bigint")

sql "select substring(k,1,3) from compress group by substring(k,1,3);"
contains("encode_as_int(substring(k, 1, 3))")
}

order_qt_agg_exec "select k from compress group by k;"
order_qt_not_support """ select substring(k,1,3) from compress group by substring(k,1,3);"""
order_qt_not_support """ select substring(k,1,3) from compress group by k;"""

explain {
sql("select sum(v) from compress group by substring(k, 1, 3);")
contains("group by: encode_as_int(substring(k, 1, 3))")
Expand All @@ -97,22 +81,23 @@ suite("compress_materialize") {

order_qt_encodeexpr "select sum(v) from compress group by substring(k, 1, 3);"

// TODO: RF targets on compressed_materialize column is broken
// // verify that compressed materialization does not block runtime filter generation
// sql """
// set disable_join_reorder=true;
// set runtime_filter_mode = GLOBAL;
// set runtime_filter_type=2;
// set enable_runtime_filter_prune=false;
// """

// verify that compressed materialization does not block runtime filter generation
sql """
set disable_join_reorder=true;
set runtime_filter_mode = GLOBAL;
set runtime_filter_type=2;
set enable_runtime_filter_prune=false;
"""
// qt_join """
// explain shape plan
// select *
// from (
// select k from compress group by k
// ) T join[broadcast] cmt2 on T.k = cmt2.k2;
// """

qt_join """
explain shape plan
select *
from (
select k from compress group by k
) T join cmt2 on T.k = cmt2.k2;
"""

sql """
drop table if exists compressInt;
Expand Down Expand Up @@ -172,7 +157,7 @@ suite("compress_materialize") {
"""
explain{
sql "select k from notcompress group by k"
notContains("any_value")
notContains("encode_as_")
}
}

0 comments on commit 583c8b4

Please sign in to comment.