Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Oct 16, 2024
1 parent af7eb15 commit ec02796
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Any;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsInt;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.coercion.CharacterType;

import com.google.common.collect.Lists;

import java.util.List;

public class CompressedMaterialization extends PlanPostProcessor{
@Override
public PhysicalHashAggregate visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> aggregate,
CascadesContext context) {
List<Expression> newGroupByExpressions = Lists.newArrayList();
List<Expression> encodedExpressions = Lists.newArrayList();
for (Expression gp : aggregate.getGroupByExpressions()) {
if (gp instanceof SlotReference && canCompress(gp)) {
newGroupByExpressions.add(new Alias(new EncodeAsInt(gp), ((SlotReference) gp).getName()));
encodedExpressions.add(gp);
} else {
newGroupByExpressions.add(gp);
}
}
if (!encodedExpressions.isEmpty()) {
aggregate = aggregate.withGroupByExpressions(newGroupByExpressions);
boolean hasNewOutput = false;
List<NamedExpression> newOutput = Lists.newArrayList();
List<NamedExpression> output = aggregate.getOutputExpressions();
for (NamedExpression ne : output) {
if (ne instanceof SlotReference && encodedExpressions.contains(ne)) {
newOutput.add(new Alias(ne.getExprId(), new AnyValue(ne), ne.getName()));
hasNewOutput = true;
} else {
newOutput.add(ne);
}
}
if (hasNewOutput) {
aggregate = aggregate.withAggOutput(newOutput);
}
}
return aggregate;
}

private boolean canCompress(Expression expression) {
DataType type = expression.getDataType();
if (type instanceof CharacterType) {
CharacterType ct = (CharacterType) type;
if (ct.getLen() < 7) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public List<PlanPostProcessor> getProcessors() {
if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) {
builder.add(new ProjectAggregateExpressionsForCse());
}
builder.add(new CompressedMaterialization());
builder.add(new CommonSubExpressionOpt());
// DO NOT replace PLAN NODE from here
if (cascadesContext.getConnectContext().getSessionVariable().pushTopnToAgg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ public int hashCode() {
aggregateParam, maybeUsingStream, requireProperties);
}

public PhysicalHashAggregate<Plan> withGroupByExpressions(List<Expression> newGroupByExpressions) {
return new PhysicalHashAggregate<>(newGroupByExpressions, outputExpressions, partitionExpressions,
aggregateParam, maybeUsingStream, groupExpression, getLogicalProperties(),
requireProperties, physicalProperties, statistics,
child());
}

@Override
public PhysicalHashAggregate<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
Expand Down

0 comments on commit ec02796

Please sign in to comment.