Skip to content

Commit

Permalink
[feat](nereids)set actual row count in physical plan according to mer…
Browse files Browse the repository at this point in the history
…ged profile (apache#40361)

physical plan is already printed in profile.
however, it is hard to compare the estimated rows of sql operator and
the actual rows.
In this pr, we get actual rows from merged profile, and set it to
corresponding physical node in physical plan.
here is an example:
"PhysicalHashJoin[13890]@115 ( stats=17,964.27 actualRows=20499,
type=INNER_JOIN, hashCondition=[(l_suppkey#19 = s_suppkey#33)]"
the estmated rows is 17,964, the actual rows is 20499
Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
englefly committed Oct 22, 2024
1 parent 536dd3b commit 113325b
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
}
return fragmentsProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.Planner;

Expand All @@ -46,6 +49,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
Expand Down Expand Up @@ -108,6 +113,10 @@ public class Profile {
// Profile size is the size of profile file
private long profileSize = 0;

private PhysicalPlan physicalPlan;
public Map<String, Long> rowsProducedMap = new HashMap<>();
private List<PhysicalRelation> physicalRelations = new ArrayList<>();

private String changedSessionVarCache = "";

// Need default constructor for read from storage
Expand Down Expand Up @@ -276,20 +285,8 @@ public synchronized void updateSummary(Map<String, String> summaryInfo, boolean

if (planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
StringBuilder builder = new StringBuilder();
builder.append("\n");
builder.append(nereidsPlanner.getPhysicalPlan()
.treeString());
builder.append("\n");
for (PhysicalRelation relation : nereidsPlanner.getPhysicalRelations()) {
if (relation.getStats() != null) {
builder.append(relation).append("\n")
.append(relation.getStats().printColumnStats());
}
}
summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
builder.toString().replace("\n", "\n "));

physicalPlan = nereidsPlanner.getPhysicalPlan();
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
Expand Down Expand Up @@ -382,15 +379,43 @@ public void getExecutionProfileContent(StringBuilder builder) {

// Only generate merged profile for select, insert into select.
// Not support broker load now.
RuntimeProfile mergedProfile = null;
if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {
try {
builder.append("\n MergedProfile \n");
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " ");
mergedProfile = this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap);
this.rowsProducedMap.putAll(mergedProfile.rowsProducedMap);
if (physicalPlan != null) {
updateActualRowCountOnPhysicalPlan(physicalPlan);
}
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile {} failed", this.id, aggProfileException);
}
}

if (physicalPlan != null) {
builder.append("\nPhysical Plan \n");
StringBuilder physcialPlanBuilder = new StringBuilder();
physcialPlanBuilder.append(physicalPlan.treeString());
physcialPlanBuilder.append("\n");
for (PhysicalRelation relation : physicalRelations) {
if (relation.getStats() != null) {
physcialPlanBuilder.append(relation).append("\n")
.append(relation.getStats().printColumnStats());
}
}
builder.append(
physcialPlanBuilder.toString().replace("\n", "\n "));
}

if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {
builder.append("\nMergedProfile \n");
if (mergedProfile != null) {
mergedProfile.prettyPrint(builder, " ");
} else {
builder.append("build merged simple profile failed");
}
}

try {
// For load task, they will have multiple execution_profiles.
for (ExecutionProfile executionProfile : executionProfiles) {
Expand Down Expand Up @@ -681,4 +706,25 @@ private void getOnStorageProfile(StringBuilder builder) {

return;
}

public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public void setPhysicalPlan(PhysicalPlan physicalPlan) {
this.physicalPlan = physicalPlan;
}

private void updateActualRowCountOnPhysicalPlan(Plan plan) {
if (plan == null || rowsProducedMap.isEmpty()) {
return;
}
Long actualRowCount = rowsProducedMap.get(String.valueOf(((AbstractPlan) plan).getId()));
if (actualRowCount != null) {
((AbstractPlan) plan).updateActualRowCount(actualRowCount);
}
for (Plan child : plan.children()) {
updateActualRowCountOnPhysicalPlan(child);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class SummaryProfile {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num";
public static final String TRACE_ID = "Trace ID";
public static final String WORKLOAD_GROUP = "Workload Group";
public static final String PHYSICAL_PLAN = "Physical Plan";
public static final String DISTRIBUTED_PLAN = "Distributed Plan";
public static final String SYSTEM_MESSAGE = "System Message";
public static final String EXECUTED_BY_FRONTEND = "Executed By Frontend";
Expand Down Expand Up @@ -129,7 +128,6 @@ public class SummaryProfile {
START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_CATALOG, DEFAULT_DB, SQL_STATEMENT);
public static final ImmutableList<String> SUMMARY_KEYS = new ImmutableList.Builder<String>()
.addAll(SUMMARY_CAPTIONS)
.add(PHYSICAL_PLAN)
.add(DISTRIBUTED_PLAN)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* It is accessed by two kinds of thread, one is to create this RuntimeProfile
Expand Down Expand Up @@ -100,6 +103,8 @@ public class RuntimeProfile {
@SerializedName(value = "nodeid")
private int nodeid = -1;

public Map<String, Long> rowsProducedMap = new HashMap<>();

public RuntimeProfile() {
init();
}
Expand Down Expand Up @@ -494,6 +499,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
}
Expand All @@ -504,6 +510,12 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
return;
}
RuntimeProfile templateProfile = profiles.get(0);
Pattern pattern = Pattern.compile("nereids_id=(\\d+)");
Matcher matcher = pattern.matcher(templateProfile.getName());
String nereidsId = null;
if (matcher.find()) {
nereidsId = matcher.group(1);
}
Set<String> childCounterSet = templateProfile.childCounterMap.get(parentCounterName);
if (childCounterSet == null) {
return;
Expand All @@ -517,6 +529,9 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
Counter orgCounter = profile.counterMap.get(childCounterName);
aggCounter.addCounter(orgCounter);
}
if (nereidsId != null && childCounterName.equals("RowsProduced")) {
simpleProfile.rowsProducedMap.put(nereidsId, aggCounter.sum.getValue());
}
if (simpleProfile.counterMap.containsKey(parentCounterName)) {
simpleProfile.addCounter(childCounterName, aggCounter, parentCounterName);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,8 @@ public List<Plan> getAncestors() {
}
return ancestors;
}

public void updateActualRowCount(long actualRowCount) {
statistics.setActualRowCount(actualRowCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ public Set<Slot> getConditionSlot() {

@Override
public String toString() {
List<Object> args = Lists.newArrayList("type", joinType,
List<Object> args = Lists.newArrayList(
"stats", statistics,
"type", joinType,
"hashCondition", hashJoinConjuncts,
"otherCondition", otherJoinConjuncts,
"markCondition", markJoinConjuncts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public int hashCode() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalCTEProducer[" + id.asInt() + "]",
"stats", statistics,
"cteId", cteId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ public String toString() {
TopnPushInfo topnPushInfo = (TopnPushInfo) getMutableState(
MutableState.KEY_PUSH_TOPN_TO_AGG).orElseGet(() -> null);
return Utils.toSqlString("PhysicalHashAggregate[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"aggPhase", aggregateParam.aggPhase,
"aggMode", aggregateParam.aggMode,
"stats", statistics,
"maybeUseStreaming", maybeUsingStream,
"groupByExpr", groupByExpressions,
"outputExpr", outputExpressions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public String shapeInfo() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalQuickSort[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"orderKeys", orderKeys,
"phase", phase.toString(), "stats", statistics
"stats", statistics, "orderKeys", orderKeys,
"phase", phase.toString()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public String shapeInfo() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalTopN[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"limit", limit,
"offset", offset,
"orderKeys", orderKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
@Override
public String toString() {
return Utils.toSqlString("PhysicalUnion" + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"qualifier", qualifier,
"outputs", outputs,
"regularChildrenOutputs", regularChildrenOutputs,
"constantExprsList", constantExprsList,
"stats", statistics);
"constantExprsList", constantExprsList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ public List<? extends Expression> getExpressions() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalWindow[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"windowFrameGroup", windowFrameGroup,
"requiredProperties", requireProperties, "stats", statistics
"requiredProperties", requireProperties
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,10 @@ public void updateProfile(boolean isFinished) {
// failed, the insert stmt should be success
try {
profile.updateSummary(getSummaryInfo(isFinished), isFinished, this.planner);
if (planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
profile.setPhysicalPlan(nereidsPlanner.getPhysicalPlan());
}
} catch (Throwable t) {
LOG.warn("failed to update profile, ignore this error", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public class Statistics {
// the byte size of one tuple
private double tupleSize;

private double deltaRowCount = 0.0;

private long actualRowCount = -1L;

public Statistics(double rowCount, Map<Expression, ColumnStatistic> expressionToColumnStats) {
this(rowCount, 1, expressionToColumnStats);
}
Expand Down Expand Up @@ -176,17 +180,24 @@ public double dataSizeFactor(List<Slot> slots) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
if (Double.isNaN(rowCount)) {
return "NaN";
builder.append("NaN");
} else if (Double.POSITIVE_INFINITY == rowCount) {
builder.append("Infinite");
} else if (Double.NEGATIVE_INFINITY == rowCount) {
builder.append("-Infinite");
} else {
DecimalFormat format = new DecimalFormat("#,###.##");
builder.append(format.format(rowCount));
}
if (Double.POSITIVE_INFINITY == rowCount) {
return "Infinite";
if (deltaRowCount > 0) {
builder.append("(").append((long) deltaRowCount).append(")");
}
if (Double.NEGATIVE_INFINITY == rowCount) {
return "-Infinite";
if (actualRowCount != -1) {
builder.append(" actualRows=").append(actualRowCount);
}
DecimalFormat format = new DecimalFormat("#,###.##");
return format.format(rowCount);
return builder.toString();
}

public String printColumnStats() {
Expand Down Expand Up @@ -263,4 +274,20 @@ public Statistics normalizeByRatio(double originRowCount) {
}
return builder.build();
}

public double getDeltaRowCount() {
return deltaRowCount;
}

public void setDeltaRowCount(double deltaRowCount) {
this.deltaRowCount = deltaRowCount;
}

public long getActualRowCount() {
return actualRowCount;
}

public void setActualRowCount(long actualRowCount) {
this.actualRowCount = actualRowCount;
}
}

0 comments on commit 113325b

Please sign in to comment.