From 3952e430eeddde305801c9a8ec02899b7b612f55 Mon Sep 17 00:00:00 2001 From: zhiqiang Date: Tue, 14 Nov 2023 07:05:59 -0600 Subject: [PATCH] [feature](runtime filter) New session variable runtime_filter_wait_infinitely (#26888) New session variable: runtime_filter_wait_infinitely. If runtime_filter_wait_infinitely is set to true, the consumer of a runtime filter will wait to receive it until the query times out. --- be/src/exprs/runtime_filter.cpp | 18 ++++++------- be/src/exprs/runtime_filter.h | 25 +++++++++++++++---- be/src/exprs/runtime_filter_rpc.cpp | 2 +- be/src/runtime/query_context.h | 5 ++++ be/src/runtime/runtime_state.h | 5 ++++ .../join-optimization/runtime-filter.md | 4 ++- .../join-optimization/runtime-filter.md | 1 + .../org/apache/doris/qe/SessionVariable.java | 6 +++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ 9 files changed, 52 insertions(+), 16 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 10c2856112b116d..1c7cb4b5c13b046 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1185,17 +1185,17 @@ bool IRuntimeFilter::await() { return true; } +// NOTE: Wait infinitely will not make scan task wait really forever. +// Because BlockTaskSchedule will make it run when query is timed out. +bool IRuntimeFilter::wait_infinitely() const { + // bitmap filter is precise filter and only filter once, so it must be applied. + return _wait_infinitely || + (_wrapper != nullptr && _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER); +} + bool IRuntimeFilter::is_ready_or_timeout() { DCHECK(is_consumer()); auto cur_state = _rf_state_atomic.load(std::memory_order_acquire); - auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000 - : _state->execution_timeout() * 1000; - auto runtime_filter_wait_time_ms = _state == nullptr ? 
_query_ctx->runtime_filter_wait_time_ms() - : _state->runtime_filter_wait_time_ms(); - // bitmap filter is precise filter and only filter once, so it must be applied. - int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER - ? execution_timeout - : runtime_filter_wait_time_ms; int64_t ms_since_registration = MonotonicMillis() - registration_time_; if (!_enable_pipeline_exec) { _rf_state = RuntimeFilterState::TIME_OUT; @@ -1212,7 +1212,7 @@ bool IRuntimeFilter::is_ready_or_timeout() { if (is_ready()) { return true; } - bool timeout = wait_times_ms <= ms_since_registration; + bool timeout = wait_infinitely() ? false : _rf_wait_time_ms <= ms_since_registration; auto expected = RuntimeFilterState::NOT_READY; if (timeout) { if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT, diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index f13877b869cec4d..7a65706f5ad8513 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -211,6 +211,8 @@ class IRuntimeFilter { _always_true(false), _is_ignored(false), registration_time_(MonotonicMillis()), + _wait_infinitely(_state->runtime_filter_wait_infinitely()), + _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()), _enable_pipeline_exec(_state->enable_pipeline_exec()), _profile(new RuntimeProfile(_name)) { if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) { @@ -236,6 +238,8 @@ class IRuntimeFilter { _always_true(false), _is_ignored(false), registration_time_(MonotonicMillis()), + _wait_infinitely(query_ctx->runtime_filter_wait_infinitely()), + _rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()), _enable_pipeline_exec(query_ctx->enable_pipeline_exec()), _profile(new RuntimeProfile(_name)) { if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) { @@ -388,13 +392,21 @@ class IRuntimeFilter { } } - int32_t wait_time_ms() { - auto runtime_filter_wait_time_ms = 
_state == nullptr - ? _query_ctx->runtime_filter_wait_time_ms() - : _state->runtime_filter_wait_time_ms(); - return runtime_filter_wait_time_ms; + // For pipelineX & Producer + int32_t wait_time_ms() const { + int32_t res = 0; + if (wait_infinitely()) { + res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout(); + // Convert to ms + res *= 1000; + } else { + res = _rf_wait_time_ms; + } + return res; } + bool wait_infinitely() const; + int64_t registration_time() const { return registration_time_; } void set_filter_timer(std::shared_ptr); @@ -479,6 +491,9 @@ class IRuntimeFilter { /// Time in ms (from MonotonicMillis()), that the filter was registered. const int64_t registration_time_; + /// runtime filter wait time will be ignored if wait_infinitely is true + const bool _wait_infinitely; + const int32_t _rf_wait_time_ms; const bool _enable_pipeline_exec; diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index 2220ca86060a2d3..00540b8382c6672 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -74,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress _rpc_context->request.set_filter_id(_filter_id); _rpc_context->request.set_opt_remote_rf(opt_remote_rf); _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec()); - _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms()); + _rpc_context->cntl.set_timeout_ms(wait_time_ms()); _rpc_context->cid = _rpc_context->cntl.call_id(); Status serialize_status = serialize(&_rpc_context->request, &data, &len); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index c51e92d14c83bd4..bb7ad20b90bd437 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -210,6 +210,11 @@ class QueryContext { return _query_options.runtime_filter_wait_time_ms; } + bool runtime_filter_wait_infinitely() const { + return 
_query_options.__isset.runtime_filter_wait_infinitely && + _query_options.runtime_filter_wait_infinitely; + } + bool enable_pipeline_exec() const { return _query_options.__isset.enable_pipeline_engine && _query_options.enable_pipeline_engine; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 3b420511fa012a1..835bd582894e1e9 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -333,6 +333,11 @@ class RuntimeState { return _query_options.runtime_filter_wait_time_ms; } + bool runtime_filter_wait_infinitely() const { + return _query_options.__isset.runtime_filter_wait_infinitely && + _query_options.runtime_filter_wait_infinitely; + } + int32_t runtime_filter_max_in_num() const { return _query_options.runtime_filter_max_in_num; } int be_exec_version() const { diff --git a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md index 7b359d5c08a2f26..a70522b02152906 100644 --- a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md +++ b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md @@ -95,7 +95,7 @@ For query options related to Runtime Filter, please refer to the following secti - `runtime_filter_mode`: Used to adjust the push-down strategy of Runtime Filter, including three strategies of OFF, LOCAL, and GLOBAL. 
The default setting is the GLOBAL strategy - - `runtime_filter_wait_time_ms`: the time that ScanNode in the left table waits for each Runtime Filter, the default is 1000ms + - `runtime_filter_wait_time_ms`: The time that ScanNode in the left table waits for each Runtime Filter, the default is 1000ms - `runtime_filters_max_num`: The maximum number of Bloom Filters in the Runtime Filter that can be applied to each query, the default is 10 @@ -107,6 +107,8 @@ For query options related to Runtime Filter, please refer to the following secti - `runtime_filter_max_in_num`: If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, the default is 1024 + - `runtime_filter_wait_infinitely`: If the parameter is true, the scan node of the left table will wait until it receives a runtime filter or the query times out, default is false + The query options are further explained below. #### 1.runtime_filter_type diff --git a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md index ff47091cf053dbf..8ed2876235abb8b 100644 --- a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md +++ b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md @@ -103,6 +103,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量 - `runtime_bloom_filter_max_size`: Runtime Filter中Bloom Filter的最大长度,默认16777216(16M) - `runtime_bloom_filter_size`: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M) - `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024 + - `runtime_filter_wait_infinitely`: 如果参数为 true,那么左表的scan节点将会一直等待直到接收到 runtime filter或者查询超时,默认为false 下面对查询选项做进一步说明。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 711c5a2f016f0ef..3cc5bd455874c0e 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -167,6 +167,8 @@ public class SessionVariable implements Serializable, Writable { public static final String USE_RF_DEFAULT = "use_rf_default"; // Time in ms to wait until runtime filters are delivered. public static final String RUNTIME_FILTER_WAIT_TIME_MS = "runtime_filter_wait_time_ms"; + public static final String runtime_filter_wait_infinitely = "runtime_filter_wait_infinitely"; + // Maximum number of bloom runtime filters allowed per query public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num"; // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType @@ -792,6 +794,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = true) private int runtimeFilterWaitTimeMs = 1000; + @VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = true) + private boolean runtimeFilterWaitInfinitely = false; + @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM, needForward = true) private int runtimeFiltersMaxNum = 10; @@ -2486,6 +2491,7 @@ public TQueryOptions toThrift() { tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs); tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum); + tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely); if (cpuResourceLimit > 0) { TResourceLimit resourceLimit = new TResourceLimit(); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 7c55842735cdc91..d1a779e285a9161 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -255,6 +255,8 @@ struct TQueryOptions { 89: optional bool enable_local_shuffle = false; // For emergency use, skip missing version when reading rowsets 90: optional bool skip_missing_version = false; + + 91: optional bool 
runtime_filter_wait_infinitely = false; }