Skip to content

Commit

Permalink
add is_merge tag for data sink
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Dec 9, 2024
1 parent 2bc011f commit 8ed2a47
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,9 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
}
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
sortNode.setMergeByExchange();
sortNode.setChildrenDistributeExprLists(distributeExprLists);
}
Expand Down Expand Up @@ -2196,6 +2199,9 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
exchangeNode.setChildrenDistributeExprLists(distributeExprLists);
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
Expand Down
10 changes: 9 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public abstract class DataSink {
// Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink.
protected PlanFragment fragment;

protected boolean isMerge = false;
/**
* Return an explain string for the DataSink. Each line of the explain will be
* prefixed
Expand Down Expand Up @@ -77,4 +77,12 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException {
throw new AnalysisException("Unknown table type " + table.getType());
}
}

public boolean isMerge() {
return isMerge;
}

public void setMerge(boolean merge) {
isMerge = merge;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) {
strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId());
strBuilder.append("\n");
}
if (isMerge) {
strBuilder.append("IS_MERGE: true\n");
}

return strBuilder.toString();
}
Expand Down Expand Up @@ -234,6 +237,7 @@ protected TDataSink toThrift() {
tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
}
}
tStreamSink.setIsMerge(isMerge);
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ struct TDataStreamSink {
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
13: optional list<Exprs.TExpr> tablet_sink_exprs
14: optional bool is_merge
}

struct TMultiCastDataStreamSink {
Expand Down

0 comments on commit 8ed2a47

Please sign in to comment.