From d46495ebf72557fd3cecdffb1bd9870131acd60c Mon Sep 17 00:00:00 2001 From: TengJianPing Date: Fri, 27 Dec 2024 14:41:31 +0800 Subject: [PATCH] Add spill metrics and improve spill log printing (#46029) --- be/src/olap/memtable_flush_executor.cpp | 3 +- .../partitioned_aggregation_sink_operator.cpp | 25 ++--- ...artitioned_aggregation_source_operator.cpp | 32 +++--- .../partitioned_hash_join_probe_operator.cpp | 98 ++++++++++--------- .../partitioned_hash_join_sink_operator.cpp | 50 +++++----- .../exec/spill_sort_sink_operator.cpp | 15 +-- .../exec/spill_sort_source_operator.cpp | 23 +++-- be/src/vec/exec/scan/scanner_scheduler.cpp | 8 +- be/src/vec/spill/spill_reader.cpp | 11 ++- be/src/vec/spill/spill_stream_manager.cpp | 21 ++++ be/src/vec/spill/spill_stream_manager.h | 22 +++++ be/src/vec/spill/spill_writer.cpp | 2 + 12 files changed, 186 insertions(+), 124 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 5533a360fac373..9648c3fe098194 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -161,7 +161,8 @@ Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context, if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) { // If there are already any flushing task, Wait for some time and retry. 
LOG_EVERY_T(INFO, 60) << fmt::format( - "Failed to reserve memory {} for flush memtable, retry after 100ms", size); + "Failed to reserve memory {} for flush memtable, retry after 100ms", + PrettyPrinter::print_bytes(size)); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } else { st = Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 8cc6ae58a4fb29..ad7f4e6f1847eb 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -186,9 +186,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: revocable_size = revocable_mem_size(state); query_mem_limit = state->get_query_ctx()->get_mem_limit(); LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} eos, need spill: {}, query mem limit: {}, " - "revocable memory: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, " + "revocable memory:{}", + print_id(state->query_id()), node_id(), state->task_id(), local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit), PrettyPrinter::print_bytes(revocable_size)); @@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr& spill_context) { const auto size_to_revoke = _parent->revocable_mem_size(state); LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need spill: {}, revocable " - "memory: {}", - print_id(state->query_id()), state->task_id(), _parent->node_id(), _eos, + "Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need spill:{}, revocable " + "memory:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos, _shared_state->is_spilled, PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); if 
(!_shared_state->is_spilled) { @@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory( Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " agg node " - << Base::_parent->node_id() - << " revoke_memory error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, agg sink:{}, task:{}, revoke_memory error:{}", + print_id(query_id), Base::_parent->node_id(), state->task_id(), + status); } _shared_state->close(); } else { LOG(INFO) << fmt::format( - "Query: {}, task {}, agg sink {} revoke_memory finish, eos: {}, " - "revocable memory: {}", - print_id(state->query_id()), state->task_id(), _parent->node_id(), + "Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, " + "revocable memory:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos, PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 8e221a1c7e2341..c87ee24dedb222 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -250,8 +250,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " agg node " - << _parent->node_id() << " recover agg data error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover agg data error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); } @@ -305,15 +306,16 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b } } - VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() - << ", task 
id: " << state->task_id() << " recover partitioned finished, " - << _shared_state->spill_partitions.size() << " partitions left, " - << accumulated_blocks_size - << " bytes read, spill dep: " << (void*)(_spill_dependency.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover partitioned finished, partitions " + "left:{}, bytes read:{}, spill dep:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->spill_partitions.size(), accumulated_blocks_size, + (void*)(_spill_dependency.get())); return status; }; - auto exception_catch_func = [spill_func, query_id]() { + auto exception_catch_func = [this, state, spill_func, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_source " @@ -323,8 +325,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id) - << " recover exception : " << status.to_string(); + LOG_IF(INFO, !status.ok()) << fmt::format( + "Query:{}, agg probe:{}, task:{}, recover exception:{}", print_id(query_id), + _parent->node_id(), state->task_id(), status.to_string()); return status; }; @@ -334,10 +337,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); _spill_dependency->block(); - VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() - << ", task id: " << state->task_id() << " begin to recover, " - << _shared_state->spill_partitions.size() - << " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, agg probe:{}, task:{}, begin to recover, partitions left:{}, " + "_spill_dependency:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->spill_partitions.size(), 
(void*)(_spill_dependency.get())); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( std::make_shared(state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 04b83e822c114b..ff9c78c5be496b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -230,9 +230,11 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat } COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size)); - VLOG_DEBUG << "Query: " << print_id(query_id) - << " hash probe revoke done, node: " << p.node_id() - << ", task: " << state->task_id(); + + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " spill_probe_blocks done", + print_id(query_id), p.node_id(), state->task_id()); return Status::OK(); }; @@ -275,9 +277,10 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << " recover_build_blocks_from_disk"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recover_build_blocks_from_disk", + print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index); auto& spilled_stream = _shared_state->spilled_streams[partition_index]; has_data = false; if (!spilled_stream) { @@ -291,9 +294,10 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim SCOPED_TIMER(_recovery_build_timer); bool eos = false; - VLOG_DEBUG << "Query: " << 
print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << ", recoverying build data"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovering build data", + print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index); Status status; while (!eos) { vectorized::Block block; @@ -315,7 +319,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } if (UNLIKELY(state->is_cancelled())) { - LOG(INFO) << "recovery build block when canceled."; + LOG(INFO) << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery build data canceled", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + partition_index); break; } @@ -338,9 +346,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim if (eos) { ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); _shared_state->spilled_streams[partition_index].reset(); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) - << ", node: " << _parent->node_id() << ", task id: " << state->task_id() - << ", partition: " << partition_index; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery build data eos", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + partition_index); } return status; }; @@ -365,16 +375,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; _spill_dependency->block(); - { - auto* pipeline_task = state->get_task(); - if (pipeline_task) { - auto& p = _parent->cast(); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - 
<< ", dependency: " << _dependency - << ", task debug_string: " << pipeline_task->debug_string(); - } - } DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func", { @@ -386,9 +386,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto spill_runnable = std::make_shared( state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), exception_catch_func); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", task id: " << state->task_id() << ", partition: " << partition_index - << " recover_build_blocks_from_disk submit func"; return spill_io_pool->submit(std::move(spill_runnable)); } @@ -429,7 +426,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim auto query_id = state->query_id(); - auto read_func = [this, query_id, &spilled_stream, &blocks] { + auto read_func = [this, query_id, partition_index, &spilled_stream, &blocks] { SCOPED_TIMER(_recovery_probe_timer); vectorized::Block block; @@ -457,8 +454,10 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim } } if (eos) { - VLOG_DEBUG << "Query: " << print_id(query_id) - << ", recovery probe data done: " << spilled_stream->get_spill_dir(); + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, recovery probe data done", + print_id(query_id), _parent->node_id(), _state->task_id(), partition_index); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); } @@ -675,13 +674,13 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(), &block, true)); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) - << ", internal build operator finished, node id: " << node_id() - << ", task id: " << state->task_id() - << ", 
partition: " << local_state._partition_cursor << "rows: " << block.rows() - << ", usage: " - << _inner_sink_operator->get_memory_usage( - local_state._shared_state->inner_runtime_state.get()); + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " internal build operator finished, partition:{}, rows:{}, memory usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), local_state._partition_cursor, + block.rows(), + _inner_sink_operator->get_memory_usage( + local_state._shared_state->inner_runtime_state.get())); COUNTER_SET(local_state._hash_table_memory_usage, sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value()); @@ -734,9 +733,10 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (!has_data) { vectorized::Block block; RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true)); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() - << ", task: " << state->task_id() << "partition: " << partition_index - << " has no data to recovery"; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, has no data to recovery", + print_id(state->query_id()), node_id(), state->task_id(), partition_index); break; } else { return Status::OK(); @@ -755,9 +755,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, *eos = false; if (in_mem_eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() - << ", task: " << state->task_id() - << ", partition: " << local_state._partition_cursor; + VLOG_DEBUG << fmt::format( + "Query:{}, hash join probe:{}, task:{}," + " partition:{}, probe done", + print_id(state->query_id()), node_id(), state->task_id(), + local_state._partition_cursor); local_state._partition_cursor++; if (local_state._partition_cursor == _partition_count) { *eos = true; @@ -848,8 +850,8 @@ size_t 
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() - << ", task: " << state->task_id(); + VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory", + print_id(state->query_id()), node_id(), state->task_id()); RETURN_IF_ERROR(local_state.spill_probe_blocks(state)); return Status::OK(); @@ -894,10 +896,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori #ifndef NDEBUG Defer eos_check_defer([&] { if (*eos) { - LOG(INFO) << "Query: " << print_id(state->query_id()) - << ", hash probe node: " << node_id() << ", task: " << state->task_id() - << ", eos with child eos: " << local_state._child_eos - << ", need spill: " << need_to_spill; + LOG(INFO) << fmt::format( + "Query:{}, hash join probe:{}, task:{}, child eos:{}, need spill:{}", + print_id(state->query_id()), node_id(), state->task_id(), + local_state._child_eos, need_to_spill); } }); #endif diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 2e2c38f04c32ec..a227d87aa1bb94 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -178,8 +178,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } if (build_block.rows() <= 1) { - LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id() - << ", task: " << state->task_id(); + LOG(WARNING) << fmt::format( + "Query:{}, hash join sink:{}, task:{}," + " has no data to revoke", + print_id(state->query_id()), _parent->node_id(), state->task_id()); if (spill_context) { spill_context->on_task_finished(); } @@ -270,9 +272,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( }); status = _finish_spilling(); VLOG_DEBUG << fmt::format( - "Query: {}, task {}, hash join sink {} _revoke_unpartitioned_block " + "Query:{}, hash join sink:{}, task:{}, _revoke_unpartitioned_block, " "set_ready_to_read", - print_id(state->query_id()), state->task_id(), _parent->node_id()); + print_id(state->query_id()), _parent->node_id(), state->task_id()); _dependency->set_ready_to_read(); } @@ -303,9 +305,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr& spill_context) { SCOPED_TIMER(_spill_total_timer); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() - << " hash join sink " << _parent->node_id() << " revoke_memory" - << ", eos: " << _child_eos; + VLOG_DEBUG << fmt::format("Query:{}, hash join sink:{}, task:{}, revoke_memory, eos:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + _child_eos); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); if (!_shared_state->need_to_spill) { @@ -322,9 +324,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( auto spill_fin_cb = [this, state, query_id, spill_context]() { Status status; if (_child_eos) { - LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ", task " - << state->task_id() << " hash join sink " << _parent->node_id() - << " finish spilling, set_ready_to_read"; + LOG(INFO) << fmt::format( + "Query:{}, hash join sink:{}, task:{}, finish spilling, set_ready_to_read", + print_id(this->state()->query_id()), _parent->node_id(), state->task_id()); std::for_each(_shared_state->partitioned_build_blocks.begin(), _shared_state->partitioned_build_blocks.end(), [&](auto& block) { if (block) { @@ -565,10 +567,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B revocable_size = revocable_mem_size(state); 
query_mem_limit = state->get_query_ctx()->get_mem_limit(); LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, need spill: {}, query mem limit: {}, " - "revocable " - "memory: {}", - print_id(state->query_id()), state->task_id(), node_id(), need_to_spill, + "Query:{}, hash join sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, " + "revocable memory:{}", + print_id(state->query_id()), node_id(), state->task_id(), need_to_spill, PrettyPrinter::print_bytes(query_mem_limit), PrettyPrinter::print_bytes(revocable_size)); } @@ -590,9 +591,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "Query:{}, hash join sink:{}, task:{} eos, revoke_memory " "because revocable memory is high", - print_id(state->query_id()), state->task_id(), node_id()); + print_id(state->query_id()), node_id(), state->task_id()); return revoke_memory(state, nullptr); } @@ -601,10 +602,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state._shared_state->inner_runtime_state.get(), in_block, eos)); LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, set_ready_to_read, nonspill " - "memory " - "usage: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill " + "memory usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); } @@ -642,9 +642,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (eos) { if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "Query:{}, 
hash join sink:{}, task:{}, eos, revoke_memory " "because revocable memory is high", - print_id(state->query_id()), state->task_id(), node_id()); + print_id(state->query_id()), node_id(), state->task_id()); return revoke_memory(state, nullptr); } } @@ -653,9 +653,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state.update_memory_usage(); if (eos) { LOG(INFO) << fmt::format( - "Query: {}, task {}, hash join sink {} eos, set_ready_to_read, nonspill memory " - "usage: {}", - print_id(state->query_id()), state->task_id(), node_id(), + "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill memory " + "usage:{}", + print_id(state->query_id()), node_id(), state->task_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); local_state._dependency->set_ready_to_read(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 03c4072f7de9c4..debe1d59710aa9 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -195,9 +195,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, profile()->add_info_string("Spilled", "true"); } - VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " - << Base::_parent->node_id() << " revoke_memory" - << ", eos: " << _eos; + VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke_memory, eos:{}", + print_id(state->query_id()), _parent->node_id(), state->task_id(), + _eos); auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(), @@ -219,13 +219,14 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " 
sort node " - << _parent->node_id() << " revoke memory error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, sort sink:{}, task:{}, revoke memory error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); } else { - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " revoke memory finish"; + VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke memory finish", + print_id(query_id), _parent->node_id(), state->task_id()); } if (!status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 8a58d0b15040a2..43bb8a65b6e605 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -70,8 +70,8 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const } Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { auto& parent = Base::_parent->template cast(); - VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << _parent->node_id() - << " merge spill data"; + VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data", + print_id(state->query_id()), _parent->node_id(), state->task_id()); _spill_dependency->Dependency::block(); auto query_id = state->query_id(); @@ -82,8 +82,9 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "Query " << print_id(query_id) << " sort node " - << _parent->node_id() << " merge spill data error: " << status; + LOG(WARNING) << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill data error:{}", + print_id(query_id), _parent->node_id(), state->task_id(), status); } _shared_state->close(); for (auto& stream : _current_merging_streams) { @@ -91,18 +92,20 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } _current_merging_streams.clear(); } else { - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " merge spill data finish"; + VLOG_DEBUG << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill data finish", + print_id(query_id), _parent->node_id(), state->task_id()); } }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; while (!state->is_cancelled()) { int max_stream_count = _calc_spill_blocks_to_merge(state); - VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() - << " merge spill streams, streams count: " - << _shared_state->sorted_streams.size() - << ", curren merge max stream count: " << max_stream_count; + VLOG_DEBUG << fmt::format( + "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, " + "current merge max stream count:{}", + print_id(query_id), _parent->node_id(), state->task_id(), + _shared_state->sorted_streams.size(), max_stream_count); { SCOPED_TIMER(Base::_spill_recover_time); status = _create_intermediate_merger( diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index d8858dd5ababc4..e0fb08c43baa6c 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -381,10 +381,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); } if (scan_task->cached_blocks.back().first->rows() > 0) { - auto block_avg_bytes = - (scan_task->cached_blocks.back().first->allocated_bytes() + - scan_task->cached_blocks.back().first->rows() - 1) / - scan_task->cached_blocks.back().first->rows() * ctx->batch_size(); + auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() + + scan_task->cached_blocks.back().first->rows() - 1) / + scan_task->cached_blocks.back().first->rows() 
* + ctx->batch_size(); scanner->update_block_avg_bytes(block_avg_bytes); } if (ctx->low_memory_mode()) { diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index 014b83be23d636..40323f824a815f 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -29,6 +29,7 @@ #include "runtime/exec_env.h" #include "util/slice.h" #include "vec/core/block.h" +#include "vec/spill/spill_stream_manager.h" namespace doris { #include "common/compile_check_begin.h" namespace io { @@ -52,11 +53,12 @@ Status SpillReader::open() { Slice result((char*)&block_count_, sizeof(size_t)); + size_t total_read_bytes = 0; // read block count size_t bytes_read = 0; RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count - COUNTER_UPDATE(_read_file_size, bytes_read); + total_read_bytes += bytes_read; if (_query_statistics) { _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); } @@ -66,7 +68,7 @@ Status SpillReader::open() { result.data = (char*)&max_sub_block_size_; RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count - COUNTER_UPDATE(_read_file_size, bytes_read); + total_read_bytes += bytes_read; if (_query_statistics) { _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); } @@ -87,7 +89,9 @@ Status SpillReader::open() { RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read)); DCHECK(bytes_read == block_count_ * sizeof(size_t)); - COUNTER_UPDATE(_read_file_size, bytes_read); + total_read_bytes += bytes_read; + COUNTER_UPDATE(_read_file_size, total_read_bytes); + ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes); if (_query_statistics) { _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); } @@ -134,6 +138,7 @@ Status 
SpillReader::read(Block* block, bool* eos) { if (bytes_read > 0) { COUNTER_UPDATE(_read_file_size, bytes_read); + ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read); if (_query_statistics) { _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); } diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 07a947b5ef3530..833c5471fca5c0 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -43,6 +43,9 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" +SpillStreamManager::~SpillStreamManager() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); +} SpillStreamManager::SpillStreamManager( std::unordered_map>&& spill_store_map) @@ -84,9 +87,27 @@ Status SpillStreamManager::init() { "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); }, &_spill_gc_thread)); LOG(INFO) << "spill gc thread started"; + + _init_metrics(); + return Status::OK(); } +void SpillStreamManager::_init_metrics() { + _entity = DorisMetrics::instance()->metric_registry()->register_entity("spill", + {{"name", "spill"}}); + + _spill_write_bytes_metric = std::make_unique( + doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_write_bytes"); + _spill_write_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( + _spill_write_bytes_metric.get())); + + _spill_read_bytes_metric = std::make_unique( + doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_read_bytes"); + _spill_read_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( + _spill_read_bytes_metric.get())); +} + // clean up stale spilled files void SpillStreamManager::_spill_gc_thread_callback() { while (!_stop_background_threads_latch.wait_for( diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 7bcfe9500979b9..53ae89e9111d0a 100644 --- 
a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -30,6 +30,14 @@ namespace doris { #include "common/compile_check_begin.h" class RuntimeProfile; +template +class AtomicCounter; +using IntAtomicCounter = AtomicCounter; +template +class AtomicGauge; +using UIntGauge = AtomicGauge; +class MetricEntity; +struct MetricPrototype; namespace vectorized { @@ -106,6 +114,7 @@ class SpillDataDir { }; class SpillStreamManager { public: + ~SpillStreamManager(); SpillStreamManager(std::unordered_map>&& spill_store_map); @@ -133,7 +142,12 @@ class SpillStreamManager { ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); } + void update_spill_write_bytes(int64_t bytes) { _spill_write_bytes_counter->increment(bytes); } + + void update_spill_read_bytes(int64_t bytes) { _spill_read_bytes_counter->increment(bytes); } + private: + void _init_metrics(); Status _init_spill_store_map(); void _spill_gc_thread_callback(); std::vector _get_stores_for_spill(TStorageMedium::type storage_medium); @@ -145,6 +159,14 @@ class SpillStreamManager { scoped_refptr _spill_gc_thread; std::atomic_uint64_t id_ = 0; + + std::shared_ptr _entity {nullptr}; + + std::unique_ptr _spill_write_bytes_metric {nullptr}; + std::unique_ptr _spill_read_bytes_metric {nullptr}; + + IntAtomicCounter* _spill_write_bytes_counter {nullptr}; + IntAtomicCounter* _spill_read_bytes_counter {nullptr}; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index 5cff9042103ad6..3a576004091f83 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -59,6 +59,7 @@ Status SpillWriter::close() { COUNTER_UPDATE(_write_file_current_size, meta_.size()); } data_dir_->update_spill_data_usage(meta_.size()); + ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size()); RETURN_IF_ERROR(file_writer_->close()); @@ -143,6 +144,7 @@ Status 
SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { Defer defer {[&]() { if (status.ok()) { data_dir_->update_spill_data_usage(buff_size); + ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size); written_bytes += buff_size; max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size);