Skip to content

Commit

Permalink
[opt](nereids) hbo fix pb
Browse files Browse the repository at this point in the history
  • Loading branch information
xzj7019 committed Dec 10, 2024
1 parent ea0546c commit 3cfc94f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
12 changes: 5 additions & 7 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,11 @@ Status OperatorXBase::open(RuntimeState* state) {
return Status::OK();
}

void OperatorXBase::update_exec_stats(RuntimeState* state) {
void PipelineXLocalStateBase::update_exec_stats(RuntimeState* state) {
QueryContext* ctx = state->get_query_ctx();

if (ctx != nullptr && ctx->need_record_exec_stats(_node_id)) {
// todo: what's the input/output rows?
ctx->update_push_rows_stats(_node_id, get_query_statistics_ptr()->get_returned_rows()); // push means output rows
ctx->update_pull_rows_stats(_node_id, get_query_statistics_ptr()->get_scan_rows()); // pull means input rows
if (ctx != nullptr && ctx->need_record_exec_stats(_parent->node_id())) {
ctx->update_push_rows_stats(_parent->node_id(), get_query_statistics_ptr()->get_returned_rows()); // push means output rows
ctx->update_pull_rows_stats(_parent->node_id(), get_query_statistics_ptr()->get_scan_rows()); // pull means input rows
//ctx->update_pred_filter_stats(_node_id, state->num_rows_load_unselected());
// todo: update rows after runtime filter
//if (_bloom_filter_eval_context.join_runtime_filter_input_counter != nullptr &&
Expand All @@ -253,7 +251,6 @@ void OperatorXBase::update_exec_stats(RuntimeState* state) {
}

Status OperatorXBase::close(RuntimeState* state) {
this->update_exec_stats(state);
if (_child && !is_source()) {
RETURN_IF_ERROR(_child->close(state));
}
Expand Down Expand Up @@ -535,6 +532,7 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
update_exec_stats(state);
if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class PipelineXLocalStateBase {
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state) = 0;
virtual void update_exec_stats(RuntimeState* state);

// If use projection, we should clear `_origin_block`.
void clear_origin_block();
Expand Down Expand Up @@ -729,7 +730,6 @@ class OperatorXBase : public OperatorBase {
vectorized::Block* output_block) const;
void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; }
int parallel_tasks() const { return _parallel_tasks; }
virtual void update_exec_stats(RuntimeState* state);

// To keep compatibility with older FE
void set_serial_operator() { _is_serial_operator = true; }
Expand Down

0 comments on commit 3cfc94f

Please sign in to comment.