Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 20, 2024
1 parent 6e2fe8b commit 4a17d43
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 36 deletions.
6 changes: 2 additions & 4 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ Status PipelineTask::execute(bool* eos) {
}
}

RETURN_IF_ERROR(
get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
return Status::OK();
}

Expand Down Expand Up @@ -555,8 +554,7 @@ std::string PipelineTask::debug_string() {

void PipelineTask::wake_up() {
// call by dependency
static_cast<void>(
get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
static_cast<void>(get_task_queue()->push_back(shared_from_this()));
}

void PipelineTask::clear_blocking_state(bool cancel) {
Expand Down
16 changes: 2 additions & 14 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,15 @@ class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
*/
static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);

bool put_in_runnable_queue(bool eager) {
bool expected = false;
if (!_in_eager_queue.compare_exchange_strong(expected, eager)) {
DCHECK(query_context()->is_cancelled()) << debug_string();
return false;
}
void put_in_runnable_queue() {
_schedule_time++;
_wait_worker_watcher.start();
return true;
}

void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }

bool is_running() { return _running.load(); }
void set_running(bool running, bool eager = false) {
_running = running;
if (running && eager) {
_in_eager_queue = false;
}
}
void set_running(bool running) { _running = running; }

bool is_exceed_debug_timeout() {
if (_has_exceed_timeout) {
Expand Down Expand Up @@ -308,7 +297,6 @@ class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
std::mutex _dependency_lock;

std::atomic<bool> _running = false;
std::atomic<bool> _in_eager_queue = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
};
Expand Down
21 changes: 8 additions & 13 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
: _prio_task_queues(core_size + NUM_EAGER_QUEUES),
_closed(false),
_core_size(core_size),
_eager_queue_idx(core_size) {}
_urgent_queue_idx(core_size) {}

void MultiCoreTaskQueue::close() {
if (_closed) {
Expand All @@ -160,7 +160,7 @@ std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
task->set_core_id(core_id);
break;
}
if (core_id != _eager_queue_idx) {
if (core_id != _urgent_queue_idx) {
task = _steal_take(core_id);
if (task) {
break;
Expand Down Expand Up @@ -196,24 +196,19 @@ std::shared_ptr<PipelineTask> MultiCoreTaskQueue::_steal_take(int core_id) {
return nullptr;
}

Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task, bool eager) {
Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task) {
int core_id = task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
return push_back(task, eager ? _eager_queue_idx : core_id);
return push_back(task, core_id);
}

Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
DCHECK(core_id < _core_size || task->query_context()->is_cancelled())
<< " core_id: " << core_id << " _core_size: " << _core_size
<< " task: " << task->debug_string();
auto eager = core_id == _eager_queue_idx;
if (!task->put_in_runnable_queue(eager)) {
DCHECK(task->query_context()->is_cancelled()) << " task: " << task->debug_string();
return Status::OK();
}
return _prio_task_queues[core_id].push(task);
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return _prio_task_queues[task->query_context()->is_cancelled() ? _urgent_queue_idx : core_id]
.push(task);
}

void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,13 @@ class MultiCoreTaskQueue {
std::shared_ptr<PipelineTask> take(int core_id);

// TODO combine these methods to `push_back(task, core_id = -1)`
Status push_back(std::shared_ptr<PipelineTask> task, bool eager = false);
Status push_back(std::shared_ptr<PipelineTask> task);

Status push_back(std::shared_ptr<PipelineTask>& task, int core_id);

void update_statistics(PipelineTask* task, int64_t time_spent);

int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
int eager_queue_idx() const { return _eager_queue_idx; }

private:
std::shared_ptr<PipelineTask> _steal_take(int core_id);
Expand All @@ -132,7 +131,7 @@ class MultiCoreTaskQueue {
std::atomic<bool> _closed;

const int _core_size;
const int _eager_queue_idx;
const int _urgent_queue_idx;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
#include "common/compile_check_end.h"
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ void TaskScheduler::_do_work(int index) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
}
auto eager = index == _task_queue.eager_queue_idx();
task->log_detail_if_need();
task->set_running(true, eager);
task->set_running(true);
task->set_task_queue(&_task_queue);
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();
Expand Down

0 comments on commit 4a17d43

Please sign in to comment.