Skip to content

Commit

Permalink
Add spill metrics and improve spill log printing (#46029)
Browse files (browse the repository at this point in the history)
  • Loading branch information
jacktengg authored Dec 27, 2024
1 parent d5b75da commit d46495e
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 124 deletions.
3 changes: 2 additions & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context,
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If a flushing task is already running, wait for some time and retry.
LOG_EVERY_T(INFO, 60) << fmt::format(
"Failed to reserve memory {} for flush memtable, retry after 100ms", size);
"Failed to reserve memory {} for flush memtable, retry after 100ms",
PrettyPrinter::print_bytes(size));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
st = Status::OK();
Expand Down
25 changes: 13 additions & 12 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
revocable_size = revocable_mem_size(state);
query_mem_limit = state->get_query_ctx()->get_mem_limit();
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} eos, need spill: {}, query mem limit: {}, "
"revocable memory: {}",
print_id(state->query_id()), state->task_id(), node_id(),
"Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, "
"revocable memory:{}",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit),
PrettyPrinter::print_bytes(revocable_size));

Expand Down Expand Up @@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need spill: {}, revocable "
"memory: {}",
print_id(state->query_id()), state->task_id(), _parent->node_id(), _eos,
"Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need spill:{}, revocable "
"memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos,
_shared_state->is_spilled,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
if (!_shared_state->is_spilled) {
Expand Down Expand Up @@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< Base::_parent->node_id()
<< " revoke_memory error: " << status;
LOG(WARNING) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
print_id(query_id), Base::_parent->node_id(), state->task_id(),
status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} revoke_memory finish, eos: {}, "
"revocable memory: {}",
print_id(state->query_id()), state->task_id(), _parent->node_id(),
"Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, "
"revocable memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
_eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}
Expand Down
32 changes: 18 additions & 14 deletions be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< _parent->node_id() << " recover agg data error: " << status;
LOG(WARNING) << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover agg data error:{}",
print_id(query_id), _parent->node_id(), state->task_id(), status);
}
_shared_state->close();
}
Expand Down Expand Up @@ -305,15 +306,16 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
}
}

VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
<< ", task id: " << state->task_id() << " recover partitioned finished, "
<< _shared_state->spill_partitions.size() << " partitions left, "
<< accumulated_blocks_size
<< " bytes read, spill dep: " << (void*)(_spill_dependency.get());
VLOG_DEBUG << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover partitioned finished, partitions "
"left:{}, bytes read:{}, spill dep:{}",
print_id(query_id), _parent->node_id(), state->task_id(),
_shared_state->spill_partitions.size(), accumulated_blocks_size,
(void*)(_spill_dependency.get()));
return status;
};

auto exception_catch_func = [spill_func, query_id]() {
auto exception_catch_func = [this, state, spill_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
Expand All @@ -323,8 +325,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});

auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id)
<< " recover exception : " << status.to_string();
LOG_IF(INFO, !status.ok()) << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover exception:{}", print_id(query_id),
_parent->node_id(), state->task_id(), status.to_string());
return status;
};

Expand All @@ -334,10 +337,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();

VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
<< ", task id: " << state->task_id() << " begin to recover, "
<< _shared_state->spill_partitions.size()
<< " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get());
VLOG_DEBUG << fmt::format(
"Query:{}, agg probe:{}, task:{}, begin to recover, partitions left:{}, "
"_spill_dependency:{}",
print_id(query_id), _parent->node_id(), state->task_id(),
_shared_state->spill_partitions.size(), (void*)(_spill_dependency.get()));
return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
Expand Down
98 changes: 50 additions & 48 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}

COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
VLOG_DEBUG << "Query: " << print_id(query_id)
<< " hash probe revoke done, node: " << p.node_id()
<< ", task: " << state->task_id();

VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" spill_probe_blocks done",
print_id(query_id), p.node_id(), state->task_id());
return Status::OK();
};

Expand Down Expand Up @@ -275,9 +277,10 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state,
uint32_t partition_index,
bool& has_data) {
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recover_build_blocks_from_disk",
print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index);
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
Expand All @@ -291,9 +294,10 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
SCOPED_TIMER(_recovery_build_timer);

bool eos = false;
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", recoverying build data";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recoverying build data",
print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index);
Status status;
while (!eos) {
vectorized::Block block;
Expand All @@ -315,7 +319,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}

if (UNLIKELY(state->is_cancelled())) {
LOG(INFO) << "recovery build block when canceled.";
LOG(INFO) << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery build data canceled",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
partition_index);
break;
}

Expand All @@ -338,9 +346,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (eos) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
_shared_state->spilled_streams[partition_index].reset();
VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", node: " << _parent->node_id() << ", task id: " << state->task_id()
<< ", partition: " << partition_index;
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery build data eos",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
partition_index);
}
return status;
};
Expand All @@ -365,16 +375,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_spill_dependency->block();
{
auto* pipeline_task = state->get_task();
if (pipeline_task) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", dependency: " << _dependency
<< ", task debug_string: " << pipeline_task->debug_string();
}
}

DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
{
Expand All @@ -386,9 +386,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(),
exception_catch_func);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk submit func";
return spill_io_pool->submit(std::move(spill_runnable));
}

Expand Down Expand Up @@ -429,7 +426,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim

auto query_id = state->query_id();

auto read_func = [this, query_id, &spilled_stream, &blocks] {
auto read_func = [this, query_id, partition_index, &spilled_stream, &blocks] {
SCOPED_TIMER(_recovery_probe_timer);

vectorized::Block block;
Expand Down Expand Up @@ -457,8 +454,10 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
}
}
if (eos) {
VLOG_DEBUG << "Query: " << print_id(query_id)
<< ", recovery probe data done: " << spilled_stream->get_spill_dir();
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery probe data done",
print_id(query_id), _parent->node_id(), _state->task_id(), partition_index);
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
Expand Down Expand Up @@ -675,13 +674,13 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(

RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(),
&block, true));
VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", internal build operator finished, node id: " << node_id()
<< ", task id: " << state->task_id()
<< ", partition: " << local_state._partition_cursor << "rows: " << block.rows()
<< ", usage: "
<< _inner_sink_operator->get_memory_usage(
local_state._shared_state->inner_runtime_state.get());
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" internal build operator finished, partition:{}, rows:{}, memory usage:{}",
print_id(state->query_id()), node_id(), state->task_id(), local_state._partition_cursor,
block.rows(),
_inner_sink_operator->get_memory_usage(
local_state._shared_state->inner_runtime_state.get()));

COUNTER_SET(local_state._hash_table_memory_usage,
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
Expand Down Expand Up @@ -734,9 +733,10 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (!has_data) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id() << "partition: " << partition_index
<< " has no data to recovery";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, has no data to recovery",
print_id(state->query_id()), node_id(), state->task_id(), partition_index);
break;
} else {
return Status::OK();
Expand All @@ -755,9 +755,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,

*eos = false;
if (in_mem_eos) {
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id()
<< ", partition: " << local_state._partition_cursor;
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, probe done",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._partition_cursor);
local_state._partition_cursor++;
if (local_state._partition_cursor == _partition_count) {
*eos = true;
Expand Down Expand Up @@ -848,8 +850,8 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta

Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id();
VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory",
print_id(state->query_id()), node_id(), state->task_id());

RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
return Status::OK();
Expand Down Expand Up @@ -894,10 +896,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
#ifndef NDEBUG
Defer eos_check_defer([&] {
if (*eos) {
LOG(INFO) << "Query: " << print_id(state->query_id())
<< ", hash probe node: " << node_id() << ", task: " << state->task_id()
<< ", eos with child eos: " << local_state._child_eos
<< ", need spill: " << need_to_spill;
LOG(INFO) << fmt::format(
"Query:{}, hash join probe:{}, task:{}, child eos:{}, need spill:{}",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._child_eos, need_to_spill);
}
});
#endif
Expand Down
Loading

0 comments on commit d46495e

Please sign in to comment.