Skip to content

Commit

Permalink
[feature](runtime filter) New session variable runtime_filter_wait_in…
Browse files Browse the repository at this point in the history
…finitely (apache#26888)

New session variable: runtime_filter_wait_infinitely. When runtime_filter_wait_infinitely = true, the consumer of a runtime filter (RF) keeps waiting to receive the filter until the query times out.
  • Loading branch information
zhiqiang-hhhh authored and seawinde committed Nov 14, 2023
1 parent 7381598 commit 3952e43
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 16 deletions.
18 changes: 9 additions & 9 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1185,17 +1185,17 @@ bool IRuntimeFilter::await() {
return true;
}

// Reports whether the consumer of this filter should wait without a
// runtime-filter-specific deadline.
// NOTE: Waiting "infinitely" does not make the scan task wait forever,
// because BlockTaskSchedule will make it run once the query has timed out.
bool IRuntimeFilter::wait_infinitely() const {
    if (_wait_infinitely) {
        return true;
    }
    // A bitmap filter is a precise filter and filters only once, so it must be applied.
    const bool is_bitmap_filter =
            _wrapper != nullptr && _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER;
    return is_bitmap_filter;
}

bool IRuntimeFilter::is_ready_or_timeout() {
DCHECK(is_consumer());
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
: _state->execution_timeout() * 1000;
auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
: _state->runtime_filter_wait_time_ms();
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
if (!_enable_pipeline_exec) {
_rf_state = RuntimeFilterState::TIME_OUT;
Expand All @@ -1212,7 +1212,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
if (is_ready()) {
return true;
}
bool timeout = wait_times_ms <= ms_since_registration;
bool timeout = wait_infinitely() ? false : _rf_wait_time_ms <= ms_since_registration;
auto expected = RuntimeFilterState::NOT_READY;
if (timeout) {
if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT,
Expand Down
25 changes: 20 additions & 5 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class IRuntimeFilter {
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
_wait_infinitely(_state->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(_state->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
Expand All @@ -236,6 +238,8 @@ class IRuntimeFilter {
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
_wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
Expand Down Expand Up @@ -388,13 +392,21 @@ class IRuntimeFilter {
}
}

int32_t wait_time_ms() {
auto runtime_filter_wait_time_ms = _state == nullptr
? _query_ctx->runtime_filter_wait_time_ms()
: _state->runtime_filter_wait_time_ms();
return runtime_filter_wait_time_ms;
// For pipelineX & Producer
// Returns how long, in milliseconds, to wait for this runtime filter.
// - When wait_infinitely() is false, this is the configured runtime filter
//   wait time (_rf_wait_time_ms).
// - When wait_infinitely() is true, this is the execution timeout converted
//   to milliseconds, read from _query_ctx when _state is null and from
//   _state otherwise.
int32_t wait_time_ms() const {
    if (!wait_infinitely()) {
        return _rf_wait_time_ms;
    }
    // Widen to 64 bits before converting to ms: multiplying by 1000 in
    // int32_t is signed-overflow UB once the timeout exceeds ~2,147,483.
    const int64_t timeout_ms =
            static_cast<int64_t>(_state == nullptr ? _query_ctx->execution_timeout()
                                                   : _state->execution_timeout()) *
            1000;
    // Saturate instead of wrapping so the result always fits the return type.
    if (timeout_ms > INT32_MAX) {
        return INT32_MAX;
    }
    if (timeout_ms < INT32_MIN) {
        return INT32_MIN;
    }
    return static_cast<int32_t>(timeout_ms);
}

bool wait_infinitely() const;

/// Returns the time in ms (from MonotonicMillis()) at which the filter was registered.
int64_t registration_time() const {
    return registration_time_;
}

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
Expand Down Expand Up @@ -479,6 +491,9 @@ class IRuntimeFilter {

/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
/// The runtime filter wait time (_rf_wait_time_ms) is ignored if _wait_infinitely is true.
const bool _wait_infinitely;
const int32_t _rf_wait_time_ms;

const bool _enable_pipeline_exec;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
_rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
_rpc_context->cntl.set_timeout_ms(wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();

Status serialize_status = serialize(&_rpc_context->request, &data, &len);
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ class QueryContext {
return _query_options.runtime_filter_wait_time_ms;
}

// Returns true only when the runtime_filter_wait_infinitely query option
// is both explicitly set and enabled.
bool runtime_filter_wait_infinitely() const {
    if (!_query_options.__isset.runtime_filter_wait_infinitely) {
        return false;
    }
    return _query_options.runtime_filter_wait_infinitely;
}

bool enable_pipeline_exec() const {
return _query_options.__isset.enable_pipeline_engine &&
_query_options.enable_pipeline_engine;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ class RuntimeState {
return _query_options.runtime_filter_wait_time_ms;
}

// Returns true only when the runtime_filter_wait_infinitely query option
// is both explicitly set and enabled.
bool runtime_filter_wait_infinitely() const {
    if (!_query_options.__isset.runtime_filter_wait_infinitely) {
        return false;
    }
    return _query_options.runtime_filter_wait_infinitely;
}

// Returns the runtime_filter_max_in_num query option.
int32_t runtime_filter_max_in_num() const {
    return _query_options.runtime_filter_max_in_num;
}

int be_exec_version() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ For query options related to Runtime Filter, please refer to the following secti

- `runtime_filter_mode`: Used to adjust the push-down strategy of Runtime Filter, including three strategies of OFF, LOCAL, and GLOBAL. The default setting is the GLOBAL strategy

- `runtime_filter_wait_time_ms`: the time that ScanNode in the left table waits for each Runtime Filter, the default is 1000ms
- `runtime_filter_wait_time_ms`: The time that ScanNode in the left table waits for each Runtime Filter, the default is 1000ms

- `runtime_filters_max_num`: The maximum number of Bloom Filters in the Runtime Filter that can be applied to each query, the default is 10

Expand All @@ -107,6 +107,8 @@ For query options related to Runtime Filter, please refer to the following secti

- `runtime_filter_max_in_num`: If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, the default is 1024

- `runtime_filter_wait_infinitely`: If this parameter is true, the scan node of the left table waits until it receives the runtime filter or the query times out, the default is false

The query options are further explained below.

#### 1.runtime_filter_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量
- `runtime_bloom_filter_max_size`: Runtime Filter中Bloom Filter的最大长度,默认16777216(16M)
- `runtime_bloom_filter_size`: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M)
- `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024
- `runtime_filter_wait_infinitely`: 如果参数为 true,那么左表的 scan 节点将会一直等待,直到接收到 runtime filter 或者查询超时,默认为 false

下面对查询选项做进一步说明。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String USE_RF_DEFAULT = "use_rf_default";
// Time in ms to wait until runtime filters are delivered.
public static final String RUNTIME_FILTER_WAIT_TIME_MS = "runtime_filter_wait_time_ms";
// If true, the consumer of a runtime filter waits until the filter arrives or the query times out.
public static final String runtime_filter_wait_infinitely = "runtime_filter_wait_infinitely";

// Maximum number of bloom runtime filters allowed per query
public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num";
// Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
Expand Down Expand Up @@ -792,6 +794,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = true)
private int runtimeFilterWaitTimeMs = 1000;

@VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = true)
private boolean runtimeFilterWaitInfinitely = false;

@VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM, needForward = true)
private int runtimeFiltersMaxNum = 10;

Expand Down Expand Up @@ -2486,6 +2491,7 @@ public TQueryOptions toThrift() {

tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely);

if (cpuResourceLimit > 0) {
TResourceLimit resourceLimit = new TResourceLimit();
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ struct TQueryOptions {
89: optional bool enable_local_shuffle = false;
// For emergency use, skip missing version when reading rowsets
90: optional bool skip_missing_version = false;

// If true, the runtime filter consumer waits until the filter arrives or the query times out
91: optional bool runtime_filter_wait_infinitely = false;
}


Expand Down

0 comments on commit 3952e43

Please sign in to comment.