Skip to content

Commit

Permalink
[refactor](metrics) Complete metrics for operators (apache#43002)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Nov 1, 2024
1 parent 1a1cfea commit e9e3327
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 32 deletions.
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_fetch_row_id_timer = ADD_TIMER(profile(), "FetchRowIdTime");
_write_data_timer = ADD_TIMER(profile(), "WriteDataTime");
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
auto fragment_instance_id = state->fragment_instance_id();

_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);

if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
Expand Down Expand Up @@ -146,11 +146,14 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
SCOPED_TIMER(local_state._fetch_row_id_timer);
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
RETURN_IF_ERROR(local_state._writer->write(state, *block));
{
SCOPED_TIMER(local_state._write_data_timer);
RETURN_IF_ERROR(local_state._writer->write(state, *block));
}
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedSta
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }

private:
friend class ResultSinkOperatorX;
Expand All @@ -137,7 +136,9 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedSta

std::shared_ptr<BufferControlBlock> _sender = nullptr;
std::shared_ptr<ResultWriter> _writer = nullptr;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;

RuntimeProfile::Counter* _fetch_row_id_timer = nullptr;
RuntimeProfile::Counter* _write_data_timer = nullptr;
};

class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {
Expand Down
11 changes: 9 additions & 2 deletions be/src/pipeline/exec/set_probe_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized

auto probe_rows = in_block->rows();
if (probe_rows > 0) {
RETURN_IF_ERROR(_extract_probe_column(local_state, *in_block, local_state._probe_columns,
_cur_child_id));
{
SCOPED_TIMER(local_state._extract_probe_data_timer);
RETURN_IF_ERROR(_extract_probe_column(local_state, *in_block,
local_state._probe_columns, _cur_child_id));
}
RETURN_IF_ERROR(std::visit(
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
SCOPED_TIMER(local_state._probe_timer);
vectorized::HashTableProbe<HashTableCtxType, is_intersect>
process_hashtable_ctx(&local_state, probe_rows);
return process_hashtable_ctx.mark_data_in_hashtable(arg);
Expand All @@ -99,6 +103,9 @@ Status SetProbeSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSink
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);

_probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
_extract_probe_data_timer = ADD_TIMER(Base::profile(), "ExtractProbeDataTime");
Parent& parent = _parent->cast<Parent>();
_shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency;
_dependency->block();
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/set_probe_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class SetProbeSinkLocalState final : public PipelineXSinkLocalState<SetSharedSta
vectorized::ColumnRawPtrs _probe_columns;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;

RuntimeProfile::Counter* _extract_probe_data_timer = nullptr;
RuntimeProfile::Counter* _probe_timer = nullptr;
};

template <bool is_intersect>
Expand Down
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;

if (in_block->rows() != 0) {
RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));

{
SCOPED_TIMER(local_state._merge_block_timer);
RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));
}
if (local_state._mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported("set operator do not support build table rows over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
}
}

if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
SCOPED_TIMER(local_state._build_timer);
build_block = local_state._mutable_block.to_block();
RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
local_state._mutable_block.clear();
Expand Down Expand Up @@ -152,6 +155,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_merge_block_timer = ADD_TIMER(_profile, "MergeBlocksTime");
_build_timer = ADD_TIMER(_profile, "BuildTime");
auto& parent = _parent->cast<Parent>();
_shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency;
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {

private:
friend class SetSinkOperatorX<is_intersect>;
template <class HashTableContext, bool is_intersected>
friend struct vectorized::HashTableBuild;

RuntimeProfile::Counter* _build_timer; // time to build hash table
vectorized::MutableBlock _mutable_block;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
vectorized::Arena _arena;

RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
};

template <bool is_intersect>
Expand Down
37 changes: 22 additions & 15 deletions be/src/pipeline/exec/set_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateIn
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime");
_filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
_shared_state->probe_finished_children_dependency.resize(
_parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, nullptr);
return Status::OK();
Expand Down Expand Up @@ -75,21 +77,26 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
_create_mutable_cols(local_state, block);
auto st = std::visit(
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
return _get_data_in_hashtable<HashTableCtxType>(local_state, arg, block,
state->batch_size(), eos);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
local_state._shared_state->hash_table_variants->method_variant);
RETURN_IF_ERROR(st);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
{
SCOPED_TIMER(local_state._get_data_timer);
RETURN_IF_ERROR(std::visit(
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
return _get_data_in_hashtable<HashTableCtxType>(local_state, arg, block,
state->batch_size(), eos);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
local_state._shared_state->hash_table_variants->method_variant));
}
{
SCOPED_TIMER(local_state._filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
}
local_state.reached_limit(block, eos);
return Status::OK();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/set_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class SetSourceLocalState final : public PipelineXLocalState<SetSharedState> {
std::vector<vectorized::MutableColumnPtr> _mutable_cols;
//record build column type
vectorized::DataTypes _left_table_data_types;

RuntimeProfile::Counter* _get_data_timer = nullptr;
RuntimeProfile::Counter* _filter_timer = nullptr;
};

template <bool is_intersect>
Expand Down
8 changes: 7 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
SCOPED_TIMER(_init_timer);
_sort_blocks_memory_usage =
ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1);
_append_blocks_timer = ADD_TIMER(profile(), "AppendBlockTime");
_update_runtime_predicate_timer = ADD_TIMER(profile(), "UpdateRuntimePredicateTime");
return Status::OK();
}

Expand Down Expand Up @@ -119,7 +121,10 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
{
SCOPED_TIMER(local_state._append_blocks_timer);
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
}
int64_t data_size = local_state._shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
Expand All @@ -128,6 +133,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
RETURN_IF_CANCELLED(state);

if (state->get_query_ctx()->has_runtime_predicate(_node_id)) {
SCOPED_TIMER(local_state._update_runtime_predicate_timer);
auto& predicate = state->get_query_ctx()->get_runtime_predicate(_node_id);
if (predicate.enable()) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {

// topn top value
vectorized::Field old_top {vectorized::Field::Types::Null};
RuntimeProfile::Counter* _append_blocks_timer = nullptr;
RuntimeProfile::Counter* _update_runtime_predicate_timer = nullptr;
};

class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Expand Down
22 changes: 19 additions & 3 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ namespace doris::pipeline {
// Constructs the local state: forwards `state` and `parent` to the
// PipelineXLocalState<> base and initializes _child_block with the result of
// vectorized::Block::create_unique().
TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent), _child_block(vectorized::Block::create_unique()) {}

// Initializes the table-function local state and registers its profile timers.
//
// Runs the base-class PipelineXLocalState<>::init first, then creates five
// timers on _runtime_profile and stores them in the corresponding members.
// Returns Status::OK() once all timers are registered.
// NOTE(review): RETURN_IF_ERROR is defined outside this view; presumably it
// returns the base-class Status to the caller when that Status is not OK.
Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
// Everything below is timed under both the exec-time counter and _init_timer.
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
// Scoped around each fn->process_init(...) call in the operator.
_init_function_timer = ADD_TIMER(_runtime_profile, "InitTableFunctionTime");
// Scoped over process_next_child_row().
_process_rows_timer = ADD_TIMER(_runtime_profile, "ProcessRowsTime");
// Scoped over _copy_output_slots() after its early-return check.
_copy_data_timer = ADD_TIMER(_runtime_profile, "CopyDataTime");
// Scoped around the conjunct filter_block() call in get_expanded_block().
_filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
// NOTE(review): no use of this timer is visible in this view; confirm where
// it is started.
_repeat_data_timer = ADD_TIMER(_runtime_profile, "RepeatDataTime");
return Status::OK();
}

Status TableFunctionLocalState::open(RuntimeState* state) {
SCOPED_TIMER(PipelineXLocalState<>::exec_time_counter());
SCOPED_TIMER(PipelineXLocalState<>::_open_timer);
Expand Down Expand Up @@ -59,6 +71,7 @@ void TableFunctionLocalState::_copy_output_slots(
if (!_current_row_insert_times) {
return;
}
SCOPED_TIMER(_copy_data_timer);
auto& p = _parent->cast<TableFunctionOperatorX>();
for (auto index : p._output_slot_indexs) {
auto src_column = _child_block->get_by_position(index).column;
Expand Down Expand Up @@ -197,15 +210,18 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
columns[index]->insert_many_defaults(row_size - columns[index]->size());
}

// 3. eval conjuncts
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
output_block->columns()));
{
SCOPED_TIMER(_filter_timer); // 3. eval conjuncts
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
output_block->columns()));
}

*eos = _child_eos && _cur_child_offset == -1;
return Status::OK();
}

void TableFunctionLocalState::process_next_child_row() {
SCOPED_TIMER(_process_rows_timer);
_cur_child_offset++;

if (_cur_child_offset >= _child_block->rows()) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/table_function_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {
TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent);
~TableFunctionLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& infos) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override {
for (auto* fn : _fns) {
Expand Down Expand Up @@ -67,6 +68,12 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {
std::unique_ptr<vectorized::Block> _child_block;
int _current_row_insert_times = 0;
bool _child_eos = false;

RuntimeProfile::Counter* _init_function_timer = nullptr;
RuntimeProfile::Counter* _process_rows_timer = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _repeat_data_timer = nullptr;
};

class TableFunctionOperatorX final : public StatefulOperatorX<TableFunctionLocalState> {
Expand All @@ -93,6 +100,7 @@ class TableFunctionOperatorX final : public StatefulOperatorX<TableFunctionLocal
}

for (auto* fn : local_state._fns) {
SCOPED_TIMER(local_state._init_function_timer);
RETURN_IF_ERROR(fn->process_init(input_block, state));
}
local_state.process_next_child_row();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/union_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_expr_timer = ADD_TIMER(_profile, "ExprTime");
auto& p = _parent->cast<Parent>();
_shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id);
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionSharedStat

/// Index of current row in child_row_block_.
int _child_row_idx;
RuntimeProfile::Counter* _expr_timer = nullptr;
};

class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
Expand Down Expand Up @@ -136,6 +137,7 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx,
vectorized::Block* res_block) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state._expr_timer);
const auto& child_exprs = local_state._child_expr;
vectorized::ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
Expand Down

0 comments on commit e9e3327

Please sign in to comment.