Skip to content

Commit

Permalink
[opt](nereids) infer result column name in select outfile stmt
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Nov 12, 2023
1 parent c02c009 commit 64ada78
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* unbound file sink
*/
public class UnboundFileSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Unbound, Sink {

private final String filePath;
private final String format;
private final Map<String, String> properties;

public UnboundFileSink(CHILD_TYPE child, String filePath, String format, Map<String, String> properties) {
super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), child);
this.filePath = filePath;
this.format = format;
this.properties = properties;
}

public UnboundFileSink(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child,
String filePath, String format, Map<String, String> properties) {
super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
this.filePath = filePath;
this.format = format;
this.properties = properties;
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundFileSink only accepts one child");
return new UnboundFileSink<>(groupExpression, Optional.empty(), children.get(0),
filePath, format, properties);
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundFileSink(this, context);
}

@Override
public List<? extends Expression> getExpressions() {
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()");
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundFileSink<>(groupExpression, Optional.of(getLogicalProperties()), child(),
filePath, format, properties);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundFileSink only accepts one child");
return new UnboundFileSink<>(groupExpression, logicalProperties, children.get(0),
filePath, format, properties);

}

@Override
public List<Slot> computeOutput() {
throw new UnboundException("output");
}

@Override
public String toString() {
return Utils.toSqlString("UnboundFileSink[" + id.asInt() + "]");
}

public String getFilePath() {
return filePath;
}

public String getFormat() {
return format;
}

public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundFileSink;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
Expand Down Expand Up @@ -61,7 +62,7 @@ public LogicalPlan getLogicalPlan() {

@Override
public boolean hasOutFileClause() {
return logicalPlan instanceof LogicalFileSink;
return logicalPlan instanceof LogicalFileSink || logicalPlan instanceof UnboundFileSink;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
import org.apache.doris.nereids.DorisParserBaseVisitor;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundFileSink;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
Expand Down Expand Up @@ -347,7 +348,6 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
Expand Down Expand Up @@ -2326,7 +2326,7 @@ private LogicalPlan withOutFile(LogicalPlan plan, OutFileClauseContext ctx) {
properties = visitPropertyClause(ctx.propertyClause());
}
Literal filePath = (Literal) visit(ctx.filePath);
return new LogicalFileSink<>(filePath.getStringValue(), format, properties, ImmutableList.of(), plan);
return new UnboundFileSink<Plan>(plan, filePath.getStringValue(), format, properties);
}

private LogicalPlan withQueryOrganization(LogicalPlan inputPlan, QueryOrganizationContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundFileSink;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;

Expand All @@ -40,9 +40,9 @@ public Plan visitUnboundOlapTableSink(UnboundOlapTableSink<? extends Plan> unbou
}

@Override
public Plan visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, StatementContext context) {
public Plan visitUnboundFileSink(UnboundFileSink<? extends Plan> unboundFileSink, StatementContext context) {
turnOffPipeline(context);
return fileSink;
return unboundFileSink;
}

private void turnOffPipeline(StatementContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum RuleType {

// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_FILE_SINK(RuleTypeClass.REWRITE),
BINDING_NON_LEAF_LOGICAL_PLAN(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
BINDING_RELATION(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
Expand All @@ -69,6 +70,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
Expand Down Expand Up @@ -567,21 +569,30 @@ protected boolean condition(Rule rule, Plan plan) {
),
RuleType.BINDING_RESULT_SINK.build(
unboundResultSink().then(sink -> {

final ImmutableListMultimap.Builder<ExprId, Integer> exprIdToIndexMapBuilder =
ImmutableListMultimap.builder();
List<Slot> childOutput = sink.child().getOutput();
for (int index = 0; index < childOutput.size(); index++) {
exprIdToIndexMapBuilder.put(childOutput.get(index).getExprId(), index);
}
InferPlanOutputAlias aliasInfer = new InferPlanOutputAlias(childOutput);
sink.child().accept(aliasInfer, exprIdToIndexMapBuilder.build());
return new LogicalResultSink<>(aliasInfer.getOutputs(), sink.child());
return new LogicalResultSink<>(inferColumnNames(sink), sink.child());
})
),
RuleType.BINDING_FILE_SINK.build(
unboundFileSink().then(sink -> {
return new LogicalFileSink<>(sink.getFilePath(), sink.getFormat(), sink.getProperties(),
inferColumnNames(sink), sink.child());
})
)
).stream().map(ruleCondition).collect(ImmutableList.toImmutableList());
}

private List<NamedExpression> inferColumnNames(LogicalSink<Plan> sink) {
final ImmutableListMultimap.Builder<ExprId, Integer> exprIdToIndexMapBuilder =
ImmutableListMultimap.builder();
List<Slot> sinkOutput = sink.child().getOutput();
for (int index = 0; index < sinkOutput.size(); index++) {
exprIdToIndexMapBuilder.put(sinkOutput.get(index).getExprId(), index);
}
InferPlanOutputAlias aliasInfer = new InferPlanOutputAlias(sinkOutput);
sink.child().accept(aliasInfer, exprIdToIndexMapBuilder.build());
return aliasInfer.getOutputs();
}

private Plan bindSort(LogicalSort<? extends Plan> sort, Plan plan, CascadesContext ctx) {
// 1. We should deduplicate the slots, otherwise the binding process will fail due to the
// ambiguous slots exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.visitor;

import org.apache.doris.nereids.analyzer.UnboundFileSink;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.trees.plans.Plan;
Expand Down Expand Up @@ -56,6 +57,10 @@ default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResult
return visitLogicalSink(unboundResultSink, context);
}

default R visitUnboundFileSink(UnboundFileSink<? extends Plan> unboundFileSink, C context) {
return visitLogicalSink(unboundFileSink, context);
}

// *******************************
// logical
// *******************************
Expand Down

0 comments on commit 64ada78

Please sign in to comment.