Skip to content

Commit

Permalink
[Refactor](RF) refactor the profile of rf and pipeline-x support local ignore (apache#31287)
Browse files Browse the repository at this point in the history

* [Refactor](RF) refactor the profile of rf and pipeline-x support local ignore

* fix local merge filter
  • Loading branch information
HappenLee authored Feb 23, 2024
1 parent c5854a7 commit b27a40e
Show file tree
Hide file tree
Showing 20 changed files with 92 additions and 152 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {

if (master_info.__isset.backend_id) {
_master_info->__set_backend_id(master_info.backend_id);
BackendOptions::set_backend_id(master_info.backend_id);
}

if (master_info.__isset.frontend_infos) {
ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/heartbeat_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class HeartbeatServer : public HeartbeatServiceIf {
explicit HeartbeatServer(TMasterInfo* master_info);
~HeartbeatServer() override = default;

virtual void init_cluster_id();
void init_cluster_id();

// Master send heartbeat to this server
//
Expand Down
76 changes: 22 additions & 54 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,19 +462,6 @@ class RuntimePredicateWrapper {

switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
// only in filter can set ignore in merge time
if (_ignored) {
break;
} else if (wrapper->_ignored) {
VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id
<< ") because: " << wrapper->ignored_msg();

_ignored = true;
_ignored_msg = wrapper->_ignored_msg;
// release in filter
_context.hybrid_set.reset();
break;
}
// try insert set
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) {
Expand Down Expand Up @@ -1032,8 +1019,8 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
pquery_id->set_lo(_state->query_id.lo());

auto pfragment_instance_id = merge_filter_request->mutable_fragment_instance_id();
pfragment_instance_id->set_hi(_state->fragment_instance_id().hi());
pfragment_instance_id->set_lo(_state->fragment_instance_id().lo());
pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id);
pfragment_instance_id->set_lo((int64_t)this);

merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_opt_remote_rf(opt_remote_rf);
Expand Down Expand Up @@ -1061,14 +1048,12 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
std::vector<vectorized::VExprSPtr>& push_exprs,
bool is_late_arrival) {
DCHECK(is_consumer());
if (_wrapper->is_ignored()) {
return Status::OK();
}
if (!is_late_arrival) {
_set_push_down();
if (!_wrapper->is_ignored()) {
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
_profile->add_info_string("Info", _format_status());
return _wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr);
return Status::OK();
}

bool IRuntimeFilter::await() {
Expand Down Expand Up @@ -1202,9 +1187,9 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {

std::string IRuntimeFilter::_format_status() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, "
"[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}]",
_is_push_down, _get_explain_state_string(), _wrapper->is_ignored(), _has_remote_target,
_is_push_down, _get_explain_state_string(), _wrapper->ignored_msg(), _has_remote_target,
_has_local_target);
}

Expand Down Expand Up @@ -1323,11 +1308,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param,
}

void IRuntimeFilter::change_to_bloom_filter() {
auto origin_type = _wrapper->get_real_type();
_wrapper->change_to_bloom_filter();
if (origin_type != _wrapper->get_real_type()) {
update_runtime_filter_type_to_profile();
}
}

Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) {
Expand Down Expand Up @@ -1367,32 +1348,24 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool,
void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
if (_profile_init) {
parent_profile->add_child(_profile.get(), true, nullptr);
return;
}
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
} else {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
}
}

void IRuntimeFilter::update_runtime_filter_type_to_profile() {
if (_profile != nullptr) {
_profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type()));
}
_profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type()));
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
if (!_wrapper->is_ignored() && wrapper->is_ignored()) {
if (wrapper->is_ignored()) {
set_ignored(wrapper->ignored_msg());
} else if (!_wrapper->is_ignored()) {
return _wrapper->merge(wrapper);
}
auto origin_type = _wrapper->get_real_type();
Status status = _wrapper->merge(wrapper);
if (origin_type != _wrapper->get_real_type()) {
update_runtime_filter_type_to_profile();
}
return status;
return Status::OK();
}

template <typename T>
Expand Down Expand Up @@ -1695,12 +1668,10 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
const PInFilter in_filter = param->request->in_filter();
set_ignored(in_filter.ignored_msg());
}
std::unique_ptr<RuntimePredicateWrapper> wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper));
auto origin_type = _wrapper->get_real_type();
RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
if (origin_type != _wrapper->get_real_type()) {
} else {
std::unique_ptr<RuntimePredicateWrapper> wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper));
RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
update_runtime_filter_type_to_profile();
}
this->signal();
Expand All @@ -1718,11 +1689,8 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer
if (_wrapper->column_type() != wrapper->column_type()) {
wrapper->_column_return_type = _wrapper->_column_return_type;
}
auto origin_type = _wrapper->get_real_type();
_wrapper = wrapper;
if (origin_type != _wrapper->get_real_type()) {
update_runtime_filter_type_to_profile();
}
update_runtime_filter_type_to_profile();
this->signal();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class IRuntimeFilter {
static Status _create_wrapper(const T* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper);

void _set_push_down() { _is_push_down = true; }
void _set_push_down(bool push_down) { _is_push_down = push_down; }

std::string _format_status() const;

Expand Down
7 changes: 2 additions & 5 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@ class VRuntimeFilterSlots {
std::map<int, bool> has_in_filter;

auto ignore_local_filter = [&](int filter_id) {
// Now pipeline x have bug in ignore, after fix the problem enable ignore logic in pipeline x
if (_need_local_merge) {
return Status::OK();
}
auto runtime_filter_mgr = state->local_runtime_filter_mgr();
auto runtime_filter_mgr = _need_local_merge ? state->global_runtime_filter_mgr()
: state->local_runtime_filter_mgr();

std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters));
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ Status ScanLocalState<Derived>::_normalize_predicate(
auto impl = conjunct_expr_root->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto cur_expr = impl ? impl.get() : conjunct_expr_root.get();
bool _is_runtime_filter_predicate =
_rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end();
bool _is_runtime_filter_predicate = _rf_vexpr_set.contains(conjunct_expr_root);
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range = nullptr;
vectorized::VScanNode::PushDownType pdt =
Expand Down
9 changes: 0 additions & 9 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

void close_a_pipeline();

void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}

virtual void add_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}

Expand Down Expand Up @@ -191,10 +186,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

std::shared_ptr<QueryContext> _query_ctx;

// This shared ptr is never used. It is just a reference to hold the object.
// There is a weak ptr in runtime filter manager to reference this object.
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;

MonotonicStopWatch _fragment_watcher;
RuntimeProfile::Counter* _start_timer = nullptr;
RuntimeProfile::Counter* _prepare_timer = nullptr;
Expand Down
43 changes: 23 additions & 20 deletions be/src/pipeline/pipeline_x/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void RuntimeFilterTimer::call_timeout() {
}
_call_timeout = true;
if (_parent) {
_parent->sub_filters();
_parent->sub_filters(_filter_id);
}
}

Expand All @@ -137,7 +137,7 @@ void RuntimeFilterTimer::call_ready() {
}
_call_ready = true;
if (_parent) {
_parent->sub_filters();
_parent->sub_filters(_filter_id);
}
_is_ready = true;
}
Expand All @@ -146,40 +146,43 @@ void RuntimeFilterTimer::call_has_ready() {
std::unique_lock<std::mutex> lc(_lock);
DCHECK(!_call_timeout);
if (!_call_ready) {
_parent->sub_filters();
_parent->sub_filters(_filter_id);
}
}

void RuntimeFilterTimer::call_has_release() {
// When the use count is equal to 1, only the timer queue still holds ownership,
// so there is no need to take any action.
}

void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
const auto filter_id = runtime_filter->filter_id();
;
_filters++;
_filter_ready_map[filter_id] = false;
int64_t registration_time = runtime_filter->registration_time();
int32 wait_time_ms = runtime_filter->wait_time_ms();
auto filter_timer = std::make_shared<RuntimeFilterTimer>(
registration_time, wait_time_ms,
filter_id, registration_time, wait_time_ms,
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
runtime_filter->set_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}

void RuntimeFilterDependency::sub_filters() {
auto value = _filters.fetch_sub(1);
if (value == 1) {
_watcher.stop();
std::vector<PipelineXTask*> local_block_task {};
{
std::unique_lock<std::mutex> lc(_task_lock);
*_blocked_by_rf = false;
local_block_task.swap(_blocked_task);
void RuntimeFilterDependency::sub_filters(int id) {
std::vector<PipelineXTask*> local_block_task {};
{
std::lock_guard<std::mutex> lk(_task_lock);
if (!_filter_ready_map[id]) {
_filter_ready_map[id] = true;
_filters--;
}
for (auto* task : local_block_task) {
task->wake_up();
if (_filters == 0) {
_watcher.stop();
{
*_blocked_by_rf = false;
local_block_task.swap(_blocked_task);
}
}
}
for (auto* task : local_block_task) {
task->wake_up();
}
}

void LocalExchangeSharedState::sub_running_sink_operators() {
Expand Down
13 changes: 9 additions & 4 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ struct FinishDependency final : public Dependency {
class RuntimeFilterDependency;
class RuntimeFilterTimer {
public:
RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms,
std::shared_ptr<RuntimeFilterDependency> parent)
: _parent(std::move(parent)),
: _filter_id(filter_id),
_parent(std::move(parent)),
_registration_time(registration_time),
_wait_time_ms(wait_time_ms) {}

Expand All @@ -215,14 +216,17 @@ class RuntimeFilterTimer {

void call_has_ready();

void call_has_release();
// When the use count is equal to 1, only the timer queue still holds ownership,
// so there is no need to take any action.
void call_has_release() {};

bool has_ready();

int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }

private:
int _filter_id = -1;
bool _call_ready {};
bool _call_timeout {};
std::shared_ptr<RuntimeFilterDependency> _parent;
Expand Down Expand Up @@ -303,7 +307,7 @@ class RuntimeFilterDependency final : public Dependency {
: Dependency(id, node_id, name, query_ctx) {}
Dependency* is_blocked_by(PipelineXTask* task) override;
void add_filters(IRuntimeFilter* runtime_filter);
void sub_filters();
void sub_filters(int id);
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
_blocked_by_rf = blocked_by_rf;
}
Expand All @@ -312,6 +316,7 @@ class RuntimeFilterDependency final : public Dependency {

protected:
std::atomic_int _filters;
phmap::flat_hash_map<int, bool> _filter_ready_map;
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
};

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
filterparams->query_id.set_hi(_runtime_state->query_id().hi);
filterparams->query_id.set_lo(_runtime_state->query_id().lo);

filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi);
filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo);
filterparams->be_exec_version = _runtime_state->be_exec_version();
filterparams->query_ctx = _query_ctx.get();
}
Expand Down
Loading

0 comments on commit b27a40e

Please sign in to comment.