Skip to content

Commit

Permalink
adaptive rf wait time
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Oct 28, 2024
1 parent e238a87 commit db163b1
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 75 deletions.
22 changes: 4 additions & 18 deletions fe/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
Expand All @@ -73,6 +74,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -279,6 +281,39 @@ private Plan planWithoutLock(
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
}

// Adaptively pick the runtime-filter wait time from the largest table row
// count in the plan. Thresholds actually applied by the code below:
//   olap:
//     row < 1G:         1 sec
//     1G <= row < 10G:  5 sec
//     10G <= row:       20 sec
//   external:
//     row < 1G:         5 sec
//     1G <= row < 10G:  20 sec  (NOTE(review): earlier comment said 10 sec, but the code sets 20000 ms)
//     10G <= row:       50 sec
// NOTE(review): the guard fires when the current wait time is NOT the default,
// i.e. a user-customized value gets overwritten while the default is kept —
// confirm the inequality direction matches the intended "only adapt when untouched" semantics.
// NOTE(review): this mutates the session variable in place and never restores it,
// so the adapted value persists into later queries in the same session.
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs()
        != VariableMgr.getDefaultSessionVariable().runtimeFilterWaitTimeMs) {
    // All table scans (olap + external) of the rewritten plan.
    List<LogicalCatalogRelation> scans = cascadesContext.getRewritePlan()
            .collectToList(LogicalCatalogRelation.class::isInstance);
    double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext);
    // External tables get longer waits than olap at every tier.
    boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan));
    if (hasExternalTable) {
        if (maxRow < 1_000_000_000L) {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000;
        } else if (maxRow < 10_000_000_000L) {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000;
        } else {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 50000;
        }
    } else {
        if (maxRow < 1_000_000_000L) {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 1000;
        } else if (maxRow < 10_000_000_000L) {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000;
        } else {
            ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000;
        }
    }
}
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
Expand Down Expand Up @@ -220,6 +221,25 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
 * Returns the maximum table row count over the given scans of a query.
 *
 * <p>Olap scans are measured via this calculator's olap-specific row count
 * logic; any other catalog relation falls back to its table's own row count.
 *
 * @param scans   catalog relations (table scans) collected from the plan
 * @param context cascades context used to construct the calculator
 * @return the largest row count seen, or -1 when {@code scans} is empty
 */
public static double getMaxTableRowCount(List<LogicalCatalogRelation> scans, CascadesContext context) {
    StatsCalculator statsCalculator = new StatsCalculator(context);
    double maxRowCount = -1;
    for (LogicalCatalogRelation relation : scans) {
        double rowCount = (relation instanceof LogicalOlapScan)
                ? statsCalculator.getOlapTableRowCount((LogicalOlapScan) relation)
                : relation.getTable().getRowCount();
        maxRowCount = Math.max(maxRowCount, rowCount);
    }
    return maxRowCount;
}

/**
* disable join reorder if
* 1. any table rowCount is not available, or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ public class SessionVariable implements Serializable, Writable {
private int runtimeBloomFilterMaxSize = 16777216;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = true)
private int runtimeFilterWaitTimeMs = 1000;
public int runtimeFilterWaitTimeMs = 1000;

@VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = true)
private boolean runtimeFilterWaitInfinitely = false;
Expand Down
29 changes: 0 additions & 29 deletions tools/tpcds-tools/bin/run-tpcds-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,15 @@ fi
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpcds sf 1 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpcds sf 100 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpcds sf 1000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpcds sf 10000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -123,32 +119,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
# Print the current value of session variable $1 by querying the FE.
# Fix: k and v were leaking into the script's global scope; declare them local.
get_session_variable() {
    local k v
    k="$1"
    # "show variables like ...\G" prints a " Value: <v>" line; keep only <v>.
    v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
    echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
# Snapshot the current values of every session variable that the opt conf
# (${TPCDS_OPT_CONF}) will override, writing restore statements to
# ${backup_session_variables_file} so they can be replayed afterwards.
# Fixes: loop variables made local; blank conf lines are skipped (they used
# to emit a garbage "set global ='';" statement).
backup_session_variables() {
    local line k v
    rm -f "${backup_session_variables_file}"
    touch "${backup_session_variables_file}"
    while IFS= read -r line; do
        [[ -z "${line// /}" ]] && continue  # skip blank lines
        k="${line/set global /}"   # strip the leading "set global "
        k="${k%=*}"                # drop "=value", keeping just the variable name
        v=$(get_session_variable "${k}")
        echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
    done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}")
}
# Restore the session variables recorded by backup_session_variables:
# print the restore script for the log, then replay it against the FE.
clean_up() {
    echo "restore session variables:"
    cat "${backup_session_variables_file}"
    mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};"
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCDS_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf10000.sql

This file was deleted.

23 changes: 0 additions & 23 deletions tools/tpch-tools/bin/run-tpch-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,12 @@ fi
TPCH_QUERIES_DIR="${CURDIR}/../queries"
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpch sf 1 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpch sf 100 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpch sf 1000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpch sf 10000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -120,26 +116,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
# Print the current value of session variable $1 by querying the FE.
# Fix: k and v were leaking into the script's global scope; declare them local.
get_session_variable() {
    local k v
    k="$1"
    # "show variables like ...\G" prints a " Value: <v>" line; keep only <v>.
    v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
    echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
# Snapshot the current values of every session variable that the opt conf
# (${TPCH_OPT_CONF}) will override, writing restore statements to
# ${backup_session_variables_file}.
# Fixes vs. the original (aligning with the tpcds twin of this function):
#  - rm -f before touch, otherwise reruns append duplicate statements;
#  - the value is single-quoted in the emitted SQL so string-typed
#    variables produce valid statements;
#  - loop variables made local; blank conf lines skipped.
backup_session_variables() {
    local line k v
    rm -f "${backup_session_variables_file}"
    touch "${backup_session_variables_file}"
    while IFS= read -r line; do
        [[ -z "${line// /}" ]] && continue  # skip blank lines
        k="${line/set global /}"   # strip the leading "set global "
        k="${k%=*}"                # drop "=value", keeping just the variable name
        v=$(get_session_variable "${k}")
        echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
    done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}")
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCH_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down
Empty file.
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf10000.sql

This file was deleted.

0 comments on commit db163b1

Please sign in to comment.