diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index bea5eec432b2ab..8748892643fb37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2145,6 +2145,9 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort sor } SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0); ((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo()); + if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) { + inputFragment.getChild(0).getSink().setMerge(true); + } sortNode.setMergeByExchange(); sortNode.setChildrenDistributeExprLists(distributeExprLists); } @@ -2196,6 +2199,9 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN topN, PlanTra ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot(); exchangeNode.setChildrenDistributeExprLists(distributeExprLists); exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo()); + if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) { + inputFragment.getChild(0).getSink().setMerge(true); + } exchangeNode.setLimit(topN.getLimit()); exchangeNode.setOffset(topN.getOffset()); ((SortNode) exchangeNode.getChild(0)).setMergeByExchange(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index 8d6daa2f8b72b4..358a071fe7ca53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -39,7 +39,7 @@ public abstract class DataSink { // Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink. protected PlanFragment fragment; - + protected boolean isMerge = false; /** * Return an explain string for the DataSink. Each line of the explain will be * prefixed @@ -77,4 +77,12 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException { throw new AnalysisException("Unknown table type " + table.getType()); } } + + public boolean isMerge() { + return isMerge; + } + + public void setMerge(boolean merge) { + isMerge = merge; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index ef42190fa25004..ceda692aa4b01f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -176,6 +176,9 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId()); strBuilder.append("\n"); } + if (isMerge) { + strBuilder.append("IS_MERGE: true\n"); + } return strBuilder.toString(); } @@ -234,6 +237,7 @@ protected TDataSink toThrift() { tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); } } + tStreamSink.setIsMerge(isMerge); tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index ed7ccee69cd9a1..03a23c2c532ac3 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -189,6 +189,7 @@ struct TDataStreamSink { 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id 13: optional list tablet_sink_exprs + 14: optional bool is_merge } struct TMultiCastDataStreamSink {