Merge branch 'branch-2.0' of github.com:apache/doris into branch-2.0
englefly committed Sep 13, 2023
2 parents 3e81d53 + 687a918, commit b8bcc2f
Showing 7 changed files with 92 additions and 28 deletions.
2 changes: 0 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -68,7 +68,6 @@ void PipelineTask::_fresh_profile_counter() {
     COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
     COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
     COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
-    COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time());
     COUNTER_SET(_begin_execute_timer, _begin_execute_time);
     COUNTER_SET(_eos_timer, _eos_time);
     COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
@@ -99,7 +98,6 @@ void PipelineTask::_init_profile() {
     _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
-    _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
     _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
     _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
     _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_task.h
@@ -130,8 +130,6 @@ class PipelineTask {
         _wait_worker_watcher.start();
     }
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
-    void start_schedule_watcher() { _wait_schedule_watcher.start(); }
-    void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);

@@ -310,8 +308,6 @@ class PipelineTask {
     MonotonicStopWatch _wait_worker_watcher;
     RuntimeProfile::Counter* _wait_worker_timer;
     // TODO we should calculate the time between when really runnable and runnable
-    MonotonicStopWatch _wait_schedule_watcher;
-    RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
     RuntimeProfile::Counter* _core_change_times;

31 changes: 12 additions & 19 deletions be/src/pipeline/task_scheduler.cpp
@@ -84,7 +84,6 @@ void BlockedTaskScheduler::_schedule() {
     _started.store(true);
     std::list<PipelineTask*> local_blocked_tasks;
     int empty_times = 0;
-    std::vector<PipelineTask*> ready_tasks;

     while (!_shutdown) {
         {
@@ -104,6 +103,7 @@ void BlockedTaskScheduler::_schedule() {
             }
         }

+        auto origin_local_block_tasks_size = local_blocked_tasks.size();
         auto iter = local_blocked_tasks.begin();
         vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time();
         while (iter != local_blocked_tasks.end()) {
@@ -114,15 +114,14 @@ void BlockedTaskScheduler::_schedule() {
                 if (task->is_pending_finish()) {
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks,
-                                   PipelineTaskState::PENDING_FINISH);
+                    _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->fragment_context()->is_canceled()) {
                 if (task->is_pending_finish()) {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 }
             } else if (task->query_context()->is_timeout(now)) {
                 LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
@@ -135,47 +134,43 @@ void BlockedTaskScheduler::_schedule() {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
                 if (task->source_can_read()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
                 if (task->runtime_filters_are_ready_or_timeout()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
                 if (task->sink_can_write()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else {
                 // TODO: DCHECK the state
-                _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                _make_task_run(local_blocked_tasks, iter);
             }
         }

-        if (ready_tasks.empty()) {
+        if (origin_local_block_tasks_size == 0 ||
+            local_blocked_tasks.size() == origin_local_block_tasks_size) {
             empty_times += 1;
         } else {
             empty_times = 0;
-            for (auto& task : ready_tasks) {
-                task->stop_schedule_watcher();
-                _task_queue->push_back(task);
-            }
-            ready_tasks.clear();
         }

         if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
@@ -195,13 +190,11 @@

 void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
                                           std::list<PipelineTask*>::iterator& task_itr,
-                                          std::vector<PipelineTask*>& ready_tasks,
                                           PipelineTaskState t_state) {
     auto task = *task_itr;
-    task->start_schedule_watcher();
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
-    ready_tasks.emplace_back(task);
+    _task_queue->push_back(task);
 }

 TaskScheduler::~TaskScheduler() {
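
The net effect of the scheduler change above: _make_task_run() now moves a woken task straight from the blocked list into _task_queue, and the caller decides that a polling round was idle by comparing local_blocked_tasks.size() against the size captured before the scan, rather than by checking a separate ready_tasks vector. Below is a minimal, self-contained sketch of that control flow; Task, run_queue, and make_task_run are simplified illustrative stand-ins, not the Doris classes.

#include <deque>
#include <iostream>
#include <list>

struct Task {
    bool ready = false;  // stand-in for the per-state checks (source_can_read(), sink_can_write(), ...)
};

std::deque<Task*> run_queue;  // stand-in for _task_queue

// Mirrors the new _make_task_run(): erase the task from the blocked list and
// push it directly onto the run queue, advancing the iterator past the element.
void make_task_run(std::list<Task*>& blocked, std::list<Task*>::iterator& it) {
    Task* task = *it;
    blocked.erase(it++);
    run_queue.push_back(task);
}

int main() {
    Task a{true}, b{false}, c{true};
    std::list<Task*> blocked{&a, &b, &c};

    auto origin_size = blocked.size();  // analogue of origin_local_block_tasks_size
    for (auto it = blocked.begin(); it != blocked.end();) {
        if ((*it)->ready) {
            make_task_run(blocked, it);
        } else {
            ++it;
        }
    }
    // Same idle test as the patch: nothing was blocked to begin with, or no
    // task moved to the run queue during this round.
    bool idle_round = origin_size == 0 || blocked.size() == origin_size;
    std::cout << "woke " << run_queue.size() << " task(s), idle_round="
              << std::boolalpha << idle_round << std::endl;
    return 0;
}

One practical consequence: because each task is enqueued as soon as it is found runnable, it can start executing while the scan is still walking the rest of the blocked list, instead of waiting for the whole batch to be collected.
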
1 change: 0 additions & 1 deletion be/src/pipeline/task_scheduler.h
@@ -71,7 +71,6 @@ class BlockedTaskScheduler {
     void _schedule();
     void _make_task_run(std::list<PipelineTask*>& local_tasks,
                         std::list<PipelineTask*>::iterator& task_itr,
-                        std::vector<PipelineTask*>& ready_tasks,
                         PipelineTaskState state = PipelineTaskState::RUNNABLE);
 };

37 changes: 35 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.cpp
@@ -519,7 +519,7 @@ Status HashJoinNode::close(RuntimeState* state) {

 bool HashJoinNode::need_more_input_data() const {
     return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos &&
-           !_short_circuit_for_probe;
+           (!_short_circuit_for_probe || _is_mark_join);
 }

 void HashJoinNode::prepare_for_next() {
@@ -530,10 +530,43 @@ void HashJoinNode::prepare_for_next() {
 Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
     SCOPED_TIMER(_probe_timer);
     if (_short_circuit_for_probe) {
+        /// If `_short_circuit_for_probe` is true, this indicates no rows
+        /// match the join condition, and this is 'mark join', so we need to create a column as mark
+        /// with all rows set to 0.
+        if (_is_mark_join) {
+            auto block_rows = _probe_block.rows();
+            if (block_rows == 0) {
+                *eos = _probe_eos;
+                return Status::OK();
+            }
+
+            Block temp_block;
+            //get probe side output column
+            for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+                if (_left_output_slot_flags[i]) {
+                    temp_block.insert(_probe_block.get_by_position(i));
+                }
+            }
+            auto mark_column = ColumnUInt8::create(block_rows, 0);
+            temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});
+
+            {
+                SCOPED_TIMER(_join_filter_timer);
+                RETURN_IF_ERROR(
+                        VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns()));
+            }
+
+            RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
+            temp_block.clear();
+            release_block_memory(_probe_block);
+            reached_limit(output_block, eos);
+            return Status::OK();
+        }
         // If we use a short-circuit strategy, should return empty block directly.
         *eos = true;
         return Status::OK();
     }

     //TODO: this short circuit maybe could refactor, no need to check at here.
     if (_short_circuit_for_probe_and_additional_data) {
         // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
@@ -720,7 +753,7 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block
 Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());

-    if (_short_circuit_for_probe) {
+    if (_short_circuit_for_probe && !_is_mark_join) {
         // If we use a short-circuit strategy, should return empty block directly.
         *eos = true;
         return Status::OK();
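
For the new mark-join branch above: a mark join must emit one output row per probe row even when the short circuit says no probe row can match, attaching a mark column that records the (non-)match so the remaining conjuncts can still filter on it. Below is a rough, self-contained sketch of that idea in plain STL terms; ProbeRow, mark, and keep are illustrative stand-ins rather than Doris's Block/ColumnUInt8/VExprContext API, and the predicate is only loosely modeled on the regression test's `not in (subquery) or k1 < 10`.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for one probe-side row (just the column the predicate reads).
struct ProbeRow {
    int64_t k1;
};

int main() {
    std::vector<ProbeRow> probe = {{1}, {7}, {42}};

    // Analogue of ColumnUInt8::create(block_rows, 0): the build side is known
    // to be empty, so every probe row carries mark = 0 ("did not match").
    std::vector<uint8_t> mark(probe.size(), 0);

    // Analogue of filtering the block with the remaining conjuncts: keep a row
    // if its NOT-IN mark is unset or the extra disjunct holds.
    auto keep = [](const ProbeRow& row, uint8_t m) { return m == 0 || row.k1 < 10; };

    for (std::size_t i = 0; i < probe.size(); ++i) {
        if (keep(probe[i], mark[i])) {
            std::cout << "k1=" << probe[i].k1 << " mark=" << int(mark[i]) << "\n";
        }
    }
    return 0;
}

This is also why need_more_input_data() and get_next() now special-case _is_mark_join: the short circuit may skip probing the hash table, but it can no longer skip consuming and emitting the probe rows themselves.
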
@@ -192,6 +192,15 @@
 24 4
 3 3

+-- !in_subquery_mark_with_order --
+1 \N
+1 2
+1 3
+2 4
+2 5
+3 3
+3 4
+
 -- !exists_subquery_with_order --
 1 2
 1 3
@@ -42,6 +42,14 @@ suite ("sub_query_correlated") {
         DROP TABLE IF EXISTS `sub_query_correlated_subquery5`
     """

+    sql """
+        DROP TABLE IF EXISTS `sub_query_correlated_subquery6`
+    """
+
+    sql """
+        DROP TABLE IF EXISTS `sub_query_correlated_subquery7`
+    """
+
     sql """
         create table if not exists sub_query_correlated_subquery1
         (k1 bigint, k2 bigint)
@@ -82,6 +90,21 @@ suite ("sub_query_correlated") {
         properties('replication_num' = '1')
     """

+    sql """
+        create table if not exists sub_query_correlated_subquery6
+        (k1 bigint, k2 bigint)
+        duplicate key(k1)
+        distributed by hash(k2) buckets 1
+        properties('replication_num' = '1')
+    """
+
+    sql """
+        create table if not exists sub_query_correlated_subquery7
+        (k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint)
+        distributed by hash(k2) buckets 1
+        properties('replication_num' = '1');
+    """
+
     sql """
         insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4)
     """
@@ -103,6 +126,15 @@ suite ("sub_query_correlated") {
         insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), (5,4), (6,7), (8,9)
     """

+    sql """
+        insert into sub_query_correlated_subquery6 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
+    """
+
+    sql """
+        insert into sub_query_correlated_subquery7 values (1,"abc",2,3,4), (1,"abcd",3,3,4), (2,"xyz",2,4,2),
+        (2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null);
+    """
+
     sql "SET enable_fallback_to_original_planner=false"

     //------------------Correlated-----------------
@@ -261,6 +293,10 @@ suite ("sub_query_correlated") {
         select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2);
     """

+    order_qt_in_subquery_mark_with_order """
+        select * from sub_query_correlated_subquery6 where sub_query_correlated_subquery6.k1 not in (select sub_query_correlated_subquery7.k3 from sub_query_correlated_subquery7 ) or k1 < 10;
+    """
+
     order_qt_exists_subquery_with_order """
         select * from sub_query_correlated_subquery1 where exists (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2);
     """
