From 02c671bd755ca8722fdf3c835f47f7870571059a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 12 Oct 2024 14:13:51 +0800 Subject: [PATCH] [fix](pipeline) Prevent re-scheduling a task at the same time (#41743) ==40376==ERROR: AddressSanitizer: heap-use-after-free on address 0x616002419ed0 at pc 0x56132d2dc34a bp 0x7fe20150ba70 sp 0x7fe20150ba68 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27557_27557&logView=flowAware) WRITE of size 4 at 0x616002419ed0 thread T1474 (Pipe_normal [wo) [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27558_27558&logView=flowAware) #0 0x56132d2dc349 in doris::pipeline::PipelineTask::update_queue_level(int) /root/doris/be/src/pipeline/pipeline_task.h:174:67 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27559_27559&logView=flowAware) #1 0x56132d2dc349 in doris::pipeline::PriorityTaskQueue::_try_take_unprotected(bool) /root/doris/be/src/pipeline/task_queue.cpp:79:15 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27560_27560&logView=flowAware) #2 0x56132d2dc615 in doris::pipeline::PriorityTaskQueue::try_take(bool) /root/doris/be/src/pipeline/task_queue.cpp:97:12 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27561_27561&logView=flowAware) #3 0x56132d2de69f in doris::pipeline::MultiCoreTaskQueue::take(int) /root/doris/be/src/pipeline/task_queue.cpp:164:50 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27562_27562&logView=flowAware) #4 0x56132d2ee0e3 in doris::pipeline::TaskScheduler::_do_work(unsigned long) /root/doris/be/src/pipeline/task_scheduler.cpp:102:35 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27563_27563&logView=flowAware) #5 0x5612fc23029d in doris::ThreadPool::dispatch_thread() /root/doris/be/src/util/threadpool.cpp:543:24 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27564_27564&logView=flowAware) #6 0x5612fc20880e in std::function::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27565_27565&logView=flowAware) #7 0x5612fc20880e in doris::Thread::supervise_thread(void*) /root/doris/be/src/util/thread.cpp:498:5 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27566_27566&logView=flowAware) #8 0x7fe9405ca608 in start_thread /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27567_27567&logView=flowAware) #9 0x7fe940877132 in __clone /build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27568_27568&logView=flowAware) [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27569_27569&logView=flowAware) 0x616002419ed0 is located 80 bytes inside of 632-byte region [0x616002419e80,0x61600241a0f8) [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27570_27570&logView=flowAware) freed by thread T1463 (Pipe_normal [wo) here: [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27571_27571&logView=flowAware) #0 0x5612f7ac280d in operator delete(void*) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x2ed1180d) (BuildId: 04f75ee9f41a400a) [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27572_27572&logView=flowAware) #1 0x56132d2519d4 in std::default_delete::operator()(doris::pipeline::PipelineTask*) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:85:2 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27573_27573&logView=flowAware) #2 0x56132d2519d4 in std::unique_ptr >::~unique_ptr() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:361:4 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27574_27574&logView=flowAware) #3 0x56132d2519d4 in void std::destroy_at > >(std::unique_ptr >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27575_27575&logView=flowAware) #4 0x56132d2519d4 in void std::_Destroy > >(std::unique_ptr >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27576_27576&logView=flowAware) #5 0x56132d2519d4 in void std::_Destroy_aux::__destroy >*>(std::unique_ptr >*, std::unique_ptr >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27577_27577&logView=flowAware) #6 0x56132d2519d4 in void std::_Destroy >*>(std::unique_ptr >*, std::unique_ptr >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27578_27578&logView=flowAware) #7 0x56132d2519d4 in void std::_Destroy >*, std::unique_ptr > >(std::unique_ptr >*, std::unique_ptr >*, std::allocator > >&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27579_27579&logView=flowAware) #8 0x56132d2519d4 in std::vector >, std::allocator > > >::~vector() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680:2 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27580_27580&logView=flowAware) #9 0x56132d1d717c in void std::destroy_at >, std::allocator > > > >(std::vector >, std::allocator > > >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27581_27581&logView=flowAware) #10 0x56132d1d717c in void std::_Destroy >, std::allocator > > > >(std::vector >, std::allocator > > >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27582_27582&logView=flowAware) #11 0x56132d1d717c in void std::_Destroy_aux::__destroy >, std::allocator > > >*>(std::vector >, std::allocator > > >*, std::vector >, std::allocator > > >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27583_27583&logView=flowAware) #12 0x56132d1d717c in void std::_Destroy >, std::allocator > > >*>(std::vector >, std::allocator > > >*, std::vector >, std::allocator > > >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27584_27584&logView=flowAware) #13 0x56132d1d717c in void std::_Destroy >, std::allocator > > >*, std::vector >, std::allocator > > > >(std::vector >, std::allocator > > >*, std::vector >, std::allocator > > >*, std::allocator >, std::allocator > > > >&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27585_27585&logView=flowAware) #14 0x56132d1d717c in std::vector >, std::allocator > > >, std::allocator >, std::allocator > > > > >::_M_erase_at_end(std::vector >, std::allocator > > >*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1796:6 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27586_27586&logView=flowAware) #15 0x56132d1d717c in std::vector >, std::allocator > > >, std::allocator >, std::allocator > > > > >::clear() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1499:9 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27587_27587&logView=flowAware) #16 0x56132d1d717c in doris::pipeline::PipelineFragmentContext::~PipelineFragmentContext() /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:143:12 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27588_27588&logView=flowAware) #17 0x5612f7aeabdc in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:168:6 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27589_27589&logView=flowAware) #18 0x56132d2ed705 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:702:11 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27590_27590&logView=flowAware) #19 0x56132d2ed705 in std::__shared_ptr::~__shared_ptr() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1149:31 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27591_27591&logView=flowAware) #20 0x56132d2ed705 in doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) /root/doris/be/src/pipeline/task_scheduler.cpp:98:1 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27592_27592&logView=flowAware) #21 0x56132d2efce9 in doris::pipeline::TaskScheduler::_do_work(unsigned long) /root/doris/be/src/pipeline/task_scheduler.cpp:181:17 [02:02:28 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_P0Regression/542356?expandBuildDeploymentsSection=false&hideTestsFromDependencies=false&hideProblemsFromDependencies=false&expandPull+Request+Details=true&expandBuildProblemsSection=true&expandBuildChangesSection=true&expandBuildTestsSection=true&showLog=542356_27593_27593&logView=flowAware) #22 0x5612fc23029d in doris::ThreadPool::dispatch_thread() /root/doris/be/src/util/threadpool.cpp:543:24 --- be/src/pipeline/pipeline_task.cpp | 20 ++++++++++++++++---- be/src/pipeline/pipeline_task.h | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 36f2066c9abed7..e06b8028c9c730 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -227,14 +227,20 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } for (auto* op_dep : _filter_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; @@ -249,7 +255,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators. @@ -268,7 +277,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index a73393fb270885..e216ef103766fd 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -90,7 +90,7 @@ class PipelineTask { _blocked_dep = fin_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + return true; } } return false;