Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 25, 2024
1 parent 5d6e0bc commit 26e3705
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,33 +801,30 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
query_ctx->set_merge_controller_handler(handler);
}

{
for (const auto& local_param : params.local_params) {
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
// (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
std::lock_guard<std::mutex> lock(_lock);
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);

if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
// TODO: simplify this mapping
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}

int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
{
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
// TODO: simplify this mapping
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}

query_ctx->set_pipeline_context(params.fragment_id, context);

RETURN_IF_ERROR(context->submit());
Expand Down

0 comments on commit 26e3705

Please sign in to comment.