Skip to content

Commit

Permalink
[feature](profile) support simply profile (apache#23377)
Browse files Browse the repository at this point in the history
A Simplified Version of the Profile

Divided into three levels:
Level 2: The original profile.
Level 1: Instances with identical structures are merged, utilizing concatenation for info strings, and recording the extremum for time types.


Note that currently, this is purely experimental, simplifying the profile on the frontend (you can view profiles at any level).

Subsequently, we will transition the simplification process to the backend. At that point, due to the simplification being done on the backend, viewing profiles at other levels won't be possible.

Due to the issue with the pipeline structure, the active time does not accurately reflect the time of the operators.

```
set enable_simply_profile = false;
set enable_simply_profile = true;
```
  • Loading branch information
Mryange authored Sep 15, 2023
1 parent 320f1e9 commit 23f01dd
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 11 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t
}
}
*max_time = local_max_time;
*min_time = local_min_time;
*min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
}

template <typename Parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public synchronized void update(long startTime, Map<String, String> summaryInfo,
executionProfile.update(startTime, isFinished);
}
rootProfile.computeTimeInProfile();
rootProfile.setProfileLevel();
ProfileManager.getInstance().pushProfile(rootProfile);
this.isFinished = isFinished;
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public void setValue(TUnit type, long value) {
this.value = value;
}

public void setValue(long value) {
this.value = value;
}

public TUnit getType() {
return TUnit.findByValue(type);
}
Expand All @@ -45,4 +49,20 @@ public Counter(TUnit type, long value) {
this.value = value;
this.type = type.getValue();
}

public void addValue(Counter other) {
this.value += other.value;
}

public void divValue(long div) {
if (div <= 0) {
return;
}
value /= div;
}

public boolean isTimeType() {
TUnit ttype = TUnit.findByValue(type);
return ttype == TUnit.TIME_MS || ttype == TUnit.TIME_NS || ttype == TUnit.TIME_S;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ProfileElement(RuntimeProfile profile) {

private final RuntimeProfile profile;
// cache the result of getProfileContent method
private volatile String profileContent;
private volatile String profileContent = null;
public Map<String, String> infoStrings = Maps.newHashMap();
public MultiProfileTreeBuilder builder = null;
public String errMsg = "";
Expand All @@ -79,11 +79,16 @@ public ProfileElement(RuntimeProfile profile) {

// lazy load profileContent because sometimes profileContent is very large
public String getProfileContent() {
if (profileContent != null) {
return profileContent;
}

// no need to lock because the possibility of concurrent read is very low
profileContent = profile.toString();
if (profileContent == null) {
// Simple profile will change the structure of the profile.
try {
profileContent = profile.getSimpleString();
} catch (Exception e) {
LOG.warn("profile get error : " + e.toString());
}
}
return profileContent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TCounter;
import org.apache.doris.thrift.TRuntimeProfileNode;
import org.apache.doris.thrift.TRuntimeProfileTree;
Expand Down Expand Up @@ -47,6 +48,10 @@
public class RuntimeProfile {
private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class);
public static String ROOT_COUNTER = "";
public static int FRAGMENT_DEPTH = 3;
public static String MAX_TIME_PRE = "max: ";
public static String MIN_TIME_PRE = "min: ";
public static String AVG_TIME_PRE = "avg: ";
private Counter counterTotalTime;
private double localTimePercent;

Expand All @@ -69,6 +74,7 @@ public class RuntimeProfile {

private Boolean isDone = false;
private Boolean isCancel = false;
private boolean enableSimplyProfile = false;

public RuntimeProfile(String name) {
this();
Expand Down Expand Up @@ -245,10 +251,10 @@ private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
}

// Print the profile:
// 1. Profile Name
// 2. Info Strings
// 3. Counters
// 4. Children
// 1. Profile Name
// 2. Info Strings
// 3. Counters
// 4. Children
public void prettyPrint(StringBuilder builder, String prefix) {
Counter counter = this.counterMap.get("TotalTime");
Preconditions.checkState(counter != null);
Expand Down Expand Up @@ -299,12 +305,198 @@ public void prettyPrint(StringBuilder builder, String prefix) {
}
}

public void simpleProfile(int depth) {
if (depth == FRAGMENT_DEPTH) {
mergeMutiInstance(childList);
return;
}
for (int i = 0; i < childList.size(); i++) {
Pair<RuntimeProfile, Boolean> pair = childList.get(i);
RuntimeProfile profile = pair.first;
profile.simpleProfile(depth + 1);
}
}

private static void mergeMutiInstance(
LinkedList<Pair<RuntimeProfile, Boolean>> childList) {
/*
* Fragment 1: Fragment 1:
* Instance 0 Instance (total)
* Instance 1
* Instance 2
*/
int numInstance = childList.size();
Pair<RuntimeProfile, Boolean> pair = childList.get(0);
RuntimeProfile mergedProfile = pair.first;
LinkedList<RuntimeProfile> other = new LinkedList<RuntimeProfile>();
for (int i = 1; i < childList.size(); i++) {
other.add(childList.get(i).first);
}
mergeInstanceProfile(mergedProfile, other);
childList.clear();
mergedProfile.name = "Instance " + "(" + numInstance + ")";
childList.add(Pair.of(mergedProfile, pair.second));
}

private static LinkedList<RuntimeProfile> getChildListFromLists(int idx, LinkedList<RuntimeProfile> rhs) {
LinkedList<RuntimeProfile> ret = new LinkedList<RuntimeProfile>();
for (RuntimeProfile profile : rhs) {
ret.add(profile.childList.get(idx).first);
}
return ret;
}

private static LinkedList<Counter> getCounterListFromLists(String counterName, LinkedList<RuntimeProfile> rhs) {
LinkedList<Counter> ret = new LinkedList<Counter>();
for (RuntimeProfile profile : rhs) {
ret.add(profile.counterMap.get(counterName));
}
return ret;
}

private static void mergeInstanceProfile(RuntimeProfile src, LinkedList<RuntimeProfile> rhs) {
mergeProfileCounter(src, ROOT_COUNTER, rhs);
mergeTotalTime(src, rhs);
mergeProfileInfoStr(src, rhs);
removePipelineContext(src);
for (int i = 0; i < src.childList.size(); i++) {
RuntimeProfile srcChild = src.childList.get(i).first;
LinkedList<RuntimeProfile> rhsChild = getChildListFromLists(i, rhs);
mergeInstanceProfile(srcChild, rhsChild);
}
}

private static void mergeTotalTime(RuntimeProfile src, LinkedList<RuntimeProfile> rhs) {
Counter counter = src.counterMap.get("TotalTime");
for (RuntimeProfile profile : rhs) {
Counter othCounter = profile.counterMap.get("TotalTime");
if (othCounter != null && counter != null) {
counter.addValue(othCounter);
}
}
counter.setValue(0); // Because the time is not accurate, it has been set to 0.
}

private static void removePipelineContext(RuntimeProfile src) {
LinkedList<Pair<RuntimeProfile, Boolean>> newChildList = new LinkedList<Pair<RuntimeProfile, Boolean>>();
for (Pair<RuntimeProfile, Boolean> pair : src.childList) {
RuntimeProfile profile = pair.first;
if (!profile.name.equals("PipelineContext")) {
newChildList.add(pair);
}
}
src.childList = newChildList;
}

private static void mergeProfileCounter(RuntimeProfile src, String counterName, LinkedList<RuntimeProfile> rhs) {
Set<String> childCounterSet = src.childCounterMap.get(counterName);
if (childCounterSet == null) {
return;
}
List<String> childCounterList = new LinkedList<>(childCounterSet);
for (String childCounterName : childCounterList) {
Counter counter = src.counterMap.get(childCounterName);
LinkedList<Counter> rhsCounter = getCounterListFromLists(childCounterName, rhs);

mergeProfileCounter(src, childCounterName, rhs);
mergeCounter(src, childCounterName, counter, rhsCounter);
removeZeroeCounter(childCounterSet, childCounterName, counter);

}
}

private static void mergeProfileInfoStr(RuntimeProfile src, LinkedList<RuntimeProfile> rhs) {
for (String key : src.infoStringsDisplayOrder) {
Set<String> strList = new TreeSet<String>();
strList.add(src.infoStrings.get(key));
for (RuntimeProfile profile : rhs) {
String value = profile.infoStrings.get(key);
if (value != null) {
strList.add(value);
}
}
try {
String joinedString = String.join(" | ", strList);
src.infoStrings.put(key, joinedString);
} catch (Exception e) {
return;
}
}
}

private static void removeZeroeCounter(Set<String> childCounterSet, String childCounterName, Counter counter) {
if (counter.getValue() == 0) {
childCounterSet.remove(childCounterName);
}
}

private static void mergeCounter(RuntimeProfile src, String counterName, Counter counter,
LinkedList<Counter> rhsCounter) {
if (rhsCounter.size() == 0) {
return;
}
if (counter.isTimeType()) {
Counter maxCounter = new Counter(counter.getType(), counter.getValue());
Counter minCounter = new Counter(counter.getType(), counter.getValue());
for (Counter cnt : rhsCounter) {
if (cnt.getValue() > maxCounter.getValue()) {
maxCounter.setValue(cnt.getValue());
}
if (cnt.getValue() < minCounter.getValue()) {
minCounter.setValue(cnt.getValue());
}
}
for (Counter cnt : rhsCounter) {
counter.addValue(cnt);
}
long countNumber = rhsCounter.size() + 1;
counter.divValue(countNumber);
String maxCounterName = MAX_TIME_PRE + counterName;
String minCounterName = MIN_TIME_PRE + counterName;
src.counterMap.put(minCounterName, minCounter);
src.counterMap.put(maxCounterName, maxCounter);
TreeSet<String> childCounterSet = src.childCounterMap.get(counterName);
if (childCounterSet == null) {
src.childCounterMap.put(counterName, new TreeSet<String>());
childCounterSet = src.childCounterMap.get(counterName);
}
childCounterSet.add(minCounterName);
childCounterSet.add(maxCounterName);
if (counter.getValue() > 0) {
src.infoStringsDisplayOrder.add(counterName);
String infoString = "[ "
+ AVG_TIME_PRE + printCounter(counter.getValue(), counter.getType()) + " , "
+ MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + " , "
+ MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType()) + " ]";
src.infoStrings.put(counterName, infoString);
}
counter.setValue(0); // value 0 will remove in removeZeroeCounter
} else {
if (rhsCounter.size() == 0) {
return;
}
for (Counter cnt : rhsCounter) {
counter.addValue(cnt);
}
}
}

public String toString() {
StringBuilder builder = new StringBuilder();
prettyPrint(builder, "");
return builder.toString();
}

public String getSimpleString() {
if (!this.enableSimplyProfile) {
return toString();
}
StringBuilder builder = new StringBuilder();
simpleProfile(0);
prettyPrint(builder, "");
return builder.toString();
}

private void printChildCounters(String prefix, String counterName, StringBuilder builder) {
Set<String> childCounterSet = childCounterMap.get(counterName);
if (childCounterSet == null) {
Expand Down Expand Up @@ -450,6 +642,10 @@ public void computeTimeInProfile() {
computeTimeInProfile(this.counterTotalTime.getValue());
}

public void setProfileLevel() {
this.enableSimplyProfile = ConnectContext.get().getSessionVariable().getEnableSimplyProfile();
}

private void computeTimeInProfile(long total) {
if (total == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
public static final String ENABLE_SIMPLY_PROFILE = "enable_simply_profile";
public static final String MAX_INSTANCE_NUM = "max_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
Expand Down Expand Up @@ -605,6 +606,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true)
public int parallelPipelineTaskNum = 0;

@VariableMgr.VarAttr(name = ENABLE_SIMPLY_PROFILE, fuzzy = true)
public boolean enableSimplyProfile = true;

@VariableMgr.VarAttr(name = MAX_INSTANCE_NUM)
public int maxInstanceNum = 64;

Expand Down Expand Up @@ -2619,5 +2623,8 @@ public void checkAnalyzeTimeFormat(String time) {
throw new UnsupportedOperationException("Expect format: HH:mm:ss");
}
}
}

public boolean getEnableSimplyProfile() {
return this.enableSimplyProfile;
}
}

0 comments on commit 23f01dd

Please sign in to comment.