Skip to content

Commit

Permalink
[feat](nereids)disable join reorder if any table row count is not ava…
Browse files Browse the repository at this point in the history
…ilable (apache#43000)

In the previous PR apache#41790, the planner sets disable_join_reorder to true
if the row count of any OLAP table is not available.
This PR extends that rule to external tables as well.

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
englefly committed Nov 15, 2024
1 parent f2c3201 commit 478f979
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
Expand Down Expand Up @@ -272,8 +272,8 @@ private Plan planWithoutLock(
// 2. ut test. In ut test, FeConstants.enableInternalSchemaDb is false or FeConstants.runningUnitTest is true
if (FeConstants.enableInternalSchemaDb && !FeConstants.runningUnitTest
&& !cascadesContext.isLeadingDisableJoinReorder()) {
List<LogicalOlapScan> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalOlapScan.class::isInstance);
List<CatalogRelation> scans = cascadesContext.getRewritePlan()
.collectToList(CatalogRelation.class::isInstance);
Optional<String> disableJoinReorderReason = StatsCalculator
.disableJoinReorderIfStatsInvalid(scans, cascadesContext);
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,32 +217,34 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
* 2. col stats ndv=0 but minExpr or maxExpr is not null
* 3. ndv > 10 * rowCount
*/
public static Optional<String> disableJoinReorderIfStatsInvalid(List<LogicalOlapScan> scans,
public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRelation> scans,
CascadesContext context) {
StatsCalculator calculator = new StatsCalculator(context);
if (ConnectContext.get() == null) {
// ut case
return Optional.empty();
}
for (LogicalOlapScan scan : scans) {
double rowCount = calculator.getOlapTableRowCount(scan);
for (CatalogRelation scan : scans) {
double rowCount = calculator.getTableRowCount(scan);
// row count not available
if (rowCount == -1) {
LOG.info("disable join reorder since row count not available: "
+ scan.getTable().getNameWithFullQualifiers());
return Optional.of("table[" + scan.getTable().getName() + "] row count is invalid");
}
// ndv abnormal
Optional<String> reason = calculator.checkNdvValidation(scan, rowCount);
if (reason.isPresent()) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
LOG.info("disableNereidsJoinReorderOnce failed");
if (scan instanceof OlapScan) {
// ndv abnormal
Optional<String> reason = calculator.checkNdvValidation((OlapScan) scan, rowCount);
if (reason.isPresent()) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
LOG.info("disableNereidsJoinReorderOnce failed");
}
return reason;
}
return reason;
}
}
return Optional.empty();
Expand Down Expand Up @@ -398,6 +400,14 @@ private void checkIfUnknownStatsUsedAsKey(StatisticsBuilder builder) {
}
}

/**
 * Resolves the row count used for statistics validation of a catalog relation.
 *
 * <p>Relations that are {@code OlapScan}s are routed through
 * {@code getOlapTableRowCount}; every other relation reports whatever its
 * underlying table returns from {@code getRowCount()}.
 *
 * @param scan the relation whose table row count is requested
 * @return the row count as a double; NOTE(review): the caller in this file
 *         treats -1 as "row count not available" -- confirm that non-OLAP
 *         tables follow the same convention
 */
private double getTableRowCount(CatalogRelation scan) {
    // Both branches are numeric, so the conditional widens to double exactly
    // like the two separate return statements would.
    return scan instanceof OlapScan
            ? getOlapTableRowCount((OlapScan) scan)
            : scan.getTable().getRowCount();
}

/**
* if the table is not analyzed and BE does not report row count, return -1
*/
Expand Down Expand Up @@ -1082,11 +1092,12 @@ private ColumnStatistic getColumnStatistic(TableIf table, String colName, long i
private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
StatisticsBuilder builder = new StatisticsBuilder();

double tableRowCount = catalogRelation.getTable().getRowCount();
// for FeUt, use ColumnStatistic.UNKNOWN
if (!FeConstants.enableInternalSchemaDb
|| ConnectContext.get() == null
|| ConnectContext.get().getSessionVariable().internalSession) {
builder.setRowCount(catalogRelation.getTable().getRowCountForNereids());
builder.setRowCount(Math.max(1, tableRowCount));
for (Slot slot : catalogRelation.getOutput()) {
builder.putColumnStatistics(slot, ColumnStatistic.UNKNOWN);
}
Expand All @@ -1102,8 +1113,8 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
}
}
Set<SlotReference> slotSet = slotSetBuilder.build();
double tableRowCount = catalogRelation.getTable().getRowCount();
if (tableRowCount <= 0) {
tableRowCount = 1;
// try to get row count from col stats
for (SlotReference slot : slotSet) {
ColumnStatistic cache = getColumnStatsFromTableCache(catalogRelation, slot);
Expand Down

0 comments on commit 478f979

Please sign in to comment.