Commit

use shared ptr
Gabriel39 committed Dec 19, 2024
1 parent 56171ee commit 42532ad
Showing 8 changed files with 82 additions and 66 deletions.
6 changes: 3 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -445,14 +445,14 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
- auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
+ auto task = std::make_shared<PipelineTask>(pipeline, cur_task_id,
task_runtime_state.get(), this,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
- _tasks[i].emplace_back(std::move(task));
+ _tasks[i].emplace_back(task);
}
}

@@ -1679,7 +1679,7 @@ Status PipelineFragmentContext::submit() {
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
- st = scheduler->schedule_task(t.get());
+ st = scheduler->schedule_task(t);
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
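The hunks above move PipelineTask from exclusive to shared ownership: the fragment's _tasks matrix and the scheduler's runnable queue each hold a reference, so a queued task can no longer dangle if the fragment releases its copy first. A minimal sketch of that ownership model, using a hypothetical Task type rather than the Doris classes:

    #include <memory>
    #include <queue>
    #include <vector>

    struct Task { int id; };

    int main() {
        std::vector<std::shared_ptr<Task>> tasks;    // fragment side (_tasks)
        std::queue<std::shared_ptr<Task>> runnable;  // scheduler side

        auto task = std::make_shared<Task>(Task{1}); // was make_unique + raw pointers
        tasks.emplace_back(task);                    // was emplace_back(std::move(task))
        runnable.push(task);                         // was pushing task.get()

        tasks.clear();                               // fragment drops its reference...
        return runnable.front()->id - 1;             // ...the queued copy keeps the task alive
    }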
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.h
@@ -226,7 +226,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
- std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+ std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;

// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
35 changes: 33 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -394,7 +394,8 @@ Status PipelineTask::execute(bool* eos) {
}
}

- RETURN_IF_ERROR(get_task_queue()->push_back(this, query_context()->is_cancelled()));
+ RETURN_IF_ERROR(
+         get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
return Status::OK();
}

@@ -554,10 +555,40 @@ std::string PipelineTask::debug_string() {

void PipelineTask::wake_up() {
// call by dependency
- static_cast<void>(get_task_queue()->push_back(this, query_context()->is_cancelled()));
+ static_cast<void>(
+         get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
}

+ void PipelineTask::clear_blocking_state(bool cancel) {
+ _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
+ // We use a lock to assure all dependencies are not deconstructed here.
+ std::unique_lock<std::mutex> lc(_dependency_lock);
+ if (!_finalized) {
+ if (cancel) {
+ DCHECK(query_context()->is_cancelled());
+ wake_up();
+ }
+
+ _execution_dep->set_always_ready();
+ for (auto* dep : _filter_dependencies) {
+ dep->set_always_ready();
+ }
+ for (auto& deps : _read_dependencies) {
+ for (auto* dep : deps) {
+ dep->set_always_ready();
+ }
+ }
+ for (auto* dep : _write_dependencies) {
+ dep->set_always_ready();
+ }
+ for (auto* dep : _finish_dependencies) {
+ dep->set_always_ready();
+ }
+ }
+ }

QueryContext* PipelineTask::query_context() {
return _fragment_context->get_query_ctx();
}

} // namespace doris::pipeline
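The push_back calls above now pass shared_from_this(), which the header change in the next file enables by deriving PipelineTask from std::enable_shared_from_this. That call is only valid on an object already owned by a shared_ptr, which is why _build_pipeline_tasks switched to make_shared in the first file. A small sketch of the constraint, with hypothetical names:

    #include <memory>

    struct Task : std::enable_shared_from_this<Task> {
        std::shared_ptr<Task> requeue_handle() { return shared_from_this(); }
    };

    int main() {
        auto owned = std::make_shared<Task>();
        auto handle = owned->requeue_handle();  // OK: *owned is already shared_ptr-owned

        Task unowned;
        (void)unowned;
        // unowned.requeue_handle();            // throws std::bad_weak_ptr (since C++17):
        //                                      // no shared_ptr owns `unowned`
        return handle.use_count() == 2 ? 0 : 1; // owners: `owned` and `handle`
    }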
38 changes: 9 additions & 29 deletions be/src/pipeline/pipeline_task.h
@@ -45,7 +45,7 @@ class MultiCoreTaskQueue;
class PriorityTaskQueue;
class Dependency;

- class PipelineTask {
+ class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
public:
PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
@@ -137,28 +137,7 @@ class PipelineTask {

void set_wake_up_early() { _wake_up_early = true; }

- void clear_blocking_state() {
- _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
- // We use a lock to assure all dependencies are not deconstructed here.
- std::unique_lock<std::mutex> lc(_dependency_lock);
- if (!_finalized) {
- _execution_dep->set_always_ready();
- for (auto* dep : _filter_dependencies) {
- dep->set_always_ready();
- }
- for (auto& deps : _read_dependencies) {
- for (auto* dep : deps) {
- dep->set_always_ready();
- }
- }
- for (auto* dep : _write_dependencies) {
- dep->set_always_ready();
- }
- for (auto* dep : _finish_dependencies) {
- dep->set_always_ready();
- }
- }
- }
+ void clear_blocking_state(bool cancel = false);

void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
@@ -187,9 +166,10 @@
*/
static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);

- bool put_in_runnable_queue() {
+ bool put_in_runnable_queue(bool eager) {
bool expected = false;
- if (!_in_queue.compare_exchange_strong(expected, true)) {
+ if (!_in_eager_queue.compare_exchange_strong(expected, eager)) {
+ DCHECK(query_context()->is_cancelled()) << debug_string();
return false;
}
_schedule_time++;
@@ -200,10 +180,10 @@
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }

bool is_running() { return _running.load(); }
- void set_running(bool running) {
+ void set_running(bool running, bool eager = false) {
_running = running;
- if (!running) {
- _in_queue = false;
+ if (running && eager) {
+ _in_eager_queue = false;
}
}

Expand Down Expand Up @@ -328,7 +308,7 @@ class PipelineTask {
std::mutex _dependency_lock;

std::atomic<bool> _running = false;
- std::atomic<bool> _in_queue = false;
+ std::atomic<bool> _in_eager_queue = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
};
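The renamed _in_eager_queue flag above is still a de-duplication guard: put_in_runnable_queue(eager) only admits the task when compare_exchange_strong sees the flag as false, and it stores the eager bit so that set_running(true, eager) can clear it later. A sketch of just the CAS guard, with hypothetical free-standing names:

    #include <atomic>
    #include <cassert>

    std::atomic<bool> in_eager_queue{false};

    // Mirrors the shape of put_in_runnable_queue: admit the task only if the
    // flag was false, and store `eager` so only eager admissions latch it.
    bool admit(bool eager) {
        bool expected = false;
        return in_eager_queue.compare_exchange_strong(expected, eager);
    }

    int main() {
        assert(admit(true));     // first eager enqueue latches the flag
        assert(!admit(true));    // a duplicate eager enqueue is rejected
        in_eager_queue = false;  // what set_running(true, /*eager=*/true) does
        assert(admit(false));    // non-eager admissions store false...
        assert(admit(false));    // ...so this guard never blocks them
        return 0;
    }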
26 changes: 14 additions & 12 deletions be/src/pipeline/task_queue.cpp
@@ -29,7 +29,7 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

- PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+ std::shared_ptr<PipelineTask> SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
@@ -54,7 +54,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}

- PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+ std::shared_ptr<PipelineTask> PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@@ -90,13 +90,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
return SUB_QUEUE_LEVEL - 1;
}

- PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+ std::shared_ptr<PipelineTask> PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _try_take_unprotected(is_steal);
}

- PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+ std::shared_ptr<PipelineTask> PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = _try_take_unprotected(false);
if (task) {
@@ -111,7 +111,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
}
}

- Status PriorityTaskQueue::push(PipelineTask* task) {
+ Status PriorityTaskQueue::push(std::shared_ptr<PipelineTask>& task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
@@ -149,8 +149,8 @@ void MultiCoreTaskQueue::close() {
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

- PipelineTask* MultiCoreTaskQueue::take(int core_id) {
- PipelineTask* task = nullptr;
+ std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
+ std::shared_ptr<PipelineTask> task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queues.size() > core_id)
<< " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -178,7 +178,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
return task;
}

- PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
+ std::shared_ptr<PipelineTask> MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
@@ -196,20 +196,22 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
return nullptr;
}

- Status MultiCoreTaskQueue::push_back(PipelineTask* task, bool eager) {
+ Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task, bool eager) {
int core_id = task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
return push_back(task, eager ? _eager_queue_idx : core_id);
}

- Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
+ Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
DCHECK(core_id < _core_size || task->query_context()->is_cancelled())
<< " core_id: " << core_id << " _core_size: " << _core_size
<< " task: " << task->debug_string();
- if (!task->put_in_runnable_queue()) {
- DCHECK(task->query_context()->is_cancelled()) << task->debug_string();
+ auto eager = core_id == _eager_queue_idx;
+ if (!task->put_in_runnable_queue(eager)) {
+ DCHECK(task->query_context()->is_cancelled() && !eager)
+ << " eager: " << eager << " task: " << task->debug_string();
return Status::OK();
}
return _prio_task_queues[core_id].push(task);
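Two scheduling details surface in this file: tasks with no core affinity are spread round-robin via an atomic counter, and the eager flag reroutes a task to the dedicated eager queue index. A condensed sketch of that selection logic, using free-standing hypothetical names:

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    std::atomic<uint32_t> next_core{0};

    // Mirrors MultiCoreTaskQueue::push_back(task, eager): reuse the previous
    // core if the task has one, otherwise round-robin across cores; eager
    // tasks are redirected to the dedicated eager queue index.
    int pick_queue(int previous_core_id, bool eager, int core_size, int eager_idx) {
        int core_id = previous_core_id;
        if (core_id < 0) {  // no affinity yet
            core_id = static_cast<int>(next_core.fetch_add(1) %
                                       static_cast<uint32_t>(core_size));
        }
        return eager ? eager_idx : core_id;
    }

    int main() {
        for (int i = 0; i < 4; ++i) {
            std::cout << pick_queue(-1, false, 3, 3) << ' ';  // 0 1 2 0
        }
        std::cout << pick_queue(2, false, 3, 3) << ' '        // 2: sticky affinity
                  << pick_queue(-1, true, 3, 3) << '\n';      // 3: eager queue
    }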
23 changes: 12 additions & 11 deletions be/src/pipeline/task_queue.h
@@ -38,9 +38,9 @@ class SubTaskQueue {
friend class PriorityTaskQueue;

public:
- void push_back(PipelineTask* task) { _queue.emplace(task); }
+ void push_back(std::shared_ptr<PipelineTask>& task) { _queue.emplace(task); }

- PipelineTask* try_take(bool is_steal);
+ std::shared_ptr<PipelineTask> try_take(bool is_steal);

void set_level_factor(double level_factor) { _level_factor = level_factor; }

@@ -58,7 +58,7 @@
bool empty() { return _queue.empty(); }

private:
- std::queue<PipelineTask*> _queue;
+ std::queue<std::shared_ptr<PipelineTask>> _queue;
// depends on LEVEL_QUEUE_TIME_FACTOR
double _level_factor = 1;

@@ -72,18 +72,18 @@ class PriorityTaskQueue {

void close();

- PipelineTask* try_take(bool is_steal);
+ std::shared_ptr<PipelineTask> try_take(bool is_steal);

- PipelineTask* take(uint32_t timeout_ms = 0);
+ std::shared_ptr<PipelineTask> take(uint32_t timeout_ms = 0);

- Status push(PipelineTask* task);
+ Status push(std::shared_ptr<PipelineTask>& task);

void inc_sub_queue_runtime(int level, uint64_t runtime) {
_sub_queues[level].inc_runtime(runtime);
}

private:
- PipelineTask* _try_take_unprotected(bool is_steal);
+ std::shared_ptr<PipelineTask> _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
static constexpr size_t SUB_QUEUE_LEVEL = 6;
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,19 +112,20 @@ class MultiCoreTaskQueue {
void close();

// Get the task by core id.
- PipelineTask* take(int core_id);
+ std::shared_ptr<PipelineTask> take(int core_id);

// TODO combine these methods to `push_back(task, core_id = -1)`
- Status push_back(PipelineTask* task, bool eager = false);
+ Status push_back(std::shared_ptr<PipelineTask> task, bool eager = false);

- Status push_back(PipelineTask* task, int core_id);
+ Status push_back(std::shared_ptr<PipelineTask>& task, int core_id);

void update_statistics(PipelineTask* task, int64_t time_spent);

int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
+ int eager_queue_idx() const { return _eager_queue_idx; }

private:
- PipelineTask* _steal_take(int core_id);
+ std::shared_ptr<PipelineTask> _steal_take(int core_id);

std::vector<PriorityTaskQueue> _prio_task_queues;
std::atomic<uint32_t> _next_core = 0;
16 changes: 9 additions & 7 deletions be/src/pipeline/task_scheduler.cpp
@@ -66,12 +66,12 @@ Status TaskScheduler::start() {
return Status::OK();
}

- Status TaskScheduler::schedule_task(PipelineTask* task) {
+ Status TaskScheduler::schedule_task(std::shared_ptr<PipelineTask>& task) {
return _task_queue.push_back(task);
}

// after _close_task, task maybe destructed.
- void _close_task(PipelineTask* task, Status exec_status) {
+ void _close_task(std::shared_ptr<PipelineTask> task, Status exec_status) {
// Has to attach memory tracker here, because the close task will also release some memory.
// Should count the memory to the query or the query's memory will not decrease when part of
// task finished.
@@ -95,20 +95,22 @@ void _close_task(PipelineTask* task, Status exec_status) {
task->finalize();
task->set_running(false);
task->fragment_context()->close_a_pipeline(task->pipeline_id());
+ task.reset();
}

void TaskScheduler::_do_work(int index) {
while (_markers[index]) {
- auto* task = _task_queue.take(index);
+ auto task = _task_queue.take(index);
if (!task) {
continue;
}
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
}
+ auto eager = index == _task_queue.eager_queue_idx();
task->log_detail_if_need();
- task->set_running(true);
+ task->set_running(true, eager);
task->set_task_queue(&_task_queue);
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();
Expand All @@ -122,7 +124,7 @@ void TaskScheduler::_do_work(int index) {
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
- _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+ _close_task(std::move(task), fragment_ctx->get_query_ctx()->exec_status());
continue;
}

@@ -165,7 +167,7 @@
LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
print_id(task->query_context()->query_id()),
status.to_string());
- _close_task(task, status);
+ _close_task(std::move(task), status);
continue;
}
fragment_ctx->trigger_report_if_necessary();
@@ -178,7 +180,7 @@
task->set_running(false);
} else {
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, exec_status);
+ _close_task(std::move(task), exec_status);
}
continue;
}
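_close_task now takes the shared_ptr by value and the worker loop hands it over with std::move, so the loop no longer holds a reference by the time task.reset() runs; the comment "after _close_task, task maybe destructed" holds because the final release can happen inside _close_task rather than at some later point in the loop. A sketch of that calling convention, with a hypothetical Task type:

    #include <cassert>
    #include <memory>

    struct Task {};

    // By-value parameter plus std::move at the call site: this function may
    // hold the last reference, so reset() can run the destructor right here.
    void close_task(std::shared_ptr<Task> task) {
        // ... finalize profile, close the pipeline ...
        task.reset();  // final release if no other owner remains
    }

    int main() {
        auto task = std::make_shared<Task>();
        std::weak_ptr<Task> watch = task;
        close_task(std::move(task));  // the caller's pointer is now empty
        assert(task == nullptr);
        assert(watch.expired());      // the task was destroyed inside close_task
        return 0;
    }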
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.h
@@ -51,7 +51,7 @@ class TaskScheduler {

~TaskScheduler();

- Status schedule_task(PipelineTask* task);
+ Status schedule_task(std::shared_ptr<PipelineTask>& task);

Status start();

