Skip to content

Commit

Permalink
[pipelineX](dependency) Wake by task by read dependency (apache#27260)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Nov 20, 2023
1 parent 0b459e5 commit 840f3b6
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 74 deletions.
1 change: 0 additions & 1 deletion be/src/http/action/pipeline_task_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace doris {

class HttpRequest;

// Get BE health state from http API.
class PipelineTaskAction : public HttpHandler {
public:
PipelineTaskAction() = default;
Expand Down
16 changes: 0 additions & 16 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class ExchangeSinkQueueDependency final : public WriteDependency {
ExchangeSinkQueueDependency(int id, int node_id)
: WriteDependency(id, node_id, "ResultQueueDependency") {}
~ExchangeSinkQueueDependency() override = default;

void* shared_state() override { return nullptr; }
};

class BroadcastDependency final : public WriteDependency {
Expand All @@ -95,19 +93,6 @@ class BroadcastDependency final : public WriteDependency {
}
}

void* shared_state() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
return nullptr;
}

void set_ready_for_write() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
}

void block_writing() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
}

int available_blocks() const { return _available_block; }

private:
Expand Down Expand Up @@ -138,7 +123,6 @@ class LocalExchangeChannelDependency final : public WriteDependency {
LocalExchangeChannelDependency(int id, int node_id)
: WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
~LocalExchangeChannelDependency() override = default;
void* shared_state() override { return nullptr; }
// TODO(gabriel): blocked by memory
};

Expand Down
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ struct ExchangeDataDependency final : public Dependency {
ExchangeDataDependency(int id, int node_id,
vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
: Dependency(id, node_id, "DataDependency"), _always_done(false) {}
void* shared_state() override { return nullptr; }

void set_always_done() {
_always_done = true;
if (_ready_for_read) {
if (_always_done) {
return;
}
_read_dependency_watcher.stop();
_ready_for_read = true;
_always_done = true;
Dependency::set_ready_for_read();
}

void block_reading() override {
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class SharedHashTableDependency final : public WriteDependency {
SharedHashTableDependency(int id, int node_id)
: WriteDependency(id, node_id, "SharedHashTableDependency") {}
~SharedHashTableDependency() override = default;

void* shared_state() override { return nullptr; }
};

class HashJoinBuildSinkLocalState final
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class ResultSinkDependency final : public WriteDependency {
ResultSinkDependency(int id, int node_id)
: WriteDependency(id, node_id, "ResultSinkDependency") {}
~ResultSinkDependency() override = default;

void* shared_state() override { return nullptr; }
};

class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ class ScanDependency final : public Dependency {
ScanDependency(int id, int node_id)
: Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {}

void* shared_state() override { return nullptr; }

// TODO(gabriel):
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
Expand All @@ -73,6 +71,8 @@ class ScanDependency final : public Dependency {
return Dependency::read_blocked_by(task);
}

bool push_to_blocking_queue() override { return true; }

void block_reading() override {
if (_eos) {
return;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
_build_timer = ADD_TIMER(_profile, "BuildTime");

Parent& parent = _parent->cast<Parent>();
_dependency->set_cur_child_id(parent._cur_child_id);
_child_exprs.resize(parent._child_exprs.size());
for (size_t i = 0; i < _child_exprs.size(); i++) {
RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class PipelineTask {
_wait_worker_watcher.start();
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
PipelineTaskState get_state() { return _cur_state; }
PipelineTaskState get_state() const { return _cur_state; }
void set_state(PipelineTaskState state);

virtual bool is_pending_finish() {
Expand All @@ -154,6 +154,7 @@ class PipelineTask {
}

virtual bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; }
virtual bool push_blocked_task_to_queue() const { return true; }

virtual bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
Expand Down
23 changes: 16 additions & 7 deletions be/src/pipeline/pipeline_x/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
namespace doris::pipeline {

void Dependency::add_block_task(PipelineXTask* task) {
// TODO(gabriel): support read dependency
if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
return;
}
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
<< "Duplicate task: " << task->debug_string();
_blocked_task.push_back(task);
}

Expand Down Expand Up @@ -74,6 +72,17 @@ void Dependency::set_ready_for_read() {
_ready_for_read = true;
local_block_task.swap(_blocked_task);
}
for (auto* task : local_block_task) {
task->try_wake_up(this);
}
}

void SetDependency::set_ready_for_read() {
if (_child_idx == 0) {
WriteDependency::set_ready_for_read();
} else {
_set_state->probe_finished_children_dependency[0]->set_ready_for_read();
}
}

void WriteDependency::set_ready_for_write() {
Expand All @@ -84,7 +93,7 @@ void WriteDependency::set_ready_for_write() {

std::vector<PipelineXTask*> local_block_task {};
{
std::unique_lock<std::mutex> lc(_task_lock);
std::unique_lock<std::mutex> lc(_write_task_lock);
if (_ready_for_write) {
return;
}
Expand Down Expand Up @@ -133,7 +142,7 @@ Dependency* Dependency::read_blocked_by(PipelineXTask* task) {

std::unique_lock<std::mutex> lc(_task_lock);
auto ready_for_read = _ready_for_read.load();
if (!ready_for_read && task) {
if (!ready_for_read && !push_to_blocking_queue() && task) {
add_block_task(task);
}
return ready_for_read ? nullptr : this;
Expand Down Expand Up @@ -162,7 +171,7 @@ FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) {
}

WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
std::unique_lock<std::mutex> lc(_write_task_lock);
const auto ready_for_write = _ready_for_write.load();
if (!ready_for_write && task) {
add_write_block_task(task);
Expand Down
36 changes: 7 additions & 29 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ class Dependency : public std::enable_shared_from_this<Dependency> {

[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
virtual void* shared_state() = 0;
virtual void* shared_state() { return nullptr; }
virtual std::string debug_string(int indentation_level = 0);
virtual bool is_write_dependency() { return false; }
virtual bool push_to_blocking_queue() { return false; }

// Start the watcher. We use it to count how long this dependency block the current pipeline task.
void start_read_watcher() {
Expand Down Expand Up @@ -145,6 +146,7 @@ class WriteDependency : public Dependency {
protected:
friend class Dependency;
std::atomic<bool> _ready_for_write {true};
std::mutex _write_task_lock;
MonotonicStopWatch _write_dependency_watcher;

private:
Expand Down Expand Up @@ -172,7 +174,6 @@ class FinishDependency final : public Dependency {

void set_ready_to_finish();

void* shared_state() override { return nullptr; }
std::string debug_string(int indentation_level = 0) override;

void add_block_task(PipelineXTask* task) override;
Expand Down Expand Up @@ -222,7 +223,6 @@ class RuntimeFilterDependency final : public Dependency {
RuntimeFilterDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name) {}
RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
void* shared_state() override { return nullptr; }
void add_filters(IRuntimeFilter* runtime_filter);
void sub_filters();
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
Expand Down Expand Up @@ -255,8 +255,6 @@ class AndDependency final : public WriteDependency {
return fmt::to_string(debug_string_buffer);
}

void* shared_state() override { return nullptr; }

std::string debug_string(int indentation_level = 0) override;

[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
Expand Down Expand Up @@ -284,7 +282,6 @@ struct FakeDependency final : public WriteDependency {
public:
FakeDependency(int id, int node_id) : WriteDependency(id, node_id, "FakeDependency") {}
using SharedState = FakeSharedState;
void* shared_state() override { return nullptr; }
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { return nullptr; }
[[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override {
return nullptr;
Expand Down Expand Up @@ -465,6 +462,7 @@ class UnionDependency final : public WriteDependency {
}
return this;
}
bool push_to_blocking_queue() override { return true; }
void block_reading() override {}
void block_writing() override {}

Expand Down Expand Up @@ -667,7 +665,6 @@ class AsyncWriterDependency final : public WriteDependency {
AsyncWriterDependency(int id, int node_id)
: WriteDependency(id, node_id, "AsyncWriterDependency") {}
~AsyncWriterDependency() override = default;
void* shared_state() override { return nullptr; }
};

class SetDependency;
Expand Down Expand Up @@ -769,30 +766,10 @@ class SetDependency final : public WriteDependency {

void set_shared_state(std::shared_ptr<SetSharedState> set_state) { _set_state = set_state; }

// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
_should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id() << " " << _node_id << " block tasks: " << _blocked_task.size();
}
std::unique_lock<std::mutex> lc(_task_lock);
if (!_set_state->ready_for_read && task) {
add_block_task(task);
}
return _set_state->ready_for_read ? nullptr : this;
}

// Notify downstream pipeline tasks this dependency is ready.
void set_ready_for_read() override {
if (_set_state->ready_for_read) {
return;
}
_read_dependency_watcher.stop();
_set_state->ready_for_read = true;
}
void set_ready_for_read() override;

void set_cur_child_id(int id) {
_child_idx = id;
_set_state->probe_finished_children_dependency[id] = this;
if (id != 0) {
block_writing();
Expand All @@ -801,6 +778,7 @@ class SetDependency final : public WriteDependency {

private:
std::shared_ptr<SetSharedState> _set_state;
int _child_idx {0};
};

using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Status PipelineXTask::_open() {
_blocked_dep = _filter_dependency->filter_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
set_use_blocking_queue(false);
set_use_blocking_queue();
RETURN_IF_ERROR(st);
} else if (i == 1) {
CHECK(false) << debug_string();
Expand Down
17 changes: 10 additions & 7 deletions be/src/pipeline/pipeline_x/pipeline_x_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,27 @@ class PipelineXTask : public PipelineTask {

OperatorXs operatorXs() { return _operators; }

bool push_blocked_task_to_queue() {
bool push_blocked_task_to_queue() const override {
/**
* Push task into blocking queue if:
* 1. `_use_blocking_queue` is true.
* 2. Or this task is blocked by FE two phase execution (BLOCKED_FOR_DEPENDENCY).
*/
return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
}
void set_use_blocking_queue(bool use_blocking_queue) {
_use_blocking_queue = use_blocking_queue;
void set_use_blocking_queue() {
if (_blocked_dep->push_to_blocking_queue()) {
_use_blocking_queue = true;
return;
}
_use_blocking_queue = false;
}

private:
Dependency* _write_blocked_dependency() {
_blocked_dep = _write_dependencies->write_blocked_by(this);
if (_blocked_dep != nullptr) {
set_use_blocking_queue(false);
set_use_blocking_queue();
static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
return _blocked_dep;
}
Expand All @@ -154,7 +158,7 @@ class PipelineXTask : public PipelineTask {
for (auto* fin_dep : _finish_dependencies) {
_blocked_dep = fin_dep->finish_blocked_by(this);
if (_blocked_dep != nullptr) {
set_use_blocking_queue(false);
set_use_blocking_queue();
static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
return _blocked_dep;
}
Expand All @@ -166,8 +170,7 @@ class PipelineXTask : public PipelineTask {
for (auto* op_dep : _read_dependencies) {
_blocked_dep = op_dep->read_blocked_by(this);
if (_blocked_dep != nullptr) {
// TODO(gabriel):
set_use_blocking_queue(true);
set_use_blocking_queue();
_blocked_dep->start_read_watcher();
return _blocked_dep;
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) {
return Status::InternalError("BlockedTaskScheduler shutdown");
}
std::unique_lock<std::mutex> lock(_task_mutex);
if (task->is_pipelineX() && !static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
if (!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
// put this task into current dependency's blocking queue and wait for event notification
// instead of using a separate BlockedTaskScheduler.
return Status::OK();
Expand Down Expand Up @@ -142,6 +142,11 @@ void BlockedTaskScheduler::_schedule() {
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
if (task->source_can_read()) {
_make_task_run(local_blocked_tasks, iter);
} else if (!task->push_blocked_task_to_queue()) {
// TODO(gabriel): This condition means this task is in blocking queue now and we should
// remove it because this new dependency should not be put into blocking queue. We
// will delete this strange behavior after ScanDependency and UnionDependency done.
local_blocked_tasks.erase(iter++);
} else {
iter++;
}
Expand Down

0 comments on commit 840f3b6

Please sign in to comment.