diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 9ad0ff1b57f7c0..a7198a97da41ed 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -392,9 +392,11 @@ void SpillSortSharedState::close() { } MultiCastSharedState::MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, - int cast_sender_count) + int cast_sender_count, int node_id) : multi_cast_data_streamer(std::make_unique( - row_desc, pool, cast_sender_count, true)) {} + row_desc, this, pool, cast_sender_count, node_id, true)) {} + +void MultiCastSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {} int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) { auto ctxs = evaluator->input_exprs_ctxs(); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index c9a16a8861472d..1d79331096c9fa 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -570,10 +570,14 @@ struct CacheSharedState : public BasicSharedState { class MultiCastDataStreamer; -struct MultiCastSharedState : public BasicSharedState { -public: - MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count); +struct MultiCastSharedState : public BasicSharedState, + public BasicSpillSharedState, + public std::enable_shared_from_this { + MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count, + int node_id); std::unique_ptr multi_cast_data_streamer; + + void update_spill_stream_profiles(RuntimeProfile* source_profile) override; }; struct BlockRowPos { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index eb72e9601e1acf..4af54ec221cf2e 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -23,9 +23,9 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" std::string MultiCastDataStreamSinkLocalState::name_suffix() { - auto& sinks = static_cast(_parent)->sink_node().sinks; + const auto& sinks = static_cast(_parent)->sink_node().sinks; std::string id_name = " (dst id : "; - for (auto& sink : sinks) { + for (const auto& sink : sinks) { id_name += std::to_string(sink.dest_node_id) + ","; } id_name += ")"; @@ -34,19 +34,39 @@ std::string MultiCastDataStreamSinkLocalState::name_suffix() { std::shared_ptr MultiCastDataStreamSinkOperatorX::create_shared_state() const { std::shared_ptr ss = - std::make_shared(_row_desc, _pool, _cast_sender_count); + std::make_shared(_row_desc, _pool, _cast_sender_count, _node_id); ss->id = operator_id(); - for (auto& dest : dests_id()) { + for (const auto& dest : dests_id()) { ss->related_op_ids.insert(dest); } return ss; } +std::vector MultiCastDataStreamSinkLocalState::dependencies() const { + auto dependencies = Base::dependencies(); + dependencies.emplace_back(_shared_state->multi_cast_data_streamer->get_spill_dependency()); + return dependencies; +} + +Status MultiCastDataStreamSinkLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); + _shared_state->multi_cast_data_streamer->set_sink_profile(profile()); + _shared_state->setup_shared_profile(profile()); + _shared_state->multi_cast_data_streamer->set_write_dependency(_dependency); + return Status::OK(); +} + +std::string MultiCastDataStreamSinkLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, ", Base::debug_string(indentation_level), + _shared_state->multi_cast_data_streamer->debug_string()); + return fmt::to_string(debug_string_buffer); +} + Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0 || eos) { COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); auto st = local_state._shared_state->multi_cast_data_streamer->push(state, in_block, eos); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index 57b5974064b6a2..e0c454d8f1090c 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -17,22 +17,32 @@ #pragma once +#include + +#include "common/status.h" #include "operator.h" +#include "pipeline/exec/data_queue.h" namespace doris::pipeline { class MultiCastDataStreamSinkOperatorX; class MultiCastDataStreamSinkLocalState final - : public PipelineXSinkLocalState { + : public PipelineXSpillSinkLocalState { ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} friend class MultiCastDataStreamSinkOperatorX; friend class DataSinkOperatorX; - using Base = PipelineXSinkLocalState; + using Base = PipelineXSpillSinkLocalState; using Parent = MultiCastDataStreamSinkOperatorX; std::string name_suffix() override; + Status open(RuntimeState* state) override; + + std::vector dependencies() const override; + + std::string debug_string(int indentation_level) const override; + private: std::shared_ptr _multi_cast_data_streamer; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index e45e59d17e27b3..61adfed7573bb5 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -38,6 +38,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); + _shared_state->multi_cast_data_streamer->set_source_profile(p._consumer_id, + _runtime_profile.get()); _shared_state->multi_cast_data_streamer->set_dep_by_sender_idx(p._consumer_id, _dependency); _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); _filter_timer = ADD_TIMER(_runtime_profile, "FilterTime"); @@ -50,6 +52,14 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState return Status::OK(); } +std::vector MultiCastDataStreamSourceLocalState::dependencies() const { + auto dependencies = Base::dependencies(); + auto& p = _parent->cast(); + dependencies.emplace_back( + _shared_state->multi_cast_data_streamer->get_spill_read_dependency(p._consumer_id)); + return dependencies; +} + Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); @@ -92,9 +102,9 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, { SCOPED_TIMER(local_state._get_data_timer); RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull( - _consumer_id, output_block, eos)); + state, _consumer_id, output_block, eos)); } - if (!local_state._conjuncts.empty()) { + if (!local_state._conjuncts.empty() && !output_block->empty()) { SCOPED_TIMER(local_state._filter_timer); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 57410bf8d9568a..c1af8c5b21cd33 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -37,11 +37,12 @@ namespace pipeline { class MultiCastDataStreamer; class MultiCastDataStreamerSourceOperatorX; -class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState, - public RuntimeFilterConsumer { +class MultiCastDataStreamSourceLocalState final + : public PipelineXSpillLocalState, + public RuntimeFilterConsumer { public: ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); - using Base = PipelineXLocalState; + using Base = PipelineXSpillLocalState; using Parent = MultiCastDataStreamerSourceOperatorX; MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; @@ -62,6 +63,8 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState dependencies() const override; + private: friend class MultiCastDataStreamerSourceOperatorX; vectorized::VExprContextSPtrs _output_expr_contexts; diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 3e629093e23b97..f1e399a32895ae 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -17,9 +17,26 @@ #include "multi_cast_data_streamer.h" +#include +#include + +#include +#include +#include + +#include "common/config.h" +#include "common/exception.h" +#include "common/logging.h" +#include "common/status.h" #include "pipeline/dependency.h" #include "pipeline/exec/multi_cast_data_stream_source.h" +#include "pipeline/exec/spill_utils.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "util/pretty_printer.h" +#include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { #include "common/compile_check_begin.h" @@ -30,37 +47,115 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_ block->clear(); } -Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { +Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block, + bool* eos) { int* un_finish_copy = nullptr; int use_count = 0; + size_t mem_size = 0; + bool spilled = false; { std::lock_guard l(_mutex); + + if (!_cached_blocks[sender_idx].empty()) { + *block = std::move(_cached_blocks[sender_idx].front()); + _cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin()); + return Status::OK(); + } + + for (auto it = _spill_readers[sender_idx].begin(); + it != _spill_readers[sender_idx].end();) { + if ((*it)->all_data_read) { + it = _spill_readers[sender_idx].erase(it); + } else { + it++; + } + } + + if (!_spill_readers[sender_idx].empty()) { + auto reader_item = _spill_readers[sender_idx].front(); + if (!reader_item->stream->ready_for_reading()) { + return Status::OK(); + } + + auto& reader = reader_item->reader; + RETURN_IF_ERROR(reader->open()); + if (reader_item->block_offset != 0) { + reader->seek(reader_item->block_offset); + reader_item->block_offset = 0; + } + + auto spill_func = [this, reader_item, sender_idx]() { + vectorized::Block block; + bool spill_eos = false; + size_t read_size = 0; + while (!spill_eos) { + RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos)); + if (!block.empty()) { + std::lock_guard l(_mutex); + read_size += block.allocated_bytes(); + _cached_blocks[sender_idx].emplace_back(std::move(block)); + if (_cached_blocks[sender_idx].size() >= 32 || + read_size > 2 * 1024 * 1024) { + break; + } + } + } + + if (spill_eos || !_cached_blocks[sender_idx].empty()) { + reader_item->all_data_read = spill_eos; + _set_ready_for_read(sender_idx); + } + return Status::OK(); + }; + + auto catch_exception_func = [spill_func = std::move(spill_func)]() { + RETURN_IF_CATCH_EXCEPTION(return spill_func();); + }; + + _spill_read_dependencies[sender_idx]->block(); + auto spill_runnable = std::make_shared( + state, _spill_read_dependencies[sender_idx], _source_profiles[sender_idx], + _shared_state->shared_from_this(), catch_exception_func); + auto* thread_pool = + ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); + RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable))); + return Status::OK(); + } + auto& pos_to_pull = _sender_pos_to_read[sender_idx]; const auto end = _multi_cast_blocks.end(); - DCHECK(pos_to_pull != end); + if (pos_to_pull == end) { + _block_reading(sender_idx); + VLOG_DEBUG << "query: " << print_id(state->query_id()) + << ", pos_to_pull end: " << (void*)(_write_dependency); + *eos = _eos; + return Status::OK(); + } *block = *pos_to_pull->_block; - _cumulative_mem_size -= pos_to_pull->_mem_size; - pos_to_pull->_used_count--; use_count = pos_to_pull->_used_count; + mem_size = pos_to_pull->_mem_size; un_finish_copy = &pos_to_pull->_un_finish_copy; pos_to_pull++; if (pos_to_pull == end) { _block_reading(sender_idx); + *eos = _eos; + RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); } - *eos = _eos and pos_to_pull == end; - } - - if (use_count == 0) { - // will clear _multi_cast_blocks - _wait_copy_block(block, *un_finish_copy); - } else { - _copy_block(block, *un_finish_copy); + if (use_count == 0) { + _cumulative_mem_size.fetch_sub(mem_size); + _multi_cast_blocks.pop_front(); + _write_dependency->set_ready(); + VLOG_DEBUG << "**** query: " << print_id(state->query_id()) + << ", set ready: " << (void*)(_write_dependency); + } else { + _copy_block(block, *un_finish_copy); + } } return Status::OK(); @@ -71,13 +166,6 @@ void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish for (int i = 0; i < block->columns(); ++i) { block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows); } - - std::unique_lock l(_mutex); - un_finish_copy--; - if (un_finish_copy == 0) { - l.unlock(); - _cv.notify_one(); - } } void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) { @@ -86,16 +174,153 @@ void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_f _multi_cast_blocks.pop_front(); } +Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) { + vectorized::SpillStreamSPtr spill_stream; + *triggered = false; + if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes && + _multi_cast_blocks.size() >= 4) { + _write_dependency->block(); + + bool has_reached_end = false; + std::vector distances(_cast_sender_count); + size_t total_count = _multi_cast_blocks.size(); + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + distances[i] = std::distance(_multi_cast_blocks.begin(), _sender_pos_to_read[i]); + if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { + has_reached_end = true; + CHECK_EQ(distances[i], total_count); + } + + if (!_spill_readers[i].empty()) { + CHECK_EQ(distances[i], 0); + } + } + + if (has_reached_end) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + state, spill_stream, print_id(state->query_id()), "MultiCastSender", _node_id, + std::numeric_limits::max(), std::numeric_limits::max(), + _sink_profile)); + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + if (distances[i] < total_count) { + auto reader = spill_stream->create_separate_reader(); + reader->set_counters(_source_profiles[i]); + auto reader_item = std::make_shared( + std::move(reader), spill_stream, distances[i], false); + _spill_readers[i].emplace_back(std::move(reader_item)); + } + + _block_reading(i); + } + + RETURN_IF_ERROR(_submit_spill_task(state, spill_stream)); + DCHECK_EQ(_multi_cast_blocks.size(), 0); + + for (auto& pos : _sender_pos_to_read) { + pos = _multi_cast_blocks.end(); + } + _cumulative_mem_size = 0; + *triggered = true; + } + } + + return Status::OK(); +} + +Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state, + vectorized::SpillStreamSPtr spill_stream) { + std::vector blocks; + for (auto& block : _multi_cast_blocks) { + blocks.emplace_back(std::move(*block._block)); + } + + _multi_cast_blocks.clear(); + + auto spill_func = [state, blocks = std::move(blocks), + spill_stream = std::move(spill_stream)]() mutable { + const auto blocks_count = blocks.size(); + while (!blocks.empty() && !state->is_cancelled()) { + auto block = std::move(blocks.front()); + blocks.erase(blocks.begin()); + + RETURN_IF_ERROR(spill_stream->spill_block(state, block, false)); + } + VLOG_DEBUG << "query: " << print_id(state->query_id()) << " multi cast write " + << blocks_count << " blocks"; + return spill_stream->spill_eof(); + }; + + auto exception_catch_func = [spill_func = std::move(spill_func), + query_id = print_id(state->query_id()), this]() mutable { + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }(); + _write_dependency->set_ready(); + + if (!status.ok()) { + LOG(WARNING) << "query: " << query_id + << " multi cast write failed: " << status.to_string() + << ", dependency: " << (void*)_spill_dependency.get(); + } else { + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + _set_ready_for_read(i); + } + } + return status; + }; + + auto spill_runnable = std::make_shared( + state, nullptr, _spill_dependency, _sink_profile, _shared_state->shared_from_this(), + exception_catch_func); + + _spill_dependency->block(); + + auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); + return thread_pool->submit(std::move(spill_runnable)); +} + Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) { auto rows = block->rows(); COUNTER_UPDATE(_process_rows, rows); const auto block_mem_size = block->allocated_bytes(); - _cumulative_mem_size += block_mem_size; - COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value())); + + if (!_shared_state->_spill_status.ok()) { + return _shared_state->_spill_status.status(); + } { std::lock_guard l(_mutex); + + if (_pending_block) { + const auto pending_size = _pending_block->allocated_bytes(); + _cumulative_mem_size += pending_size; + _multi_cast_blocks.emplace_back(_pending_block.get(), _cast_sender_count, + _cast_sender_count - 1, pending_size); + _pending_block.reset(); + + auto end = std::prev(_multi_cast_blocks.end()); + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { + _sender_pos_to_read[i] = end; + _set_ready_for_read(i); + } + } + } + + _cumulative_mem_size += block_mem_size; + COUNTER_SET(_peak_mem_usage, + std::max(_cumulative_mem_size.load(), _peak_mem_usage->value())); + + if (!eos) { + bool spilled = false; + RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); + if (spilled) { + _pending_block = + vectorized::Block::create_unique(block->get_columns_with_type_and_name()); + block->clear(); + return Status::OK(); + } + } + _multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1, block_mem_size); // last elem @@ -106,6 +331,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block _set_ready_for_read(i); } } + _eos = eos; } @@ -135,4 +361,11 @@ void MultiCastDataStreamer::_block_reading(int sender_idx) { dep->block(); } +std::string MultiCastDataStreamer::debug_string() const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "MemSize: {}", + PrettyPrinter::print_bytes(_cumulative_mem_size)); + return fmt::to_string(debug_string_buffer); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 07e64016363f65..079acb9c81f9eb 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -17,11 +17,22 @@ #pragma once +#include +#include +#include +#include + +#include "pipeline/dependency.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" #include "vec/sink/vdata_stream_sender.h" +#include "vec/spill/spill_stream.h" namespace doris::pipeline { class Dependency; +struct MultiCastSharedState; + struct MultiCastBlock { MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, size_t mem_size); @@ -31,30 +42,53 @@ struct MultiCastBlock { size_t _mem_size; }; +struct SpillingReader { + vectorized::SpillReaderUPtr reader; + vectorized::SpillStreamSPtr stream; + int64_t block_offset {0}; + bool all_data_read {false}; +}; + // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the // code class MultiCastDataStreamer { public: - MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count, + MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState* shared_state, + ObjectPool* pool, int cast_sender_count, int32_t node_id, bool with_dependencies = false) : _row_desc(row_desc), + _shared_state(shared_state), _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), - _cast_sender_count(cast_sender_count) { + _cached_blocks(cast_sender_count), + _cast_sender_count(cast_sender_count), + _node_id(node_id), + _spill_readers(cast_sender_count), + _source_profiles(cast_sender_count) { _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); if (with_dependencies) { _dependencies.resize(cast_sender_count, nullptr); } + _spill_dependency = Dependency::create_shared(_node_id, _node_id, + "MultiCastDataStreamerDependency", true); + + for (int i = 0; i != cast_sender_count; ++i) { + _spill_read_dependencies.emplace_back(Dependency::create_shared( + node_id, node_id, "MultiCastReadSpillDependency", true)); + } _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT); }; - ~MultiCastDataStreamer() = default; + ~MultiCastDataStreamer() { + for (auto& item : _spill_readers) { + DCHECK(item.empty()); + } + } - Status pull(int sender_idx, vectorized::Block* block, bool* eos); + Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block, bool* eos); Status push(RuntimeState* state, vectorized::Block* block, bool eos); - const RowDescriptor& row_desc() { return _row_desc; } RuntimeProfile* profile() { return _profile; } @@ -64,6 +98,22 @@ class MultiCastDataStreamer { _block_reading(sender_idx); } + void set_write_dependency(Dependency* dependency) { _write_dependency = dependency; } + + Dependency* get_spill_dependency() const { return _spill_dependency.get(); } + + Dependency* get_spill_read_dependency(int sender_idx) const { + return _spill_read_dependencies[sender_idx].get(); + } + + void set_sink_profile(RuntimeProfile* profile) { _sink_profile = profile; } + + void set_source_profile(int sender_idx, RuntimeProfile* profile) { + _source_profiles[sender_idx] = profile; + } + + std::string debug_string() const; + private: void _set_ready_for_read(int sender_idx); void _block_reading(int sender_idx); @@ -72,19 +122,37 @@ class MultiCastDataStreamer { void _wait_copy_block(vectorized::Block* block, int& un_finish_copy); + Status _submit_spill_task(RuntimeState* state, vectorized::SpillStreamSPtr spill_stream); + + Status _trigger_spill_if_need(RuntimeState* state, bool* triggered); + const RowDescriptor& _row_desc; + MultiCastSharedState* _shared_state; RuntimeProfile* _profile = nullptr; std::list _multi_cast_blocks; + std::list _spilling_blocks; + std::vector> _cached_blocks; std::vector::iterator> _sender_pos_to_read; std::condition_variable _cv; std::mutex _mutex; bool _eos = false; int _cast_sender_count = 0; - int64_t _cumulative_mem_size = 0; - + int _node_id; + std::atomic_int64_t _cumulative_mem_size = 0; RuntimeProfile::Counter* _process_rows = nullptr; RuntimeProfile::Counter* _peak_mem_usage = nullptr; + Dependency* _write_dependency; std::vector _dependencies; + std::shared_ptr _spill_dependency; + + vectorized::BlockUPtr _pending_block; + + std::vector>> _spill_readers; + + std::vector> _spill_read_dependencies; + + RuntimeProfile* _sink_profile; + std::vector _source_profiles; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a8c2b4953651eb..6f2f7c8bc15d9b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -21,6 +21,7 @@ #include #include +#include #include "common/logging.h" #include "pipeline/exec/operator.h" @@ -332,9 +333,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( if (spill_context) { spill_context->on_task_finished(); } + + std::lock_guard lock(_spill_mutex); _spill_dependency->set_ready(); return status; }; + for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; @@ -390,9 +394,15 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( } } - if (_spilling_task_count > 0) { - _spill_dependency->block(); - } else if (_child_eos) { + if (_spilling_task_count.load() > 0) { + std::lock_guard lock(_spill_mutex); + if (_spilling_task_count.load() > 0) { + _spill_dependency->block(); + return Status::OK(); + } + } + + if (_child_eos) { VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink " << _parent->node_id() << " set_ready_to_read" << ", task id: " << state->task_id(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 58b19004f337b0..374055a838f000 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -68,6 +68,7 @@ class PartitionedHashJoinSinkLocalState friend class PartitionedHashJoinSinkOperatorX; + std::mutex _spill_mutex; std::atomic _spilling_finished {false}; std::atomic_int32_t _spilling_task_count {0}; diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index c947081fcaf405..014b83be23d636 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -17,6 +17,8 @@ #include "vec/spill/spill_reader.h" +#include + #include #include "common/cast_set.h" @@ -99,6 +101,11 @@ Status SpillReader::open() { return Status::OK(); } +void SpillReader::seek(size_t block_index) { + DCHECK_LT(block_index, block_count_); + read_block_index_ = block_index; +} + Status SpillReader::read(Block* block, bool* eos) { DCHECK(file_reader_); block->clear_column_data(); diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index 3e5b93a21d76b9..a27916a87a3c89 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -115,6 +115,10 @@ Status SpillStream::prepare() { return writer_->open(); } +SpillReaderUPtr SpillStream::create_separate_reader() const { + return std::make_unique(stream_id_, writer_->get_file_path()); +} + const TUniqueId& SpillStream::query_id() const { return query_id_; } @@ -144,6 +148,10 @@ Status SpillStream::spill_eof() { auto status = writer_->close(); total_written_bytes_ = writer_->get_written_bytes(); writer_.reset(); + + if (status.ok()) { + _ready_for_reading = true; + } return status; } diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 9682130aad0aae..525abbb7855d26 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -67,8 +67,12 @@ class SpillStream { void update_shared_profiles(RuntimeProfile* source_op_profile); + SpillReaderUPtr create_separate_reader() const; + const TUniqueId& query_id() const; + bool ready_for_reading() const { return _ready_for_reading; } + private: friend class SpillStreamManager; @@ -86,6 +90,7 @@ class SpillStream { size_t batch_bytes_; int64_t total_written_bytes_ = 0; + std::atomic_bool _ready_for_reading = false; std::atomic_bool _is_reading = false; SpillWriterUPtr writer_;