Skip to content

Commit

Permalink
[enhancement](pipeline) add bvar for pipeline fragment instance and task
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Dec 15, 2023
1 parent 088bb80 commit 1a81b8a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
4 changes: 3 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
#include "vec/runtime/vdata_stream_mgr.h"

namespace doris::pipeline {
bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");

PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
Expand Down Expand Up @@ -370,7 +371,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
}

g_pipeline_tasks_count << _total_tasks;
for (auto& task : _tasks) {
RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
}
Expand Down Expand Up @@ -889,6 +890,7 @@ void PipelineFragmentContext::_close_fragment_instance() {

void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
g_pipeline_tasks_count << -1;
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");

std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
Expand Down Expand Up @@ -587,6 +588,7 @@ void FragmentMgr::remove_pipeline_context(
LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id),
print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
if (all_done) {
LOG_INFO("Query {} finished", print_id(query_id));
Expand Down Expand Up @@ -960,6 +962,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
};

int target_size = params.local_params.size();
g_pipeline_fragment_instances_count << target_size;

if (target_size > 1) {
int prepare_done = {0};
Status prepare_status[target_size];
Expand Down

0 comments on commit 1a81b8a

Please sign in to comment.