Skip to content

Commit

Permalink
set rf wait time according to table row count and table type
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Nov 8, 2024
1 parent 854512f commit 5ce6154
Show file tree
Hide file tree
Showing 18 changed files with 87 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
Expand All @@ -73,6 +75,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -279,6 +282,8 @@ protected Plan planWithoutLock(
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
}

setRuntimeFilterWaitTimeByTableRowCountAndType();

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
Expand Down Expand Up @@ -315,6 +320,45 @@ protected LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

/**
* compute rf wait time according to max table row count, if wait time is not default value
* olap:
* row < 1G: 1 sec
* 1G <= row < 10G: 5 sec
* 10G < row: 20 sec
* external:
* row < 1G: 5 sec
* 1G <= row < 10G: 10 sec
* 10G < row: 50 sec
*/
private void setRuntimeFilterWaitTimeByTableRowCountAndType() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs()
!= VariableMgr.getDefaultSessionVariable().getRuntimeFilterWaitTimeMs()) {
List<LogicalCatalogRelation> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalCatalogRelation.class::isInstance);
double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext);
boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan));
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
if (hasExternalTable) {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "50000");
}
} else {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "1000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
}
}
}
}

private void initCascadesContext(LogicalPlan plan, PhysicalProperties requireProperties) {
cascadesContext = CascadesContext.initContext(statementContext, plan, requireProperties);
if (statementContext.getConnectContext().getTables() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.nereids.rules.analysis;

import org.apache.doris.common.DdlException;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.hint.Hint;
Expand All @@ -36,6 +35,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,11 +55,9 @@ public Rule build() {
if (hintName.equalsIgnoreCase("SET_VAR")) {
((SelectHintSetVar) hint).setVarOnceInSql(ctx.statementContext);
} else if (hintName.equalsIgnoreCase("ORDERED")) {
try {
ctx.cascadesContext.getConnectContext().getSessionVariable()
.disableNereidsJoinReorderOnce();
} catch (DdlException e) {
throw new RuntimeException(e);
if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true")) {
throw new RuntimeException("set DISABLE_JOIN_REORDER=true once failed");
}
OrderedHint ordered = new OrderedHint("Ordered");
ordered.setStatus(Hint.HintStatus.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
Expand Down Expand Up @@ -137,6 +138,7 @@
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
Expand Down Expand Up @@ -220,6 +222,25 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
*
* get the max row count of tables used in a query
*/
public static double getMaxTableRowCount(List<LogicalCatalogRelation> scans, CascadesContext context) {
StatsCalculator calculator = new StatsCalculator(context);
double max = -1;
for (LogicalCatalogRelation scan : scans) {
double row;
if (scan instanceof LogicalOlapScan) {
row = calculator.getOlapTableRowCount((LogicalOlapScan) scan);
} else {
row = scan.getTable().getRowCount();
}
max = Math.max(row, max);
}
return max;
}

/**
* disable join reorder if
* 1. any table rowCount is not available, or
Expand All @@ -246,7 +267,8 @@ public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRela
Optional<String> reason = calculator.checkNdvValidation((OlapScan) scan, rowCount);
if (reason.isPresent()) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
ConnectContext.get().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true");
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
List<String> ctasCols = createTableInfo.getCtasColumns();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
if (ctasCols == null) {
// we should analyze the plan firstly to get the columns' name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
Expand Down
32 changes: 12 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4204,27 +4204,19 @@ public void setEnableStrictConsistencyDml(boolean value) {
this.enableStrictConsistencyDml = value;
}

public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new StringLiteral("false")));
}

public void disableConstantFoldingByBEOnce() throws DdlException {
if (!enableFoldConstantByBe) {
return;
/**
*
* @return true iff set success
*/
public boolean setVarOnce(String varName, String value) {
try {
setIsSingleSetVar(true);
VariableMgr.setVar(this, new SetVar(varName, new StringLiteral(value)));
return true;
} catch (DdlException e) {
LOG.warn("set onece {} = {} failed", varName, value);
return false;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, new StringLiteral("false")));
}

public void disableNereidsJoinReorderOnce() throws DdlException {
setIsSingleSetVar(true);
VariableMgr.setVar(this, new SetVar(SessionVariable.DISABLE_JOIN_REORDER, new StringLiteral("true")));
}

// return number of variables by given variable annotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3556,7 +3556,7 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
try {
try {
// disable shuffle for http stream (only 1 sink)
sessionVariable.disableStrictConsistencyDmlOnce();
sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false");
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
Expand Down
31 changes: 0 additions & 31 deletions tools/tpcds-tools/bin/run-tpcds-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,15 @@ fi
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpcds sf 1 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpcds sf 100 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpcds sf 1000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpcds sf 10000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -123,32 +119,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
rm -f "${backup_session_variables_file}"
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}")
}
clean_up() {
echo "restore session variables:"
cat "${backup_session_variables_file}"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};"
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCDS_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -205,5 +176,3 @@ done
echo "Total cold run time: ${cold_run_sum} ms"
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpcds queries.'

clean_up
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf10000.sql

This file was deleted.

27 changes: 0 additions & 27 deletions tools/tpch-tools/bin/run-tpch-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,12 @@ fi
TPCH_QUERIES_DIR="${CURDIR}/../queries"
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpch sf 1 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpch sf 100 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpch sf 1000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpch sf 10000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -120,26 +116,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}=${v};" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}")
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCH_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -197,7 +174,3 @@ echo "Total cold run time: ${cold_run_sum} ms"
# tpch 流水线依赖这个'Total hot run time'字符串
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpch queries.'

echo "Restore session variables"
run_sql "source ${backup_session_variables_file};"
rm -f "${backup_session_variables_file}"
Empty file.
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf10000.sql

This file was deleted.

0 comments on commit 5ce6154

Please sign in to comment.