diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CompressedMaterialization.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CompressedMaterialization.java
new file mode 100644
index 00000000000000..e7557e9d562f57
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CompressedMaterialization.java
@@ -0,0 +1,99 @@
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Any;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsInt;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.VarcharType;
+import org.apache.doris.nereids.types.coercion.CharacterType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Post processor that rewrites short character-typed GROUP BY slots of a
+ * {@link PhysicalHashAggregate} into {@link EncodeAsInt} keys.
+ *
+ * <p>When an encoded slot also appears in the aggregate output, that output entry is
+ * replaced by an {@link AnyValue} alias that keeps the slot's ExprId and name.
+ */
+public class CompressedMaterialization extends PlanPostProcessor {
+    /**
+     * Exclusive upper bound on the declared length of a compressible column.
+     * NOTE(review): confirm that EncodeAsInt can represent every value shorter than
+     * this bound without collisions.
+     */
+    private static final int MAX_COMPRESSIBLE_LEN = 7;
+
+    /**
+     * Replaces compressible GROUP BY slots with encoded aliases and patches the output.
+     *
+     * @param aggregate the aggregate node to rewrite
+     * @param context the cascades context; not read by this method
+     * @return the rewritten aggregate, or {@code aggregate} itself when no key is compressible
+     */
+    @Override
+    public PhysicalHashAggregate<? extends Plan> visitPhysicalHashAggregate(
+            PhysicalHashAggregate<? extends Plan> aggregate, CascadesContext context) {
+        // NOTE(review): children are not visited from here; confirm that aggregates
+        // nested below this node are not expected to be rewritten as well.
+        List<Expression> newGroupByExpressions = Lists.newArrayList();
+        List<Expression> encodedExpressions = Lists.newArrayList();
+        for (Expression gp : aggregate.getGroupByExpressions()) {
+            if (gp instanceof SlotReference && canCompress(gp)) {
+                newGroupByExpressions.add(new Alias(new EncodeAsInt(gp), ((SlotReference) gp).getName()));
+                encodedExpressions.add(gp);
+            } else {
+                newGroupByExpressions.add(gp);
+            }
+        }
+        if (encodedExpressions.isEmpty()) {
+            return aggregate;
+        }
+        aggregate = aggregate.withGroupByExpressions(newGroupByExpressions);
+
+        // The original slot is no longer a grouping key itself, so an output entry that
+        // still exposes it is wrapped in AnyValue under the same ExprId and name.
+        boolean hasNewOutput = false;
+        List<NamedExpression> newOutput = Lists.newArrayList();
+        for (NamedExpression ne : aggregate.getOutputExpressions()) {
+            if (ne instanceof SlotReference && encodedExpressions.contains(ne)) {
+                newOutput.add(new Alias(ne.getExprId(), new AnyValue(ne), ne.getName()));
+                hasNewOutput = true;
+            } else {
+                newOutput.add(ne);
+            }
+        }
+        if (hasNewOutput) {
+            aggregate = aggregate.withAggOutput(newOutput);
+        }
+        return aggregate;
+    }
+
+    /**
+     * Decides whether a GROUP BY expression may be replaced by its integer encoding.
+     *
+     * @param expression the candidate expression
+     * @return {@code true} only for a {@link CharacterType} whose length is positive and
+     *         below {@link #MAX_COMPRESSIBLE_LEN}
+     */
+    private boolean canCompress(Expression expression) {
+        DataType type = expression.getDataType();
+        if (!(type instanceof CharacterType)) {
+            return false;
+        }
+        int len = ((CharacterType) type).getLen();
+        // A non-positive length is rejected: it presumably marks an unbounded or unknown
+        // width (TODO confirm), and such values cannot be assumed to fit the encoding.
+        return len > 0 && len < MAX_COMPRESSIBLE_LEN;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 11a4b73d8a31c5..d94db928ab1218 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -66,6 +66,7 @@ public List getProcessors() {
         if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) {
             builder.add(new ProjectAggregateExpressionsForCse());
         }
+        builder.add(new CompressedMaterialization());
         builder.add(new CommonSubExpressionOpt());
         // DO NOT replace PLAN NODE from here
         if (cascadesContext.getConnectContext().getSessionVariable().pushTopnToAgg) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 3212cb170f052c..0c53de5e1b4fca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -234,6 +234,25 @@ public int hashCode() {
                 aggregateParam, maybeUsingStream, requireProperties);
     }
 
+    /**
+     * Creates a copy of this aggregate that groups by {@code newGroupByExpressions}.
+     *
+     * <p>Every other constructor argument (outputExpressions, partitionExpressions,
+     * aggregateParam, maybeUsingStream, groupExpression, the logical properties,
+     * requireProperties, physicalProperties, statistics and the child) is passed
+     * through from this node as-is.
+     *
+     * @param newGroupByExpressions the GROUP BY expressions of the returned node
+     * @return a newly constructed {@code PhysicalHashAggregate}
+     */
+    public PhysicalHashAggregate withGroupByExpressions(List newGroupByExpressions) {
+        return new PhysicalHashAggregate<>(
+                newGroupByExpressions, outputExpressions, partitionExpressions,
+                aggregateParam, maybeUsingStream,
+                groupExpression, getLogicalProperties(),
+                requireProperties, physicalProperties, statistics,
+                child());
+    }
+
     @Override
     public PhysicalHashAggregate withChildren(List children) {
         Preconditions.checkArgument(children.size() == 1);