diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index cebc51e2c7490c..6431a03beb2e04 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1163,7 +1163,7 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15"); // create tablet in partition random robin idx lru size, default 10000 DEFINE_Int32(partition_disk_index_lru_size, "10000"); // limit the storage space that query spill files can use -DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage"); +DEFINE_String(spill_storage_root_path, ""); DEFINE_String(spill_storage_limit, "20%"); // 20% DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s DEFINE_mInt32(spill_gc_file_count, "2000"); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 0f57a03fc64507..03ca1299b4372e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -251,7 +251,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; - _dependency->set_ready(); return; } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); @@ -325,7 +324,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti } auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index]; - DCHECK(mutable_block != nullptr); + if (!mutable_block) { + ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); + spilled_stream.reset(); + return Status::OK(); + } auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); @@ -340,11 +343,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti LOG(INFO) << "execution_context released, maybe query was cancelled."; return; } + SCOPED_ATTACH_TASK(state); _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_build_timer); Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); DCHECK_EQ(_spill_status_ok.load(), true); bool eos = false; @@ -654,7 +657,6 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (local_state._need_to_setup_internal_operators) { *eos = false; bool has_data = false; - CHECK_EQ(local_state._dependency->is_blocked_by(), nullptr); RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk( state, local_state._partition_cursor, has_data)); if (has_data) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index d0ca832630e5fa..416d678b580dba 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -137,7 +137,17 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta auto execution_context = state->get_task_execution_context(); _dependency->block(); - auto spill_func = [execution_context, build_block, state, this]() { + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; + auto spill_func = [execution_context, build_block, state, query_id, mem_tracker, + this]() mutable { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + Defer defer {[&]() { + // need to reset build_block here, or else build_block will be destructed + // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure + build_block.reset(); + }}; + auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index fd63df92976788..48b1670ca1f891 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -137,10 +137,6 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); return _sort_sink_operator->open(state); } -Status SpillSortSinkOperatorX::close(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX::close(state)); - return _sort_sink_operator->close(state); -} Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) { if (!_enable_spill) { return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 4604696eff2011..d552d67570add3 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -79,7 +79,6 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorXrequired_data_distribution(); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index d9828080ddce35..081d8ca1f590ed 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -42,6 +42,7 @@ #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" #include "util/uid_util.h" +#include "vec/spill/spill_stream_manager.h" namespace doris { @@ -167,6 +168,8 @@ QueryContext::~QueryContext() { file_scan_range_params_map.clear(); obj_pool.clear(); + _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id); + LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), mem_tracker_msg); } diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 2e75218e9cf491..e336c9f80a8711 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -183,7 +183,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { // in process_mem_used. // we count these cache memories equally on workload groups. double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; - if (ratio >= 1.25) { + if (ratio <= 1.25) { auto sys_mem_available = doris::MemInfo::sys_mem_available(); std::string debug_msg = fmt::format( "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index a218c7be6f18a7..731e09c6be9fc2 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -411,6 +411,9 @@ int main(int argc, char** argv) { } std::vector spill_paths; + if (doris::config::spill_storage_root_path.empty()) { + doris::config::spill_storage_root_path = doris::config::storage_root_path; + } olap_res = doris::parse_conf_store_paths(doris::config::spill_storage_root_path, &spill_paths); if (!olap_res) { LOG(ERROR) << "parse config spill storage path failed, path=" diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index f5b6fea096d9a5..ed7be9a0b28424 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -25,6 +25,7 @@ #include "io/fs/local_file_system.h" #include "runtime/exec_env.h" +#include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "vec/core/block.h" #include "vec/spill/spill_reader.h" @@ -88,6 +89,10 @@ void SpillStream::close() { } } +const TUniqueId& SpillStream::query_id() const { + return state_->query_id(); +} + const std::string& SpillStream::get_spill_root_dir() const { return data_dir_->path(); } diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 6b5166f2652f5b..68abfa9aaf7c0e 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -81,6 +81,8 @@ class SpillStream { read_wait_io_timer_ = wait_io_timer; } + const TUniqueId& query_id() const; + private: friend class SpillStreamManager; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 0259bef33f3fc8..05a2531c466492 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -36,6 +36,7 @@ #include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "util/time.h" +#include "util/uid_util.h" #include "vec/spill/spill_stream.h" namespace doris::vectorized { @@ -128,7 +129,7 @@ Status SpillStreamManager::_init_spill_store_map() { std::vector SpillStreamManager::_get_stores_for_spill( TStorageMedium::type storage_medium) { std::vector stores; - for (auto&& [_, store] : _spill_store_map) { + for (auto& [_, store] : _spill_store_map) { if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) { stores.push_back(store.get()); } @@ -188,7 +189,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea for (auto& dir : data_dirs) { data_dir = dir; std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(), SPILL_DIR_PREFIX); - spill_dir = fmt::format("{}/{}-{}-{}-{}-{}", spill_root_dir, query_id, operator_name, + spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, operator_name, node_id, state->task_id(), id); auto st = io::global_local_filesystem()->create_directory(spill_dir); if (!st.ok()) { @@ -207,9 +208,15 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea } void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) { - auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX, - std::filesystem::path(stream->get_spill_dir()).filename().string()); - (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); + auto query_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX, + print_id(stream->query_id())); + auto st = io::global_local_filesystem()->create_directory(query_dir); + if (st.ok()) { + auto gc_dir = + fmt::format("{}/{}", query_dir, + std::filesystem::path(stream->get_spill_dir()).filename().string()); + (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); + } } void SpillStreamManager::gc(int64_t max_file_count) { @@ -266,6 +273,22 @@ void SpillStreamManager::gc(int64_t max_file_count) { } } +void SpillStreamManager::async_cleanup_query(TUniqueId query_id) { + (void)get_async_task_thread_pool()->submit_func([this, query_id] { + for (auto& [_, store] : _spill_store_map) { + std::string query_spill_dir = + fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id)); + bool exists = false; + auto status = io::global_local_filesystem()->exists(query_spill_dir, &exists); + if (status.ok() && exists) { + auto gc_dir = fmt::format("{}/{}/{}-gc", store->path(), SPILL_GC_DIR_PREFIX, + print_id(query_id)); + (void)io::global_local_filesystem()->rename(query_spill_dir, gc_dir); + } + } + }); +} + SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes, TStorageMedium::type storage_medium) : _path(std::move(path)), diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 2d7350f775fb28..36062ce0b466db 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -108,6 +108,8 @@ class SpillStreamManager { // 标记SpillStream需要被删除,在GC线程中异步删除落盘文件 void delete_spill_stream(SpillStreamSPtr spill_stream); + void async_cleanup_query(TUniqueId query_id); + void gc(int64_t max_file_count); ThreadPool* get_spill_io_thread_pool(const std::string& path) const {