Skip to content

Commit

Permalink
analyze-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Oct 17, 2024
1 parent 9d629f8 commit ac85b6c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 106 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public List<PlanPostProcessor> getProcessors() {
if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) {
builder.add(new ProjectAggregateExpressionsForCse());
}
// builder.add(new CompressedMaterialization());
builder.add(new CommonSubExpressionOpt());
// DO NOT replace PLAN NODE from here
if (cascadesContext.getConnectContext().getSessionVariable().pushTopnToAgg) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.analysis;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsBigInt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsInt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsLargeInt;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.coercion.CharacterType;

import com.amazonaws.services.logs.model.TagLogGroupRequest;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Optional;
import java.util.Map;
import java.util.Set;

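/**
* Compress short string group-by keys into integers, e.g.
* select A from T group by A
* =>
* select any_value(A) from T group by encode_as_int(A)
* Depending on the length of A's character type, the key is encoded with
* encode_as_int, encode_as_bigint or encode_as_largeint.
*/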
public class CompressedMaterializeGroupBy extends OneAnalysisRuleFactory {
@Override
public Rule build() {
Expand All @@ -36,15 +57,20 @@ public Rule build() {
);
}

private boolean canCompress(Expression expression) {
private Expression getEncodeExpression(Expression expression) {
DataType type = expression.getDataType();
Expression encodeExpr = null;
if (type instanceof CharacterType) {
CharacterType ct = (CharacterType) type;
if (ct.getLen() < 7) {
return true;
if (ct.getLen() < 4) {
encodeExpr = new EncodeAsInt(expression);
} else if (ct.getLen() < 7) {
encodeExpr = new EncodeAsBigInt(expression);
} else if (ct.getLen() < 15) {
encodeExpr = new EncodeAsLargeInt(expression);
}
}
return false;
return encodeExpr;
}

/*
Expand All @@ -55,8 +81,8 @@ private boolean canCompress(Expression expression) {
[not support] select substring(k, 1,2), sum(v) from t group by k
[support] select A as B from T group by A
*/
private Set<Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> aggregate) {
Set<Expression> encodableGroupbyExpressions = Sets.newHashSet();
private Map<Expression, Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> aggregate) {
Map<Expression, Expression> encodableGroupbyExpressions = Maps.newHashMap();
Set<Slot> slotShouldNotEncode = Sets.newHashSet();
for (NamedExpression ne : aggregate.getOutputExpressions()) {
if (ne instanceof Alias) {
Expand All @@ -68,7 +94,8 @@ private Set<Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> ag
}
}
for (Expression gb : aggregate.getGroupByExpressions()) {
if (canCompress(gb)) {
Expression encodeExpr = getEncodeExpression(gb);
if (encodeExpr != null) {
boolean encodable = true;
for (Slot gbs : gb.getInputSlots()) {
if (slotShouldNotEncode.contains(gbs)) {
Expand All @@ -77,7 +104,7 @@ private Set<Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> ag
}
}
if (encodable) {
encodableGroupbyExpressions.add(gb);
encodableGroupbyExpressions.put(gb, encodeExpr);
}
}
}
Expand All @@ -86,12 +113,12 @@ private Set<Expression> getEncodableGroupByExpressions(LogicalAggregate<Plan> ag

private LogicalAggregate<Plan> compressedMaterialize(LogicalAggregate<Plan> aggregate) {
List<Alias> encodedExpressions = Lists.newArrayList();
Set<Expression> encodableGroupByExpressions = getEncodableGroupByExpressions(aggregate);
Map<Expression, Expression> encodableGroupByExpressions = getEncodableGroupByExpressions(aggregate);
if (!encodableGroupByExpressions.isEmpty()) {
List<Expression> newGroupByExpressions = Lists.newArrayList();
for (Expression gp : aggregate.getGroupByExpressions()) {
if (encodableGroupByExpressions.contains(gp)) {
Alias alias = new Alias(new EncodeAsBigInt(gp));
if (encodableGroupByExpressions.containsKey(gp)) {
Alias alias = new Alias(encodableGroupByExpressions.get(gp));
newGroupByExpressions.add(alias);
encodedExpressions.add(alias);
} else {
Expand All @@ -100,9 +127,10 @@ private LogicalAggregate<Plan> compressedMaterialize(LogicalAggregate<Plan> aggr
}
List<NamedExpression> newOutput = Lists.newArrayList();
for (NamedExpression ne : aggregate.getOutputExpressions()) {
if (ne instanceof SlotReference && encodableGroupByExpressions.contains(ne)) {
// output A => output Any_value(A)
if (ne instanceof SlotReference && encodableGroupByExpressions.containsKey(ne)) {
newOutput.add(new Alias(ne.getExprId(), new AnyValue(ne), ne.getName()));
} else if (ne instanceof Alias && encodableGroupByExpressions.contains(((Alias) ne).child())) {
} else if (ne instanceof Alias && encodableGroupByExpressions.containsKey(((Alias) ne).child())) {
Expression child = ((Alias) ne).child();
Preconditions.checkArgument(child instanceof SlotReference,
"encode %s failed, not a slot", child);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,20 @@
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsBigInt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsInt;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.PlanUtils.CollectNonWindowedAggFuncs;
import org.apache.doris.nereids.util.Utils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.ArrayList;
Expand Down
69 changes: 68 additions & 1 deletion regression-test/suites/nereids_p0/compress_materialize.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ suite("compress_materialize") {

explain {
sql("select sum(v) from compress group by substring(k, 1, 3);")
contains("group by: encode_as_bigint(substring(k, 1, 3))")
contains("group by: encode_as_int(substring(k, 1, 3))")
}

explain {
sql("select sum(v) from compress group by substring(k, 1, 4);")
contains("group by: encode_as_bigint(substring(k, 1, 4))")
}

order_qt_encodeexpr "select sum(v) from compress group by substring(k, 1, 3);"


Expand All @@ -107,5 +113,66 @@ suite("compress_materialize") {
select k from compress group by k
) T join cmt2 on T.k = cmt2.k2;
"""

// Case 1: group-by key declared as varchar(3).
// The plan is expected to encode the key with encode_as_int.
sql """
drop table if exists compressInt;
CREATE TABLE `compressInt` (
`k` varchar(3) NOT NULL,
`v` int NOT NULL
) ENGINE=OLAP
duplicate KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS AUTO
PROPERTIES (
"replication_num" = "1"
);
insert into compressInt values ("a", 1), ("aa", 2), ("bb", 3), ("b", 4), ("b", 5);
"""
// Only checks that the explain output mentions encode_as_int somewhere.
explain{
sql "select k from compressInt group by k"
contains("encode_as_int")
}

// Case 2: group-by key declared as varchar(10).
// The plan is expected to encode the key with encode_as_largeint.
sql """
drop table if exists compressLargeInt;
CREATE TABLE `compressLargeInt` (
`k` varchar(10) NOT NULL,
`v` int NOT NULL
) ENGINE=OLAP
duplicate KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS AUTO
PROPERTIES (
"replication_num" = "1"
);
insert into compressLargeInt values ("a", 1), ("aa", 2), ("bb", 3), ("b", 4), ("b", 5);
"""
// Checks the exact group-by expression shown in the explain output.
explain{
sql "select k from compressLargeInt group by k"
contains("group by: encode_as_largeint(k)")
}


// Case 3: group-by key declared as varchar(16).
// The key is expected to be left unencoded, so the output column is not wrapped in any_value.
sql """
drop table if exists notcompress;
CREATE TABLE `notcompress` (
`k` varchar(16) NOT NULL,
`v` int NOT NULL
) ENGINE=OLAP
duplicate KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS AUTO
PROPERTIES (
"replication_num" = "1"
);
insert into notcompress values ("a", 1), ("aa", 2), ("bb", 3), ("b", 4), ("b", 5);
"""
// Negative check: the absence of any_value is used as the signal that no rewrite happened.
explain{
sql "select k from notcompress group by k"
notContains("any_value")
}
}

0 comments on commit ac85b6c

Please sign in to comment.