Skip to content

Commit

Permalink
[opt](nereids) hbo fix pb
Browse files Browse the repository at this point in the history
  • Loading branch information
xzj7019 committed Dec 9, 2024
1 parent b1c0836 commit 76cb760
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.nereids.stats.StatsCalculator;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
Expand Down Expand Up @@ -420,9 +421,8 @@ private void collectExecStatsIds(PhysicalPlan root, PlanFragment fragment) {
for (Object child : root.children()) {
collectExecStatsIds((PhysicalPlan) child, fragment);
}
if (root.needCollectExecStats()) {
// todo: make sure the plan id is valid
int nodeId = ((PlanNode) root).getId().asInt();
if (root.needCollectExecStats() && root instanceof AbstractPlan) {
int nodeId = ((AbstractPlan) root).getId();
fragment.getCollectExecStatsIds().add(nodeId);
cascadesContext.getNeedStatsPlanIdNodeMap().put(nodeId, root);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeWithHash;
import org.apache.doris.plugin.AuditEvent;
Expand Down Expand Up @@ -63,11 +68,10 @@ public HistoryBasedPlanStatisticsCalculator(GroupExpression groupExpression, boo
Map<CTEId, Statistics> cteIdToStats, CascadesContext context) {
super(groupExpression, forbidUnknownColStats, columnStatisticMap, isPlayNereidsDump,
cteIdToStats, context);
WorkloadRuntimeStatusMgr mgr = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr();
List<AuditEvent> auditEventList = mgr.getQueryNeedAudit();
// TODO: choose which audit event
AuditEvent event = auditEventList.get(0);
this.queryId = event.queryId;
//WorkloadRuntimeStatusMgr mgr = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr();
//List<AuditEvent> auditEventList = mgr.getQueryNeedAudit();
//AuditEvent event = auditEventList.get(0);
this.queryId = "123";//event.queryId;
this.historyBasedPlanStatisticsProvider = requireNonNull(context.getStatementContext().getHistoryBasedPlanStatisticsTracker().getHistoryBasedPlanStatisticsProvider(),
"historyBasedPlanStatisticsProvider is null");
this.historyBasedStatisticsCacheManager = requireNonNull(context.getStatementContext().getHistoryBasedPlanStatisticsTracker().getHistoryBasedStatisticsCacheManager(),
Expand All @@ -82,44 +86,57 @@ public void estimate() {
@Override
protected Statistics computeFilter(Filter filter) {
Statistics childStats = groupExpression.childStatistics(0);
return getHistoricalStatistics((PlanNode) filter, childStats);
return getHistoricalStatistics((AbstractPlan) filter, childStats);
}

@Override
protected Statistics computeJoin(Join join) {
Statistics legacyStats = JoinEstimation.estimate(
groupExpression.childStatistics(0),
groupExpression.childStatistics(1), join);
return getHistoricalStatistics((PlanNode) join, legacyStats);
return getHistoricalStatistics((AbstractPlan) join, legacyStats);
}

@Override
protected Statistics computeAggregate(Aggregate<? extends Plan> aggregate) {
Statistics childStats = groupExpression.childStatistics(0);
return getHistoricalStatistics((PlanNode) aggregate, childStats);
return getHistoricalStatistics((AbstractPlan) aggregate, childStats);
}

private Statistics getHistoricalStatistics(PlanNode planNode, Statistics delegateStats)
private Statistics getHistoricalStatistics(AbstractPlan planNode, Statistics delegateStats)
{
String hash = "test"; // todo: based on plan toString
PlanNodeWithHash planNodeWithHash = new PlanNodeWithHash((PlanNode) planNode, Optional.of(hash));
HistoricalPlanStatistics planStatistics = historyBasedStatisticsCacheManager
boolean isEnableHbo = false;
if (!isEnableHbo) {
return delegateStats;
} else {
String hash;
if (planNode instanceof AbstractPhysicalPlan) {
hash = planNode.hboTreeString();
} else if (planNode instanceof AbstractLogicalPlan) {
hash = planNode.hboTreeString();
} else {
throw new IllegalStateException("hbo get neither physical plan nor logical plan");
}
PlanNodeWithHash planNodeWithHash = new PlanNodeWithHash(null, Optional.of(hash));
HistoricalPlanStatistics planStatistics = historyBasedStatisticsCacheManager
.getStatisticsCache(queryId, historyBasedPlanStatisticsProvider, 1000)
.getUnchecked(planNodeWithHash);

Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(planNode, true);
// TODO: get current inputTableStatistics
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry
(planStatistics, inputTableStatistics.get(), 0.1);
if (historicalPlanStatisticsEntry.isPresent()) {
PlanStatistics predictedPlanStatistics = historicalPlanStatisticsEntry.get().getPlanStatistics();
// todo: choose which one is the output rows count
delegateStats.withRowCountAndEnforceValid(predictedPlanStatistics.getPushRows());
Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(planNode, true);
// TODO: get current inputTableStatistics
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry
= getSelectedHistoricalPlanStatisticsEntry
(planStatistics, inputTableStatistics.get(), 0.1);
if (historicalPlanStatisticsEntry.isPresent()) {
PlanStatistics predictedPlanStatistics = historicalPlanStatisticsEntry.get().getPlanStatistics();
// todo: choose which one is the output rows count
delegateStats.withRowCountAndEnforceValid(predictedPlanStatistics.getPushRows());
}
return delegateStats;
}
return delegateStats;
}

private Optional<List<PlanStatistics>> getPlanNodeInputTableStatistics(PlanNode plan, boolean cacheOnly)
private Optional<List<PlanStatistics>> getPlanNodeInputTableStatistics(AbstractPlan plan, boolean cacheOnly)
{
return Optional.empty(); //getInputTableStatistics(plan, cacheOnly);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ public boolean canBind() {
return true;
}

public String toHboString() {
return "NOT implemented";
}

public String hboTreeString() {
StringBuilder builder = new StringBuilder();
builder.append(this.toHboString());
if (this.children().isEmpty()) {
return builder.toString();
} else {
for (Plan plan : children) {
builder.append(plan.toString());
}
return builder.toString();
}
}

/**
* Get tree like string describing query plan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ public Set<Slot> getConditionSlot() {
.flatMap(expr -> expr.getInputSlots().stream()).collect(ImmutableSet.toImmutableSet());
}

@Override
public String toHboString() {
List<Object> args = Lists.newArrayList(
"type", joinType,
"hashCondition", hashJoinConjuncts,
"otherCondition", otherJoinConjuncts,
"markCondition", markJoinConjuncts);
return Utils.toSqlString(this.getClass().getSimpleName() + getGroupIdWithPrefix(),
args.toArray());
}

@Override
public String toString() {
List<Object> args = Lists.newArrayList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public String toString() {
);
}

@Override
public String toHboString() {
return Utils.toSqlString("PhysicalFilter" + getGroupIdWithPrefix(),
"predicates", getPredicate()
);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ public List<Slot> getBaseOutputs() {
return baseOutputs;
}

@Override
public String toHboString() {
StringBuilder builder = new StringBuilder();
if (!getAppliedRuntimeFilters().isEmpty()) {
getAppliedRuntimeFilters().forEach(rf -> builder.append(" RF").append(rf.getId().asInt()));
}
String partitions = "";
int partitionCount = this.table.getPartitionNames().size();
if (selectedPartitionIds.size() != partitionCount) {
partitions = " partitions(" + selectedPartitionIds.size() + "/" + partitionCount + ")";
}
return Utils.toSqlString("PhysicalOlapScan[" + table.getName() + partitions + "]"
+ getGroupIdWithPrefix(),"RFs", builder
);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public List<NamedExpression> getProjects() {
return projects;
}

@Override
public String toHboString() {
return Utils.toSqlString("PhysicalProject[" + "]" + getGroupIdWithPrefix(),
"projects", projects
);
}

@Override
public String toString() {
StringBuilder cse = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
this.transferQueryStatisticsWithEveryBatch = false;
this.builderRuntimeFilterIds = new HashSet<>();
this.targetRuntimeFilterIds = new HashSet<>();
this.collectExecStatsIds = new ArrayList<>();
this.hasBucketShuffleJoin = buildHasBucketShuffleJoin();
setParallelExecNumIfExists();
setFragmentInPlanTree(planRoot);
Expand Down

0 comments on commit 76cb760

Please sign in to comment.