Skip to content

Commit

Permalink
[pipelineX](fix) Fix runtime filter dependency DCHECK failed (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Jan 17, 2024
1 parent e556cff commit 5f70c0b
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,16 @@ Status PipelineXTask::_open() {
_dry_run = _sink->should_dry_run(_state);
for (auto& o : _operators) {
auto* local_state = _state->get_local_state(o->operator_id());
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
RETURN_IF_ERROR(st);
} else if (i == 1) {
CHECK(false) << debug_string();
}
} else {
break;
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
RETURN_IF_ERROR(st);
}
} else {
RETURN_IF_ERROR(st);
}
}
RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state));
Expand Down

0 comments on commit 5f70c0b

Please sign in to comment.