Skip to content

Commit

Permalink
update of cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Mar 6, 2024
1 parent c2c1ecc commit cd31d15
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 13 deletions.
9 changes: 7 additions & 2 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,13 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
// for pending finish now. So that could call close directly.
Status status = task->close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
if (task->is_pipelineX()) { //should call fragment context cancel, in it will call query context cancel
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
std::string(status.msg()));
} else {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
}
state = PipelineTaskState::CANCELED;
}
task->set_state(state);
Expand Down
22 changes: 16 additions & 6 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <sstream>

#include "olap/rowset/rowset_writer.h"
#include "runtime/query_context.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/network_util.h"
Expand Down Expand Up @@ -304,7 +305,7 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
return Status::OK();
}

Status LoadStreamStub::close_wait(int64_t timeout_ms) {
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
while (true) {
};
Expand All @@ -319,11 +320,20 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
if (!_is_closed.load()) {
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
if (ret != 0) {
return Status::InternalError(
"stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret,
print_id(_load_id), _dst_id, _stream_id);
auto timeout_sec = timeout_ms / 1000;
while (!state->get_query_ctx()->is_cancelled() && timeout_sec > 0) {
//the query maybe cancel, so need check after wait 1s
timeout_sec = timeout_sec - 1;
int ret = _close_cv.wait_for(lock, 1000000);
if (ret == 0) {
break;
}
if (timeout_sec <= 0) {
return Status::InternalError(
"stream close_wait timeout, timeout_ms={}, load_id={}, dst_id={}, "
"stream_id={}",
timeout_ms, print_id(_load_id), _dst_id, _stream_id);
}
}
}
RETURN_IF_ERROR(_check_cancel());
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class LoadStreamStub {

// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(int64_t timeout_ms = 0);
Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);

// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
if (_writer_status.ok() && _eos) {
_writer_status = finish(state);
}
// should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout
if (_finish_dependency) {
_finish_dependency->set_ready();
}

Status close_st = close(_writer_status);
// If it is already failed before, then not update the write status so that we could get
Expand All @@ -171,9 +175,6 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
_writer_status = close_st;
}
_writer_thread_closed = true;
if (_finish_dependency) {
_finish_dependency->set_ready();
}
}

Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_block,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ Status VTabletWriterV2::close(Status exec_status) {
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
RETURN_IF_ERROR(stream->close_wait(remain_ms));
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ suite('nereids_insert_no_partition') {
sql 'set enable_nereids_dml=true'
sql 'set enable_strict_consistency_dml=true'

sql 'set experimental_enable_nereids_dml_with_pipeline=false'
explain {
// TODO: test turn off pipeline when dml, remove it if pipeline sink is ok
sql '''
Expand Down

0 comments on commit cd31d15

Please sign in to comment.