[fix](pipeline) Make all upstream tasks runnable if all tasks finished (apache#41292)

Consider the 3 pipelines in this fragment (... -> join -> shuffle):
pipeline 0: `... -> local exchange sink`
pipeline 1: `... -> join build (INNER JOIN)`
pipeline 2: `local exchange source -> join probe (INNER JOIN) -> data stream sender`

Assume the join build returns 0 rows. The join probe can then finish as soon as the join build finishes, without waiting for the `local exchange sink` to finish. In this case, if pipeline 0 is blocked on a dependency for a long time, pipeline 2 should notify pipeline 0 so that it finishes as well.
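To make the mechanism concrete, here is a minimal stand-alone sketch (simplified types and names that loosely mirror the diff below, not the actual Doris classes) of the wake-up path this commit adds: when the last task of a pipeline closes, every task of its upstream pipelines is forced runnable so it can observe EOS and finish instead of staying blocked on a dependency. The sketch assumes `dag` maps a pipeline id to the ids of the pipelines it consumes from, which is how `_dag` appears to be used in `close_a_pipeline`.

```cpp
#include <atomic>
#include <map>
#include <vector>

// Simplified stand-ins for PipelineTask, Pipeline and PipelineFragmentContext.
struct Task {
    std::atomic<bool> wake_up_by_downstream {false};

    // Roughly PipelineTask::clear_blocking_state(bool): remember that a
    // downstream pipeline woke us up, then mark dependencies ready (elided).
    void clear_blocking_state(bool by_downstream) {
        wake_up_by_downstream = wake_up_by_downstream || by_downstream;
        // ... set every dependency of this task to "always ready" ...
    }
};

struct Pipeline {
    std::vector<Task*> tasks;                // registered as tasks are created
    std::atomic<int> num_tasks_running {0};  // incremented per created task

    // Returns true only for the caller that closes the last running task.
    bool close_task() { return num_tasks_running.fetch_sub(1) == 1; }

    // Force every task of this pipeline to become runnable.
    void make_all_runnable() {
        for (auto* t : tasks) {
            if (t != nullptr) {
                t->clear_blocking_state(/*by_downstream=*/true);
            }
        }
    }
};

struct FragmentContext {
    // pipeline id -> ids of the pipelines it consumes from (its upstreams).
    std::map<int, std::vector<int>> dag;
    // pipeline id -> pipeline; assumed to be populated during task creation.
    std::map<int, Pipeline*> pipelines;

    // Called whenever one task finishes; mirrors close_a_pipeline(PipelineId).
    void close_a_pipeline(int pipeline_id) {
        if (pipelines[pipeline_id]->close_task()) {
            // All tasks of this pipeline are done, so nothing will ever read
            // its upstream outputs again; wake the upstream tasks so they can
            // observe EOS and finish instead of staying blocked.
            for (int dep : dag[pipeline_id]) {
                pipelines[dep]->make_all_runnable();
            }
        }
    }
};
```

In the scenario above, once the last task of pipeline 2 closes, `close_a_pipeline(2)` sees `close_task()` return true and calls `make_all_runnable()` on pipeline 0; pipeline 0's tasks then stop waiting on their dependencies, notice they were woken by a downstream pipeline, and finish.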
Gabriel39 authored Oct 8, 2024
1 parent 0e5fd2f commit 4b8375f
Showing 13 changed files with 68 additions and 23 deletions.
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -111,6 +111,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
}

Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table) {
@@ -132,8 +135,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

- if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
- return Status::OK();
+ if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
+ return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
@@ -151,7 +154,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
- return Status::OK();
+ return Base::close(state, exec_status);
}

bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -535,6 +538,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

+ local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
1 change: 1 addition & 0 deletions be/src/pipeline/exec/operator.h
@@ -356,6 +356,7 @@ class PipelineXSinkLocalStateBase {
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed = false;
+ std::atomic<bool> _eos = false;
//NOTICE: now add a faker profile, because sometimes the profile record is useless
//so we want remove some counters and timers, eg: in join node, if it's broadcast_join
//and shared hash table, some counter/timer about build hash table is useless,
@@ -259,7 +259,6 @@ class PartitionedAggSinkLocalState

std::unique_ptr<RuntimeState> _runtime_state;

- bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;

// temp structures during spilling
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -54,7 +54,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

- bool _eos = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
};
13 changes: 8 additions & 5 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -110,11 +110,14 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
}
RETURN_IF_ERROR(Base::close(state, exec_status));
if (exec_status.ok()) {
- DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0)
- << " state: { cancel = " << state->is_cancelled() << ", "
- << state->cancel_reason().to_string() << "} query ctx: { cancel = "
- << state->get_query_ctx()->is_cancelled() << ", "
- << state->get_query_ctx()->exec_status().to_string() << "}";
+ DCHECK(_release_count || _exchanger == nullptr ||
+ _exchanger->_running_source_operators == 0)
+ << "Do not finish correctly! " << debug_string(0)
+ << " state: { cancel = " << state->is_cancelled() << ", "
+ << state->cancel_reason().to_string()
+ << "} query ctx: { cancel = " << state->get_query_ctx()->is_cancelled() << ", "
+ << state->get_query_ctx()->exec_status().to_string()
+ << "} Exchanger: " << (void*)_exchanger;
}
return Status::OK();
}
11 changes: 10 additions & 1 deletion be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
#include <utility>

#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"

namespace doris::pipeline {

@@ -65,4 +66,12 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
return Status::OK();
}

- } // namespace doris::pipeline
+ void Pipeline::make_all_runnable() {
+ for (auto* task : _tasks) {
+ if (task) {
+ task->clear_blocking_state(true);
+ }
+ }
+ }

+ } // namespace doris::pipeline
17 changes: 16 additions & 1 deletion be/src/pipeline/pipeline.h
@@ -47,6 +47,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::weak_ptr<PipelineFragmentContext> context)
: _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
_init_profile();
+ _tasks.resize(_num_tasks, nullptr);
}

// Add operators for pipelineX
@@ -104,14 +105,24 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

- void incr_created_tasks() { _num_tasks_created++; }
+ void incr_created_tasks(int i, PipelineTask* task) {
+ _num_tasks_created++;
+ _num_tasks_running++;
+ DCHECK_LT(i, _tasks.size());
+ _tasks[i] = task;
+ }

+ void make_all_runnable();

void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
+ _tasks.resize(_num_tasks, nullptr);
for (auto& op : _operators) {
op->set_parallel_tasks(_num_tasks);
}
}
int num_tasks() const { return _num_tasks; }
+ bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }

std::string debug_string() {
fmt::memory_buffer debug_string_buffer;
@@ -158,6 +169,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
int _num_tasks = 1;
// How many tasks are already created?
std::atomic<int> _num_tasks_created = 0;
+ // How many tasks are already created and not finished?
+ std::atomic<int> _num_tasks_running = 0;
+ // Tasks in this pipeline.
+ std::vector<PipelineTask*> _tasks;
};

} // namespace doris::pipeline
16 changes: 14 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -146,6 +146,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
runtime_state.reset();
}
}
+ _dag.clear();
+ _pip_id_to_pipeline.clear();
_pipelines.clear();
_sink.reset();
_root_op.reset();
@@ -365,6 +367,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+ _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
}
auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

@@ -469,6 +472,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
task_runtime_state.get(), this,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
+ pipeline->incr_created_tasks(i, task.get());
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
@@ -562,7 +566,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
}
}
_pipeline_parent_map.clear();
- _dag.clear();
_op_id_to_le_state.clear();

return Status::OK();
@@ -1752,7 +1755,16 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}

- void PipelineFragmentContext::close_a_pipeline() {
+ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+ // If all tasks of this pipeline have been closed, its upstream tasks are no longer needed, so just make them runnable here.
+ DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
+ if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
+ if (_dag.contains(pipeline_id)) {
+ for (auto dep : _dag[pipeline_id]) {
+ _pip_id_to_pipeline[dep]->make_all_runnable();
+ }
+ }
+ }
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }

- void close_a_pipeline();
+ void close_a_pipeline(PipelineId pipeline_id);

Status send_report(bool);

@@ -291,6 +291,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
_op_id_to_le_state;

+ std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
// UniqueId -> runtime mgr
std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;

3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -71,7 +71,6 @@ PipelineTask::PipelineTask(
if (shared_state) {
_sink_shared_state = shared_state;
}
- pipeline->incr_created_tasks();
}

Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
@@ -279,7 +278,7 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
- _eos = _sink->is_finished(_state) || _eos;
+ _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
12 changes: 8 additions & 4 deletions be/src/pipeline/pipeline_task.h
@@ -135,7 +135,8 @@ class PipelineTask {
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }

- void clear_blocking_state() {
+ void clear_blocking_state(bool wake_up_by_downstream = false) {
+ _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
@@ -231,6 +232,8 @@ class PipelineTask {
}
}

+ PipelineId pipeline_id() const { return _pipeline->id(); }

private:
friend class RuntimeFilterDependency;
bool _is_blocked();
@@ -306,11 +309,12 @@ class PipelineTask {

Dependency* _execution_dep = nullptr;

- std::atomic<bool> _finalized {false};
+ std::atomic<bool> _finalized = false;
std::mutex _dependency_lock;

- std::atomic<bool> _running {false};
- std::atomic<bool> _eos {false};
+ std::atomic<bool> _running = false;
+ std::atomic<bool> _eos = false;
+ std::atomic<bool> _wake_up_by_downstream = false;
};

} // namespace doris::pipeline
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.cpp
@@ -94,7 +94,7 @@ void _close_task(PipelineTask* task, Status exec_status) {
}
task->finalize();
task->set_running(false);
- task->fragment_context()->close_a_pipeline();
+ task->fragment_context()->close_a_pipeline(task->pipeline_id());
}

void TaskScheduler::_do_work(size_t index) {
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.cpp
@@ -150,7 +150,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
}
*local_merge_filters = &iter->second;
DCHECK(!iter->second.filters.empty());
- DCHECK_GT(iter->second.merge_time, 0);
return Status::OK();
}

