Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](nereids) optimize small sql #43546

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vruntimefilter_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class VRuntimeFilterWrapper final : public VExpr {
template <typename T>
static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
                              T& always_true) {
    // The runtime filter is judged "always true" (i.e. not selective enough to be
    // worth applying) when the fraction of rows it filtered out is below the
    // ignore threshold. Both operands are cast to double explicitly so the
    // division is done in floating point, never as integer division.
    // NOTE(review): a leftover duplicate assignment using `(input_rows * 1.0L)`
    // preceded this line and was removed — it was a dead store shadowed by the
    // assignment below.
    always_true = static_cast<double>(filter_rows) / static_cast<double>(input_rows) <
                  ignore_threshold;
}

bool is_rf_wrapper() const override { return true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.common;

import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -40,7 +42,7 @@ public static byte[] compress(byte[] data) throws IOException {
/**
 * Decompresses a GZIP-compressed byte array.
 *
 * @param data GZIP-compressed input bytes
 * @return the fully decompressed bytes
 * @throws IOException if {@code data} is not valid GZIP content or reading fails
 */
public static byte[] decompress(byte[] data) throws IOException {
    ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
    try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
        // The original hunk contained two consecutive return statements (the
        // second was unreachable — a compile error in Java). readAllBytes()
        // (Java 9+) is kept: it drains the stream without needing commons-io.
        return gzipStream.readAllBytes();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ public void setMutableState(String key, Object value) {
@Override
public Expression withChildren(List<Expression> children) {
    // This searcher is a pseudo-expression used only for udf signature lookup;
    // it never takes part in plan rewriting, so replacing children is rejected.
    throw new AnalysisException("could not call withChildren on UdfSignatureSearcher");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseMvHint;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -1384,12 +1385,7 @@ public List<String> getEqualPartitionNames(List<Long> partitionIds1, List<Long>
}

/**
 * Returns a snapshot copy of all partition ids of this table.
 *
 * <p>The read lock guards against concurrent partition add/drop while the key
 * set is being copied; the returned list is an independent mutable copy.</p>
 *
 * <p>The original hunk had an unreachable trailing
 * {@code return Utils.fastToImmutableList(...)} after a try/finally that always
 * returns — a compile error in Java. The lock-protected copy is kept; if the
 * unlocked immutable-list variant is intended, the locked block must be removed
 * in the same change.</p>
 */
public List<Long> getPartitionIds() {
    readLock();
    try {
        return new ArrayList<>(idToPartition.keySet());
    } finally {
        readUnlock();
    }
}

public Set<String> getCopiedBfColumns() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,9 @@ private PhysicalPlan chooseBestPlan(Group rootGroup, PhysicalProperties physical
}

private long getGarbageCollectionTime() {
if (!ConnectContext.get().getSessionVariable().enableProfile()) {
return 0;
}
List<GarbageCollectorMXBean> gcMxBeans = ManagementFactory.getGarbageCollectorMXBeans();
long initialGCTime = 0;
for (GarbageCollectorMXBean gcBean : gcMxBeans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ public class StatementContext implements Closeable {

private Backend groupCommitMergeBackend;

private boolean checkedPrivileges;

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -580,4 +582,12 @@ public void setGroupCommitMergeBackend(
Backend groupCommitMergeBackend) {
this.groupCommitMergeBackend = groupCommitMergeBackend;
}

/** Whether privilege checking has already been performed for this statement. */
public boolean isCheckedPrivileges() {
    return this.checkedPrivileges;
}

/** Records whether privilege checking has been performed for this statement. */
public void setCheckedPrivileges(boolean checkedPrivileges) {
    this.checkedPrivileges = checkedPrivileges;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1890,12 +1890,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
int layerCount = project.getMultiLayerProjects().size();
for (int i = 0; i < layerCount; i++) {
List<NamedExpression> layer = project.getMultiLayerProjects().get(i);
projectionExprs = layer.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
slots = layer.stream()
.map(NamedExpression::toSlot)
.collect(Collectors.toList());

projectionExprs = new ArrayList<>(layer.size());
slots = new ArrayList<>(layer.size());
for (int j = 0; j < layer.size(); j++) {
NamedExpression layerExpr = layer.get(j);
projectionExprs.add(ExpressionTranslator.translate(layerExpr, context));
slots.add(layerExpr.toSlot());
}

if (i < layerCount - 1) {
inputPlanNode.addIntermediateProjectList(projectionExprs);
TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context);
Expand All @@ -1904,14 +1907,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
allProjectionExprs.addAll(projectionExprs);
}
} else {
projectionExprs = project.getProjects()
.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
slots = project.getProjects()
.stream()
.map(NamedExpression::toSlot)
.collect(Collectors.toList());
List<NamedExpression> projects = project.getProjects();
int projectNum = projects.size();
projectionExprs = new ArrayList<>(projectNum);
slots = new ArrayList<>(projectNum);
for (int j = 0; j < projectNum; j++) {
NamedExpression layerExpr = projects.get(j);
projectionExprs.add(ExpressionTranslator.translate(layerExpr, context));
slots.add(layerExpr.toSlot());
}
allProjectionExprs.addAll(projectionExprs);
}
// process multicast sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob;
import org.apache.doris.nereids.jobs.rewrite.TopicRewriteJob;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.FilteredRules;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.Rules;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;

Expand Down Expand Up @@ -94,10 +95,10 @@ public static RewriteJob bottomUp(RuleFactory... ruleFactories) {
}

public static RewriteJob bottomUp(List<RuleFactory> ruleFactories) {
List<Rule> rules = ruleFactories.stream()
Rules rules = new FilteredRules(ruleFactories.stream()
.map(RuleFactory::buildRules)
.flatMap(List::stream)
.collect(ImmutableList.toImmutableList());
.collect(ImmutableList.toImmutableList()));
return new RootPlanTreeRewriteJob(rules, PlanTreeRewriteBottomUpJob::new, getTraversePredicate(), true);
}

Expand All @@ -110,10 +111,10 @@ public static RewriteJob topDown(List<RuleFactory> ruleFactories) {
}

public static RewriteJob topDown(List<RuleFactory> ruleFactories, boolean once) {
List<Rule> rules = ruleFactories.stream()
Rules rules = new FilteredRules(ruleFactories.stream()
.map(RuleFactory::buildRules)
.flatMap(List::stream)
.collect(ImmutableList.toImmutableList());
.collect(ImmutableList.toImmutableList()));
return new RootPlanTreeRewriteJob(rules, PlanTreeRewriteTopDownJob::new, getTraversePredicate(), once);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.nereids.rules.analysis.ReplaceExpressionByChildOutput;
import org.apache.doris.nereids.rules.analysis.SubqueryToApply;
import org.apache.doris.nereids.rules.analysis.VariableToLiteral;
import org.apache.doris.nereids.rules.rewrite.MergeFilters;
import org.apache.doris.nereids.rules.rewrite.MergeProjects;
import org.apache.doris.nereids.rules.rewrite.SemiJoinCommute;
import org.apache.doris.nereids.rules.rewrite.SimplifyAggGroupBy;
Expand Down Expand Up @@ -177,7 +178,11 @@ private static List<RewriteJob> buildAnalyzerJobs(Optional<CustomTableResolver>
topDown(new LeadingJoin()),
bottomUp(new NormalizeGenerate()),
bottomUp(new SubqueryToApply()),
topDown(new MergeProjects())
topDown(
new MergeProjects(),
// merge normal filter and hidden column filter
new MergeFilters()
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.doris.nereids.rules.rewrite.CheckDataTypes;
import org.apache.doris.nereids.rules.rewrite.CheckMatchExpression;
import org.apache.doris.nereids.rules.rewrite.CheckMultiDistinct;
import org.apache.doris.nereids.rules.rewrite.CheckPrivileges;
import org.apache.doris.nereids.rules.rewrite.CheckRestorePartition;
import org.apache.doris.nereids.rules.rewrite.ClearContextStatus;
import org.apache.doris.nereids.rules.rewrite.CollectCteConsumerOutput;
Expand Down Expand Up @@ -84,7 +83,6 @@
import org.apache.doris.nereids.rules.rewrite.InferJoinNotNull;
import org.apache.doris.nereids.rules.rewrite.InferPredicates;
import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct;
import org.apache.doris.nereids.rules.rewrite.InlineLogicalView;
import org.apache.doris.nereids.rules.rewrite.LimitAggToTopNAgg;
import org.apache.doris.nereids.rules.rewrite.LimitSortToTopN;
import org.apache.doris.nereids.rules.rewrite.LogicalResultSinkToShortCircuitPointQuery;
Expand Down Expand Up @@ -220,13 +218,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
// but it appeared at LogicalApply.right. After the `Subquery unnesting` topic, all slots is placed in a
// normal position, then we can check column privileges by these steps
//
// 1. use ColumnPruning rule to derive the used slots in LogicalView
// 2. and then check the column privileges
// 3. finally, we can eliminate the LogicalView
topic("Inline view and check column privileges",
custom(RuleType.CHECK_PRIVILEGES, CheckPrivileges::new),
bottomUp(new InlineLogicalView())
),
topic("Eliminate optimization",
bottomUp(
new EliminateLimit(),
Expand Down Expand Up @@ -438,8 +429,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
new CollectCteConsumerOutput()
)
),
topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, QueryColumnCollector::new)
)
topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, QueryColumnCollector::new))
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.Rules;
import org.apache.doris.nereids.trees.plans.Plan;

import java.util.List;
Expand All @@ -40,7 +40,7 @@ public class PlanTreeRewriteBottomUpJob extends PlanTreeRewriteJob {
// so we will do specified action for each node based on their 'RewriteState'.
private static final String REWRITE_STATE_KEY = "rewrite_state";
private final RewriteJobContext rewriteJobContext;
private final List<Rule> rules;
private final Rules rules;
private final int batchId;

enum RewriteState {
Expand All @@ -57,7 +57,7 @@ enum RewriteState {

public PlanTreeRewriteBottomUpJob(
RewriteJobContext rewriteJobContext, JobContext context,
Predicate<Plan> isTraverseChildren, List<Rule> rules) {
Predicate<Plan> isTraverseChildren, Rules rules) {
super(JobType.BOTTOM_UP_REWRITE, context, isTraverseChildren);
this.rewriteJobContext = Objects.requireNonNull(rewriteJobContext, "rewriteContext cannot be null");
this.rules = Objects.requireNonNull(rules, "rules cannot be null");
Expand Down Expand Up @@ -88,6 +88,13 @@ public void execute() {
private void rewriteThis() {
// Link the current node with the sub-plan to get the current plan which is used in the rewrite phase later.
Plan plan = linkChildren(rewriteJobContext.plan, rewriteJobContext.childrenContext);
if (rules.getCurrentAndChildrenRules(plan).isEmpty()) {
// No new plan is generated, so just set the state of the current plan to 'REWRITTEN'.
setState(plan, RewriteState.REWRITTEN, batchId);
rewriteJobContext.setResult(plan);
return;
}

RewriteResult rewriteResult = rewrite(plan, rules, rewriteJobContext);
if (rewriteResult.hasNewPlan) {
RewriteJobContext newJobContext = rewriteJobContext.withPlan(rewriteResult.plan);
Expand All @@ -110,6 +117,13 @@ private void rewriteThis() {

private void ensureChildrenRewritten() {
Plan plan = rewriteJobContext.plan;
if (rules.getCurrentAndChildrenRules(plan).isEmpty()) {
// No new plan is generated, so just set the state of the current plan to 'REWRITTEN'.
setState(plan, RewriteState.REWRITTEN, batchId);
rewriteJobContext.setResult(plan);
return;
}

int batchId = rewriteJobContext.batchId;
setState(plan, RewriteState.REWRITE_THIS, batchId);
pushJob(new PlanTreeRewriteBottomUpJob(rewriteJobContext, context, isTraverseChildren, rules));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.nereids.minidump.NereidsTracer;
import org.apache.doris.nereids.pattern.Pattern;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.Rules;
import org.apache.doris.nereids.trees.plans.Plan;

import com.google.common.collect.ImmutableList;
Expand All @@ -43,12 +44,12 @@ public PlanTreeRewriteJob(JobType type, JobContext context, Predicate<Plan> isTr
this.isTraverseChildren = Objects.requireNonNull(isTraverseChildren, "isTraverseChildren can not be null");
}

protected final RewriteResult rewrite(Plan plan, List<Rule> rules, RewriteJobContext rewriteJobContext) {
protected final RewriteResult rewrite(Plan plan, Rules rules, RewriteJobContext rewriteJobContext) {
CascadesContext cascadesContext = context.getCascadesContext();
cascadesContext.setIsRewriteRoot(rewriteJobContext.isRewriteRoot());

boolean showPlanProcess = cascadesContext.showPlanProcess();
for (Rule rule : rules) {
for (Rule rule : rules.getCurrentRules(plan)) {
if (disableRules.get(rule.getRuleType().type())) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.Rules;
import org.apache.doris.nereids.trees.plans.Plan;

import java.util.List;
Expand All @@ -34,11 +34,11 @@
public class PlanTreeRewriteTopDownJob extends PlanTreeRewriteJob {

private final RewriteJobContext rewriteJobContext;
private final List<Rule> rules;
private final Rules rules;

public PlanTreeRewriteTopDownJob(
RewriteJobContext rewriteJobContext, JobContext context,
Predicate<Plan> isTraverseChildren, List<Rule> rules) {
Predicate<Plan> isTraverseChildren, Rules rules) {
super(JobType.TOP_DOWN_REWRITE, context, isTraverseChildren);
this.rewriteJobContext = Objects.requireNonNull(rewriteJobContext, "rewriteContext cannot be null");
this.rules = Objects.requireNonNull(rules, "rules cannot be null");
Expand All @@ -47,6 +47,14 @@ public PlanTreeRewriteTopDownJob(
@Override
public void execute() {
if (!rewriteJobContext.childrenVisited) {
if (rules.getCurrentAndChildrenRules(rewriteJobContext.plan).isEmpty()) {
rewriteJobContext.setResult(rewriteJobContext.plan);
if (rewriteJobContext.parentContext == null) {
context.getCascadesContext().setRewritePlan(rewriteJobContext.plan);
}
return;
}

RewriteResult rewriteResult = rewrite(rewriteJobContext.plan, rules, rewriteJobContext);
if (rewriteResult.hasNewPlan) {
RewriteJobContext newContext = rewriteJobContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.doris.nereids.metrics.consumer.LogConsumer;
import org.apache.doris.nereids.metrics.event.TransformEvent;
import org.apache.doris.nereids.pattern.GroupExpressionMatching;
import org.apache.doris.nereids.rules.FilteredRules;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.Rules;
import org.apache.doris.nereids.trees.plans.Plan;

import com.google.common.base.Preconditions;
Expand All @@ -50,15 +52,15 @@ public class RewriteTopDownJob extends Job {
EventChannel.getDefaultChannel().addConsumers(new LogConsumer(TransformEvent.class, NereidsPlanner.LOG)));

private final Group group;
private final List<Rule> rules;
private final Rules rules;

/**
 * Builds a run-once top-down rewrite job for {@code group} from rule factories.
 *
 * <p>The scraped hunk fused the old and new {@code this(...)} delegation lines
 * into invalid Java (two {@code this(} calls, mismatched parentheses).
 * Reconstructed to the post-change form that wraps the built rules in
 * {@link FilteredRules} before delegating.</p>
 */
public RewriteTopDownJob(Group group, JobContext context, List<RuleFactory> factories) {
    this(group, new FilteredRules(factories.stream()
            .flatMap(factory -> factory.buildRules().stream())
            .collect(Collectors.toList())), context, true);
}

public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context) {
public RewriteTopDownJob(Group group, Rules rules, JobContext context) {
this(group, rules, context, true);
}

Expand All @@ -69,7 +71,7 @@ public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context) {
* @param rules rewrite rules
* @param context planner context
*/
public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context, boolean once) {
public RewriteTopDownJob(Group group, Rules rules, JobContext context, boolean once) {
super(JobType.TOP_DOWN_REWRITE, context, once);
this.group = Objects.requireNonNull(group, "group cannot be null");
this.rules = Objects.requireNonNull(rules, "rules cannot be null");
Expand All @@ -84,7 +86,7 @@ public EventProducer getEventTracer() {
public void execute() {
GroupExpression logicalExpression = group.getLogicalExpression();
countJobExecutionTimesOfGroupExpressions(logicalExpression);
for (Rule rule : rules) {
for (Rule rule : rules.getCurrentAndChildrenRules()) {
if (rule.isInvalid(disableRules, logicalExpression)) {
continue;
}
Expand Down
Loading
Loading