diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 7ea92ee73b384f4..6cb3414820b473a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -58,6 +58,7 @@ import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner; import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; @@ -73,6 +74,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.VariableMgr; import org.apache.doris.thrift.TQueryCacheParam; import com.google.common.annotations.VisibleForTesting; @@ -228,6 +230,44 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, } } + /** + * compute rf wait time according to max table row count, if wait time is not default value + * olap: + * row < 1G: 1 sec + * 1G <= row < 10G: 5 sec + * 10G < row: 20 sec + * external: + * row < 1G: 5 sec + * 1G <= row < 10G: 10 sec + * 10G < row: 50 sec + */ + private void setRuntimeFilterWaitTimeByTableRowCountAndType() { + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs() + != VariableMgr.getDefaultSessionVariable().runtimeFilterWaitTimeMs) { + List scans = cascadesContext.getRewritePlan() + .collectToList(LogicalCatalogRelation.class::isInstance); + double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext); + boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan)); + if (hasExternalTable) { + if (maxRow < 1_000_000_000L) { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000; + } else if (maxRow < 10_000_000_000L) { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000; + } else { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 50000; + } + } else { + if (maxRow < 1_000_000_000L) { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 1000; + } else if (maxRow < 10_000_000_000L) { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 5000; + } else { + ConnectContext.get().getSessionVariable().runtimeFilterWaitTimeMs = 20000; + } + } + } + } + private Plan planWithoutLock( LogicalPlan plan, ExplainLevel explainLevel, boolean showPlanProcess, PhysicalProperties requireProperties) { @@ -279,6 +319,8 @@ private Plan planWithoutLock( disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason); } + setRuntimeFilterWaitTimeByTableRowCountAndType(); + optimize(); if (statementContext.getConnectContext().getExecutor() != null) { statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index d4a9a81fc07c1d2..b3702488b1e79fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -72,6 +72,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; @@ -220,6 +221,25 @@ public Map getTotalColumnStatisticMap() { return totalColumnStatisticMap; } + /** + * + * get the max row count of tables used in a query + */ + public static double getMaxTableRowCount(List scans, CascadesContext context) { + StatsCalculator calculator = new StatsCalculator(context); + double max = -1; + for (LogicalCatalogRelation scan : scans) { + double row; + if (scan instanceof LogicalOlapScan) { + row = calculator.getOlapTableRowCount((LogicalOlapScan) scan); + } else { + row = scan.getTable().getRowCount(); + } + max = Math.max(row, max); + } + return max; + } + /** * disable join reorder if * 1. any table rowCount is not available, or diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index cc1f29b76c2b491..e6d79e47dfc7002 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1136,7 +1136,7 @@ public class SessionVariable implements Serializable, Writable { private int runtimeBloomFilterMaxSize = 16777216; @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = true) - private int runtimeFilterWaitTimeMs = 1000; + public int runtimeFilterWaitTimeMs = 1000; @VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = true) private boolean runtimeFilterWaitInfinitely = false; diff --git a/tools/tpcds-tools/bin/run-tpcds-queries.sh b/tools/tpcds-tools/bin/run-tpcds-queries.sh index 8669ba8073ad2d5..ea2b3dd92a2f106 100755 --- a/tools/tpcds-tools/bin/run-tpcds-queries.sh +++ b/tools/tpcds-tools/bin/run-tpcds-queries.sh @@ -81,19 +81,15 @@ fi if [[ ${SCALE_FACTOR} -eq 1 ]]; then echo "Running tpcds sf 1 queries" TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1" - TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql" elif [[ ${SCALE_FACTOR} -eq 100 ]]; then echo "Running tpcds sf 100 queries" TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100" - TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql" elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then echo "Running tpcds sf 1000 queries" TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000" - TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql" elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then echo "Running tpcds sf 10000 queries" TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000" - TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql" else echo "${SCALE_FACTOR} scale is NOT support currently." exit 1 @@ -123,32 +119,7 @@ run_sql() { echo "$*" mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*" } -get_session_variable() { - k="$1" - v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ") - echo "${v/*Value: /}" -} -backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql" -backup_session_variables() { - rm -f "${backup_session_variables_file}" - touch "${backup_session_variables_file}" - while IFS= read -r line; do - k="${line/set global /}" - k="${k%=*}" - v=$(get_session_variable "${k}") - echo "set global ${k}='${v}';" >>"${backup_session_variables_file}" - done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}") -} -clean_up() { - echo "restore session variables:" - cat "${backup_session_variables_file}" - mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};" -} -backup_session_variables -echo '============================================' -echo "Optimize session variables" -run_sql "source ${TPCDS_OPT_CONF};" echo '============================================' run_sql "show variables;" echo '============================================' @@ -205,5 +176,3 @@ done echo "Total cold run time: ${cold_run_sum} ms" echo "Total hot run time: ${best_hot_run_sum} ms" echo 'Finish tpcds queries.' - -clean_up diff --git a/tools/tpcds-tools/conf/opt/opt_sf1.sql b/tools/tpcds-tools/conf/opt/opt_sf1.sql deleted file mode 100644 index e69de29bb2d1d64..000000000000000 diff --git a/tools/tpcds-tools/conf/opt/opt_sf100.sql b/tools/tpcds-tools/conf/opt/opt_sf100.sql deleted file mode 100644 index e69de29bb2d1d64..000000000000000 diff --git a/tools/tpcds-tools/conf/opt/opt_sf1000.sql b/tools/tpcds-tools/conf/opt/opt_sf1000.sql deleted file mode 100644 index 17d8fa190f37758..000000000000000 --- a/tools/tpcds-tools/conf/opt/opt_sf1000.sql +++ /dev/null @@ -1 +0,0 @@ -set global runtime_filter_wait_time_ms=10000; diff --git a/tools/tpcds-tools/conf/opt/opt_sf10000.sql b/tools/tpcds-tools/conf/opt/opt_sf10000.sql deleted file mode 100644 index ef11bad93def79b..000000000000000 --- a/tools/tpcds-tools/conf/opt/opt_sf10000.sql +++ /dev/null @@ -1 +0,0 @@ -set global runtime_filter_wait_time_ms=100000; diff --git a/tools/tpch-tools/bin/run-tpch-queries.sh b/tools/tpch-tools/bin/run-tpch-queries.sh index ee5215655ef96d5..55469d613911202 100755 --- a/tools/tpch-tools/bin/run-tpch-queries.sh +++ b/tools/tpch-tools/bin/run-tpch-queries.sh @@ -81,16 +81,12 @@ fi TPCH_QUERIES_DIR="${CURDIR}/../queries" if [[ ${SCALE_FACTOR} -eq 1 ]]; then echo "Running tpch sf 1 queries" - TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql" elif [[ ${SCALE_FACTOR} -eq 100 ]]; then echo "Running tpch sf 100 queries" - TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql" elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then echo "Running tpch sf 1000 queries" - TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql" elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then echo "Running tpch sf 10000 queries" - TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql" else echo "${SCALE_FACTOR} scale is NOT support currently." exit 1 @@ -120,26 +116,7 @@ run_sql() { echo "$*" mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*" } -get_session_variable() { - k="$1" - v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ") - echo "${v/*Value: /}" -} -backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql" -backup_session_variables() { - touch "${backup_session_variables_file}" - while IFS= read -r line; do - k="${line/set global /}" - k="${k%=*}" - v=$(get_session_variable "${k}") - echo "set global ${k}=${v};" >>"${backup_session_variables_file}" - done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}") -} -backup_session_variables -echo '============================================' -echo "Optimize session variables" -run_sql "source ${TPCH_OPT_CONF};" echo '============================================' run_sql "show variables;" echo '============================================' @@ -196,8 +173,4 @@ done echo "Total cold run time: ${cold_run_sum} ms" # tpch 流水线依赖这个'Total hot run time'字符串 echo "Total hot run time: ${best_hot_run_sum} ms" -echo 'Finish tpch queries.' - -echo "Restore session variables" -run_sql "source ${backup_session_variables_file};" -rm -f "${backup_session_variables_file}" +echo 'Finish tpch queries.' \ No newline at end of file diff --git a/tools/tpch-tools/conf/opt/backup_session_variables.sql b/tools/tpch-tools/conf/opt/backup_session_variables.sql deleted file mode 100644 index e69de29bb2d1d64..000000000000000 diff --git a/tools/tpch-tools/conf/opt/opt_sf1.sql b/tools/tpch-tools/conf/opt/opt_sf1.sql deleted file mode 100644 index e69de29bb2d1d64..000000000000000 diff --git a/tools/tpch-tools/conf/opt/opt_sf100.sql b/tools/tpch-tools/conf/opt/opt_sf100.sql deleted file mode 100644 index e69de29bb2d1d64..000000000000000 diff --git a/tools/tpch-tools/conf/opt/opt_sf1000.sql b/tools/tpch-tools/conf/opt/opt_sf1000.sql deleted file mode 100644 index 17d8fa190f37758..000000000000000 --- a/tools/tpch-tools/conf/opt/opt_sf1000.sql +++ /dev/null @@ -1 +0,0 @@ -set global runtime_filter_wait_time_ms=10000; diff --git a/tools/tpch-tools/conf/opt/opt_sf10000.sql b/tools/tpch-tools/conf/opt/opt_sf10000.sql deleted file mode 100644 index ef11bad93def79b..000000000000000 --- a/tools/tpch-tools/conf/opt/opt_sf10000.sql +++ /dev/null @@ -1 +0,0 @@ -set global runtime_filter_wait_time_ms=100000;