Skip to content

Commit

Permalink
set actual row count in physical plan according to merged profile
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Sep 4, 2024
1 parent 3615a36 commit 5753240
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
}
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
}
return fragmentsProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.Planner;

Expand All @@ -45,6 +48,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
Expand Down Expand Up @@ -107,6 +112,10 @@ public class Profile {
// Profile size is the size of profile file
private long profileSize = 0;

private PhysicalPlan physicalPlan;
public Map<String, Long> rowsProducedMap = new HashMap<>();
private List<PhysicalRelation> physicalRelations = new ArrayList<>();

// Need default constructor for read from storage
public Profile() {}

Expand Down Expand Up @@ -273,20 +282,8 @@ public synchronized void updateSummary(Map<String, String> summaryInfo, boolean

if (planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
StringBuilder builder = new StringBuilder();
builder.append("\n");
builder.append(nereidsPlanner.getPhysicalPlan()
.treeString());
builder.append("\n");
for (PhysicalRelation relation : nereidsPlanner.getPhysicalRelations()) {
if (relation.getStats() != null) {
builder.append(relation).append("\n")
.append(relation.getStats().printColumnStats());
}
}
summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
builder.toString().replace("\n", "\n "));

physicalPlan = nereidsPlanner.getPhysicalPlan();
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
Expand Down Expand Up @@ -414,15 +411,43 @@ public void getExecutionProfileContent(StringBuilder builder) {

// Only generate merged profile for select, insert into select.
// Not support broker load now.
RuntimeProfile mergedProfile = null;
if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {
try {
builder.append("\n MergedProfile \n");
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " ");
mergedProfile = this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap);
this.rowsProducedMap.putAll(mergedProfile.rowsProducedMap);
if (physicalPlan != null) {
updateActualRowCountOnPhysicalPlan(physicalPlan);
}
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile {} failed", this.id, aggProfileException);
}
}

if (physicalPlan != null) {
builder.append("\nPhysical Plan \n");
StringBuilder physcialPlanBuilder = new StringBuilder();
physcialPlanBuilder.append(physicalPlan.treeString());
physcialPlanBuilder.append("\n");
for (PhysicalRelation relation : physicalRelations) {
if (relation.getStats() != null) {
physcialPlanBuilder.append(relation).append("\n")
.append(relation.getStats().printColumnStats());
}
}
builder.append(
physcialPlanBuilder.toString().replace("\n", "\n "));
}

if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {
builder.append("\nMergedProfile \n");
if (mergedProfile != null) {
mergedProfile.prettyPrint(builder, " ");
} else {
builder.append("build merged simple profile failed");
}
}

try {
// For load task, they will have multiple execution_profiles.
for (ExecutionProfile executionProfile : executionProfiles) {
Expand Down Expand Up @@ -646,4 +671,25 @@ public boolean shouldBeRemoveFromMemory() {

return true;
}

public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public void setPhysicalPlan(PhysicalPlan physicalPlan) {
this.physicalPlan = physicalPlan;
}

private void updateActualRowCountOnPhysicalPlan(Plan plan) {
if (plan == null || rowsProducedMap.isEmpty()) {
return;
}
Long actualRowCount = rowsProducedMap.get(String.valueOf(((AbstractPlan) plan).getId()));
if (actualRowCount != null) {
((AbstractPlan) plan).updateActualRowCount(actualRowCount);
}
for (Plan child : plan.children()) {
updateActualRowCountOnPhysicalPlan(child);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class SummaryProfile {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num";
public static final String TRACE_ID = "Trace ID";
public static final String WORKLOAD_GROUP = "Workload Group";
public static final String PHYSICAL_PLAN = "Physical Plan";
public static final String DISTRIBUTED_PLAN = "Distributed Plan";
public static final String SYSTEM_MESSAGE = "System Message";
public static final String EXECUTED_BY_FRONTEND = "Executed By Frontend";
Expand Down Expand Up @@ -129,7 +128,6 @@ public class SummaryProfile {
START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_CATALOG, DEFAULT_DB, SQL_STATEMENT);
public static final ImmutableList<String> SUMMARY_KEYS = new ImmutableList.Builder<String>()
.addAll(SUMMARY_CAPTIONS)
.add(PHYSICAL_PLAN)
.add(DISTRIBUTED_PLAN)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* It is accessed by two kinds of thread, one is to create this RuntimeProfile
Expand Down Expand Up @@ -100,6 +103,8 @@ public class RuntimeProfile {
@SerializedName(value = "nodeid")
private int nodeid = -1;

public Map<String, Long> rowsProducedMap = new HashMap<>();

public RuntimeProfile() {
init();
}
Expand Down Expand Up @@ -494,6 +499,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
}
Expand All @@ -504,6 +510,12 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
return;
}
RuntimeProfile templateProfile = profiles.get(0);
Pattern pattern = Pattern.compile("nereids_id=(\\d+)");
Matcher matcher = pattern.matcher(templateProfile.getName());
String nereidsId = null;
if (matcher.find()) {
nereidsId = matcher.group(1);
}
Set<String> childCounterSet = templateProfile.childCounterMap.get(parentCounterName);
if (childCounterSet == null) {
return;
Expand All @@ -517,6 +529,9 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
Counter orgCounter = profile.counterMap.get(childCounterName);
aggCounter.addCounter(orgCounter);
}
if (nereidsId != null && childCounterName.equals("RowsProduced")) {
simpleProfile.rowsProducedMap.put(nereidsId, aggCounter.sum.getValue());
}
if (simpleProfile.counterMap.containsKey(parentCounterName)) {
simpleProfile.addCounter(childCounterName, aggCounter, parentCounterName);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,8 @@ public List<Plan> getAncestors() {
}
return ancestors;
}

public void updateActualRowCount(long actualRowCount) {
statistics.setActualRowCount(actualRowCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ public Set<Slot> getConditionSlot() {

@Override
public String toString() {
List<Object> args = Lists.newArrayList("type", joinType,
List<Object> args = Lists.newArrayList(
"stats", statistics,
"type", joinType,
"hashCondition", hashJoinConjuncts,
"otherCondition", otherJoinConjuncts,
"markCondition", markJoinConjuncts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public int hashCode() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalCTEProducer[" + id.asInt() + "]",
"stats", statistics,
"cteId", cteId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ public String toString() {
TopnPushInfo topnPushInfo = (TopnPushInfo) getMutableState(
MutableState.KEY_PUSH_TOPN_TO_AGG).orElseGet(() -> null);
return Utils.toSqlString("PhysicalHashAggregate[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"aggPhase", aggregateParam.aggPhase,
"aggMode", aggregateParam.aggMode,
"stats", statistics,
"maybeUseStreaming", maybeUsingStream,
"groupByExpr", groupByExpressions,
"outputExpr", outputExpressions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public String shapeInfo() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalQuickSort[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"orderKeys", orderKeys,
"phase", phase.toString(), "stats", statistics
"stats", statistics, "orderKeys", orderKeys,
"phase", phase.toString()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public String shapeInfo() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalTopN[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"limit", limit,
"offset", offset,
"orderKeys", orderKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
@Override
public String toString() {
return Utils.toSqlString("PhysicalUnion" + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"qualifier", qualifier,
"outputs", outputs,
"regularChildrenOutputs", regularChildrenOutputs,
"constantExprsList", constantExprsList,
"stats", statistics);
"constantExprsList", constantExprsList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ public List<? extends Expression> getExpressions() {
@Override
public String toString() {
return Utils.toSqlString("PhysicalWindow[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"stats", statistics,
"windowFrameGroup", windowFrameGroup,
"requiredProperties", requireProperties, "stats", statistics
"requiredProperties", requireProperties
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,10 @@ public void updateProfile(boolean isFinished) {
// failed, the insert stmt should be success
try {
profile.updateSummary(getSummaryInfo(isFinished), isFinished, this.planner);
if (planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
profile.setPhysicalPlan(nereidsPlanner.getPhysicalPlan());
}
} catch (Throwable t) {
LOG.warn("failed to update profile, ignore this error", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class Statistics {

private double deltaRowCount = 0.0;

private long actualRowCount = -1L;

public Statistics(Statistics another) {
this.rowCount = another.rowCount;
this.widthInJoinCluster = another.widthInJoinCluster;
Expand Down Expand Up @@ -193,21 +195,24 @@ public double dataSizeFactor(List<Slot> slots) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
if (Double.isNaN(rowCount)) {
return "NaN";
}
if (Double.POSITIVE_INFINITY == rowCount) {
return "Infinite";
}
if (Double.NEGATIVE_INFINITY == rowCount) {
return "-Infinite";
builder.append("NaN");
} else if (Double.POSITIVE_INFINITY == rowCount) {
builder.append("Infinite");
} else if (Double.NEGATIVE_INFINITY == rowCount) {
builder.append("-Infinite");
} else {
DecimalFormat format = new DecimalFormat("#,###.##");
builder.append(format.format(rowCount));
}
DecimalFormat format = new DecimalFormat("#,###.##");
String rows = format.format(rowCount);
if (deltaRowCount > 0) {
rows = rows + "(" + format.format(deltaRowCount) + ")";
builder.append("(").append(deltaRowCount).append(")");
}
if (actualRowCount != -1) {
builder.append(" actualRows=").append(actualRowCount);
}
return rows;
return builder.toString();
}

public String printColumnStats() {
Expand Down Expand Up @@ -292,4 +297,12 @@ public double getDeltaRowCount() {
public void setDeltaRowCount(double deltaRowCount) {
this.deltaRowCount = deltaRowCount;
}

public long getActualRowCount() {
return actualRowCount;
}

public void setActualRowCount(long actualRowCount) {
this.actualRowCount = actualRowCount;
}
}

0 comments on commit 5753240

Please sign in to comment.