Skip to content

Commit

Permalink
[enhancement](profilev2) add some fields for profile v2 (apache#25611)
Browse files Browse the repository at this point in the history
Add 3 counters for ExecNode:

ExecTime - Total execution time (excluding the execution time of children).
OutputBytes - The total number of bytes output to parent.
BlockCount - The total count of blocks output to parent.
  • Loading branch information
mrhhsg authored Oct 23, 2023
1 parent 5cb5121 commit b5ee4a9
Show file tree
Hide file tree
Showing 18 changed files with 68 additions and 3 deletions.
12 changes: 12 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,13 @@ Status ExecNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != nullptr);
_span = state->get_tracer()->StartSpan(get_name());
OpentelemetryScope scope {_span};

_exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned", TUnit::UNIT, 1);
_output_bytes_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "OutputBytes", TUnit::BYTES, 1);
_block_count_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlockCount", TUnit::UNIT, 1);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
Expand Down Expand Up @@ -518,6 +523,7 @@ std::string ExecNode::get_name() {
}

Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
using namespace vectorized;
MutableBlock mutable_block =
Expand Down Expand Up @@ -551,6 +557,12 @@ Status ExecNode::get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& func,
bool clear_data) {
Defer defer([block, this]() {
if (block && !block->empty()) {
COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes());
COUNTER_UPDATE(_block_count_counter, 1);
}
});
if (_output_row_descriptor) {
if (clear_data) {
clear_origin_block();
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ class ExecNode {
// which will provide a reference for operator memory.
std::unique_ptr<MemTracker> _mem_tracker;

RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _output_bytes_counter;
RuntimeProfile::Counter* _block_count_counter;
RuntimeProfile::Counter* _rows_returned_rate;
RuntimeProfile::Counter* _memory_used_counter;
RuntimeProfile::Counter* _projection_timer;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/distinct_vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNo

Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());

Expand Down Expand Up @@ -82,6 +83,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
ColumnRawPtrs& key_columns,
const size_t num_rows) {
SCOPED_TIMER(_exec_timer);
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ void HashJoinNode::prepare_for_next() {
}

Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_probe) {
// If we use a short-circuit strategy, should return empty block directly.
Expand Down Expand Up @@ -495,6 +496,7 @@ Status HashJoinNode::_filter_data_and_build_output(RuntimeState* state,
}

Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) {
SCOPED_TIMER(_exec_timer);
_probe_eos = eos;
if (input_block->rows() > 0) {
COUNTER_UPDATE(_probe_rows_counter, input_block->rows());
Expand Down Expand Up @@ -670,6 +672,7 @@ Status HashJoinNode::open(RuntimeState* state) {
}

Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
Expand Down Expand Up @@ -724,6 +727,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
}

Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);

// make one block for each 4 gigabytes
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
SCOPED_TIMER(_exec_timer);
_build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime");
_build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
_build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT);
Expand Down Expand Up @@ -201,6 +202,7 @@ Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
}

Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block* block, bool eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
auto rows = block->rows();
auto mem_usage = block->allocated_bytes();
Expand Down Expand Up @@ -230,6 +232,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block*
}

Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* block, bool eos) {
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_probe_rows_counter, block->rows());
_cur_probe_row_visited_flags.resize(block->rows());
std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 0);
Expand Down Expand Up @@ -662,6 +665,7 @@ void VNestedLoopJoinNode::_release_mem() {
}

Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_is_output_left_side_only) {
RETURN_IF_ERROR(_build_output_block(&_left_block, block));
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,15 @@ Status VScanNode::prepare(RuntimeState* state) {
}

// Opens this scan node by delegating to ExecNode::open().
// Three scoped timers are started first: _exec_timer (the "ExecTime" counter
// registered in ExecNode::prepare), the profile's total-time counter, and
// _open_timer.
// NOTE(review): presumably each SCOPED_TIMER stays active until this function
// returns, so the delegated ExecNode::open() call is counted in all three —
// confirm against the macro definition.
Status VScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
// NOTE(review): judging by the macro name, this returns early when the
// query has been cancelled — confirm against the macro definition.
RETURN_IF_CANCELLED(state);
return ExecNode::open(state);
}

Status VScanNode::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_alloc_resource_timer);
if (_opened) {
return Status::OK();
Expand Down Expand Up @@ -220,6 +222,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
}

Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_get_next_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
// in inverted index apply logic, in order to optimize query performance,
Expand Down
7 changes: 5 additions & 2 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
_hash_table_input_counter(nullptr),
_build_timer(nullptr),
_expr_timer(nullptr),
_exec_timer(nullptr),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
Expand Down Expand Up @@ -257,7 +256,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
_build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1);
_build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
Expand Down Expand Up @@ -417,11 +415,13 @@ Status AggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());

RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(prepare_profile(state));
return Status::OK();
}

Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));

RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
Expand Down Expand Up @@ -472,6 +472,7 @@ Status AggregationNode::open(RuntimeState* state) {

Status AggregationNode::do_pre_agg(vectorized::Block* input_block,
vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.pre_agg(input_block, output_block));

// pre stream agg need use _num_row_return to decide whether to do pre stream agg
Expand Down Expand Up @@ -510,6 +511,7 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
}

Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
_make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
Expand All @@ -520,6 +522,7 @@ Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* bloc
}

Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) {
SCOPED_TIMER(_exec_timer);
if (in_block->rows() > 0) {
RETURN_IF_ERROR(_executor.execute(in_block));
RETURN_IF_ERROR(_try_spill_disk());
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ class AggregationNode : public ::doris::ExecNode {
RuntimeProfile::Counter* _hash_table_input_counter;
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _expr_timer;
RuntimeProfile::Counter* _exec_timer;

private:
friend class pipeline::AggSinkOperator;
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/vanalytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) {

Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block,
bool* eos) {
SCOPED_TIMER(_exec_timer);
if (_input_eos && (_output_block_index == _input_blocks.size() || _input_total_rows == 0)) {
*eos = true;
return Status::OK();
Expand All @@ -290,6 +291,7 @@ Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block
}

void VAnalyticEvalNode::release_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
if (is_closed()) {
return;
}
Expand Down Expand Up @@ -335,6 +337,8 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block
break;
}
}

SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_output_current_block(block));
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block, block->columns()));
reached_limit(block, eos);
Expand Down Expand Up @@ -540,6 +544,7 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {

Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block* input_block,
bool eos) {
SCOPED_TIMER(_exec_timer);
_input_eos = eos;
if (_input_eos && input_block->rows() == 0) {
_need_more_input = false;
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status VExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
Expand All @@ -76,6 +77,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
}

Status VExchangeNode::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
Expand All @@ -96,6 +98,7 @@ Status VExchangeNode::open(RuntimeState* state) {
}

Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(runtime_profile()->total_time_counter());
if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/vpartition_sort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
_emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime");

RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc()));
_init_hash_method();
Expand Down Expand Up @@ -144,6 +145,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum
}

Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_TIMER(_exec_timer);
auto current_rows = input_block->rows();
if (current_rows > 0) {
child_input_rows = child_input_rows + current_rows;
Expand Down Expand Up @@ -229,6 +231,7 @@ Status VPartitionSortNode::open(RuntimeState* state) {
}

Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
Expand All @@ -245,6 +248,7 @@ bool VPartitionSortNode::can_read() {

Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
output_block->clear_column_data();
{
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/vrepeat_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Status VRepeatNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());

RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("Failed to get tuple descriptor.");
Expand All @@ -91,6 +92,7 @@ Status VRepeatNode::open(RuntimeState* state) {
}

Status VRepeatNode::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state));
Expand Down Expand Up @@ -172,6 +174,7 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
}

Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
DCHECK(_repeat_id_idx >= 0);
for (const std::vector<int64_t>& v : _grouping_list) {
Expand Down Expand Up @@ -200,6 +203,7 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b
}

Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_TIMER(_exec_timer);
_child_eos = eos;
DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
DCHECK(!_expr_ctxs.empty());
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/vselect_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool
}

Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
reached_limit(output_block, eos);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/vset_operation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Status VSetOperationNode<is_intersect>::init(const TPlanNode& tnode, RuntimeStat

template <bool is_intersect>
Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
// open result expr lists.
for (const VExprContextSPtrs& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
Expand Down Expand Up @@ -155,6 +156,7 @@ template <bool is_intersect>
Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
_pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
Expand Down Expand Up @@ -223,6 +225,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {

template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_exec_timer);
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;

if (block->rows() != 0) {
Expand Down Expand Up @@ -259,6 +262,7 @@ Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* block,

template <bool is_intersect>
Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block* output_block, bool* eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_pull_timer);
create_mutable_cols(output_block);
auto st = std::visit(
Expand Down Expand Up @@ -352,6 +356,7 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* state, int child_id, Block* block,
bool eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
CHECK(_build_finished) << "cannot sink probe data before build finished";
if (child_id > 1) {
Expand Down
Loading

0 comments on commit b5ee4a9

Please sign in to comment.