Skip to content

Commit

Permalink
[Chore](join) add debug info for build_sink::close (#44974)
Browse files Browse the repository at this point in the history
add debug info for build_sink::close
  • Loading branch information
BiteTheDDDDt authored Dec 4, 2024
1 parent 7836e93 commit 5b5cbf6
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 11 deletions.
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
"build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
"has_remote_target: {},error_msg: [{}]",
"has_remote_target: {}, error_msg: [{}]",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
_dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target,
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ std::string Dependency::debug_string(int indentation_level) {

std::string CountedFinishDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _always_ready, _counter);
fmt::format_to(
debug_string_buffer,
"{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}, _stack_set_ready={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready,
_always_ready, _counter, _stack_set_ready);
return fmt::to_string(debug_string_buffer);
}

Expand Down
22 changes: 19 additions & 3 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "pipeline/common/set_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
#include "util/stack_util.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -107,7 +108,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
[[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
// Notify downstream pipeline tasks this dependency is ready.
void set_ready();
virtual void set_ready();
void set_ready_to_read() {
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
Expand Down Expand Up @@ -172,11 +173,26 @@ struct FakeSharedState final : public BasicSharedState {
ENABLE_FACTORY_CREATOR(FakeSharedState)
};

class CountedFinishDependency final : public Dependency {
class DependencyWithStack : public Dependency {
public:
using SharedState = FakeSharedState;
DependencyWithStack(int id, int node_id, std::string name, bool ready = false)
: Dependency(id, node_id, name, ready) {}

void set_ready() override {
_stack_set_ready = get_stack_trace();
Dependency::set_ready();
}

protected:
std::string _stack_set_ready;
};

class CountedFinishDependency final : public DependencyWithStack {
public:
using SharedState = FakeSharedState;
CountedFinishDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name, true) {}
: DependencyWithStack(id, node_id, name, true) {}

void add() {
std::unique_lock<std::mutex> l(_mtx);
Expand Down
13 changes: 10 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,22 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else if (p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"build_sink::close meet error state, shared_hash_table_signaled: {}, "
"complete_build_stage: {}",
p._shared_hash_table_context->signaled,
p._shared_hash_table_context->complete_build_stage);
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
return Status::InternalError(
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: {}",
e.to_string(), state->get_task()->wake_up_by_downstream(),
_should_build_hash_table);
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: "
"{}, _finish_dependency: {}",
e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table,
_finish_dependency->debug_string());
}
return Base::close(state, exec_status);
}
Expand Down

0 comments on commit 5b5cbf6

Please sign in to comment.