set rf wait time according to table row count and table type
englefly committed Oct 29, 2024
1 parent e238a87 commit 5a4772a
Showing 14 changed files with 64 additions and 64 deletions.
@@ -58,6 +58,7 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
@@ -73,6 +74,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
@@ -228,6 +230,44 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
}
}

/**
 * Compute the runtime filter (rf) wait time according to the max table row count,
 * applied only when the session wait time is not the default value.
 * olap:
 *   row < 1G: 1 sec
 *   1G <= row < 10G: 5 sec
 *   10G <= row: 20 sec
 * external:
 *   row < 1G: 5 sec
 *   1G <= row < 10G: 20 sec
 *   10G <= row: 50 sec
 */
private void setRuntimeFilterWaitTimeByTableRowCountAndType() {
    if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs()
            != VariableMgr.getDefaultSessionVariable().runtimeFilterWaitTimeMs) {
        List<LogicalCatalogRelation> scans = cascadesContext.getRewritePlan()
                .collectToList(LogicalCatalogRelation.class::isInstance);
        double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext);
        boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan));
        if (hasExternalTable) {
            if (maxRow < 1_000_000_000L) {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000;
            } else if (maxRow < 10_000_000_000L) {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000;
            } else {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 50000;
            }
        } else {
            if (maxRow < 1_000_000_000L) {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 1000;
            } else if (maxRow < 10_000_000_000L) {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000;
            } else {
                ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000;
            }
        }
    }
}
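For illustration, the same tiering policy can be read as a pure function of the scan types and the max row count. A minimal standalone sketch, not part of the commit (pickRuntimeFilterWaitMs is a hypothetical name):

    // Same tiering policy as setRuntimeFilterWaitTimeByTableRowCountAndType, as a pure function.
    static int pickRuntimeFilterWaitMs(boolean hasExternalTable, double maxRow) {
        if (hasExternalTable) {
            // External scans produce runtime filters more slowly, so wait longer.
            if (maxRow < 1_000_000_000L) {
                return 5_000;
            } else if (maxRow < 10_000_000_000L) {
                return 20_000;
            }
            return 50_000;
        }
        if (maxRow < 1_000_000_000L) {
            return 1_000;
        } else if (maxRow < 10_000_000_000L) {
            return 5_000;
        }
        return 20_000;
    }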

private Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
@@ -279,6 +319,8 @@ private Plan planWithoutLock(
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
}

setRuntimeFilterWaitTimeByTableRowCountAndType();

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
@@ -72,6 +72,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
@@ -220,6 +221,25 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
 * Get the max row count of the tables used in a query.
 */
public static double getMaxTableRowCount(List<LogicalCatalogRelation> scans, CascadesContext context) {
    StatsCalculator calculator = new StatsCalculator(context);
    double max = -1;
    for (LogicalCatalogRelation scan : scans) {
        double row;
        if (scan instanceof LogicalOlapScan) {
            // OLAP tables: use the row count derived by the stats calculator.
            row = calculator.getOlapTableRowCount((LogicalOlapScan) scan);
        } else {
            // External tables: fall back to the row count reported by the table itself.
            row = scan.getTable().getRowCount();
        }
        max = Math.max(row, max);
    }
    return max;
}

/**
* disable join reorder if
* 1. any table rowCount is not available, or
@@ -1136,7 +1136,7 @@ public class SessionVariable implements Serializable, Writable {
private int runtimeBloomFilterMaxSize = 16777216;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = true)
private int runtimeFilterWaitTimeMs = 1000;
public int runtimeFilterWaitTimeMs = 1000;

@VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = true)
private boolean runtimeFilterWaitInfinitely = false;
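The visibility change from private to public is what lets the planner overwrite the per-session value directly, as in the assignment added in setRuntimeFilterWaitTimeByTableRowCountAndType above (a sketch of such a write, not additional commit code):

    // Direct field write, possible now that runtimeFilterWaitTimeMs is public:
    ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000;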
31 changes: 0 additions & 31 deletions tools/tpcds-tools/bin/run-tpcds-queries.sh
@@ -81,19 +81,15 @@ fi
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
    echo "Running tpcds sf 1 queries"
    TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1"
    TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
    echo "Running tpcds sf 100 queries"
    TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100"
    TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
    echo "Running tpcds sf 1000 queries"
    TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000"
    TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
    echo "Running tpcds sf 10000 queries"
    TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000"
    TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
    echo "${SCALE_FACTOR} scale is NOT supported currently."
    exit 1
@@ -123,32 +119,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
    k="$1"
    v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
    echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
    rm -f "${backup_session_variables_file}"
    touch "${backup_session_variables_file}"
    while IFS= read -r line; do
        k="${line/set global /}"
        k="${k%=*}"
        v=$(get_session_variable "${k}")
        echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
    done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}")
}
clean_up() {
    echo "restore session variables:"
    cat "${backup_session_variables_file}"
    mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};"
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCDS_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
@@ -205,5 +176,3 @@ done
echo "Total cold run time: ${cold_run_sum} ms"
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpcds queries.'

clean_up
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf10000.sql

This file was deleted.

29 changes: 1 addition & 28 deletions tools/tpch-tools/bin/run-tpch-queries.sh
@@ -81,16 +81,12 @@ fi
TPCH_QUERIES_DIR="${CURDIR}/../queries"
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
    echo "Running tpch sf 1 queries"
    TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
    echo "Running tpch sf 100 queries"
    TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
    echo "Running tpch sf 1000 queries"
    TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
    echo "Running tpch sf 10000 queries"
    TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
    echo "${SCALE_FACTOR} scale is NOT supported currently."
    exit 1
@@ -120,26 +116,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
    k="$1"
    v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
    echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
    touch "${backup_session_variables_file}"
    while IFS= read -r line; do
        k="${line/set global /}"
        k="${k%=*}"
        v=$(get_session_variable "${k}")
        echo "set global ${k}=${v};" >>"${backup_session_variables_file}"
    done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}")
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCH_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
@@ -196,8 +173,4 @@ done
echo "Total cold run time: ${cold_run_sum} ms"
# The tpch pipeline depends on this 'Total hot run time' string
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpch queries.'

echo "Restore session variables"
run_sql "source ${backup_session_variables_file};"
rm -f "${backup_session_variables_file}"
echo 'Finish tpch queries.'
Empty file.
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf10000.sql

This file was deleted.
