Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add spill metrics and improve spill log printing #46029

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context,
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If there are already any flushing task, Wait for some time and retry.
LOG_EVERY_T(INFO, 60) << fmt::format(
"Failed to reserve memory {} for flush memtable, retry after 100ms", size);
"Failed to reserve memory {} for flush memtable, retry after 100ms",
PrettyPrinter::print_bytes(size));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
st = Status::OK();
Expand Down
25 changes: 13 additions & 12 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
revocable_size = revocable_mem_size(state);
query_mem_limit = state->get_query_ctx()->get_mem_limit();
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} eos, need spill: {}, query mem limit: {}, "
"revocable memory: {}",
print_id(state->query_id()), state->task_id(), node_id(),
"Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, "
"revocable memory:{}",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit),
PrettyPrinter::print_bytes(revocable_size));

Expand Down Expand Up @@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need spill: {}, revocable "
"memory: {}",
print_id(state->query_id()), state->task_id(), _parent->node_id(), _eos,
"Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need spill:{}, revocable "
"memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos,
_shared_state->is_spilled,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
if (!_shared_state->is_spilled) {
Expand Down Expand Up @@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< Base::_parent->node_id()
<< " revoke_memory error: " << status;
LOG(WARNING) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
print_id(query_id), Base::_parent->node_id(), state->task_id(),
status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
"Query: {}, task {}, agg sink {} revoke_memory finish, eos: {}, "
"revocable memory: {}",
print_id(state->query_id()), state->task_id(), _parent->node_id(),
"Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, "
"revocable memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
_eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}
Expand Down
32 changes: 18 additions & 14 deletions be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< _parent->node_id() << " recover agg data error: " << status;
LOG(WARNING) << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover agg data error:{}",
print_id(query_id), _parent->node_id(), state->task_id(), status);
}
_shared_state->close();
}
Expand Down Expand Up @@ -305,15 +306,16 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
}
}

VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
<< ", task id: " << state->task_id() << " recover partitioned finished, "
<< _shared_state->spill_partitions.size() << " partitions left, "
<< accumulated_blocks_size
<< " bytes read, spill dep: " << (void*)(_spill_dependency.get());
VLOG_DEBUG << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover partitioned finished, partitions "
"left:{}, bytes read:{}, spill dep:{}",
print_id(query_id), _parent->node_id(), state->task_id(),
_shared_state->spill_partitions.size(), accumulated_blocks_size,
(void*)(_spill_dependency.get()));
return status;
};

auto exception_catch_func = [spill_func, query_id]() {
auto exception_catch_func = [this, state, spill_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
Expand All @@ -323,8 +325,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});

auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id)
<< " recover exception : " << status.to_string();
LOG_IF(INFO, !status.ok()) << fmt::format(
"Query:{}, agg probe:{}, task:{}, recover exception:{}", print_id(query_id),
_parent->node_id(), state->task_id(), status.to_string());
return status;
};

Expand All @@ -334,10 +337,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();

VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
<< ", task id: " << state->task_id() << " begin to recover, "
<< _shared_state->spill_partitions.size()
<< " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get());
VLOG_DEBUG << fmt::format(
"Query:{}, agg probe:{}, task:{}, begin to recover, partitions left:{}, "
"_spill_dependency:{}",
print_id(query_id), _parent->node_id(), state->task_id(),
_shared_state->spill_partitions.size(), (void*)(_spill_dependency.get()));
return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
Expand Down
98 changes: 50 additions & 48 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}

COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
VLOG_DEBUG << "Query: " << print_id(query_id)
<< " hash probe revoke done, node: " << p.node_id()
<< ", task: " << state->task_id();

VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" spill_probe_blocks done",
print_id(query_id), p.node_id(), state->task_id());
return Status::OK();
};

Expand Down Expand Up @@ -275,9 +277,10 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state,
uint32_t partition_index,
bool& has_data) {
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recover_build_blocks_from_disk",
print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index);
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
Expand All @@ -291,9 +294,10 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
SCOPED_TIMER(_recovery_build_timer);

bool eos = false;
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", recoverying build data";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recoverying build data",
print_id(state->query_id()), _parent->node_id(), state->task_id(), partition_index);
Status status;
while (!eos) {
vectorized::Block block;
Expand All @@ -315,7 +319,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}

if (UNLIKELY(state->is_cancelled())) {
LOG(INFO) << "recovery build block when canceled.";
LOG(INFO) << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery build data canceled",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
partition_index);
break;
}

Expand All @@ -338,9 +346,11 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (eos) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
_shared_state->spilled_streams[partition_index].reset();
VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", node: " << _parent->node_id() << ", task id: " << state->task_id()
<< ", partition: " << partition_index;
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery build data eos",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
partition_index);
}
return status;
};
Expand All @@ -365,16 +375,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_spill_dependency->block();
{
auto* pipeline_task = state->get_task();
if (pipeline_task) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", dependency: " << _dependency
<< ", task debug_string: " << pipeline_task->debug_string();
}
}

DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
{
Expand All @@ -386,9 +386,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(),
exception_catch_func);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk submit func";
return spill_io_pool->submit(std::move(spill_runnable));
}

Expand Down Expand Up @@ -429,7 +426,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim

auto query_id = state->query_id();

auto read_func = [this, query_id, &spilled_stream, &blocks] {
auto read_func = [this, query_id, partition_index, &spilled_stream, &blocks] {
SCOPED_TIMER(_recovery_probe_timer);

vectorized::Block block;
Expand Down Expand Up @@ -457,8 +454,10 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
}
}
if (eos) {
VLOG_DEBUG << "Query: " << print_id(query_id)
<< ", recovery probe data done: " << spilled_stream->get_spill_dir();
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, recovery probe data done",
print_id(query_id), _parent->node_id(), _state->task_id(), partition_index);
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
Expand Down Expand Up @@ -675,13 +674,13 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(

RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(),
&block, true));
VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", internal build operator finished, node id: " << node_id()
<< ", task id: " << state->task_id()
<< ", partition: " << local_state._partition_cursor << "rows: " << block.rows()
<< ", usage: "
<< _inner_sink_operator->get_memory_usage(
local_state._shared_state->inner_runtime_state.get());
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" internal build operator finished, partition:{}, rows:{}, memory usage:{}",
print_id(state->query_id()), node_id(), state->task_id(), local_state._partition_cursor,
block.rows(),
_inner_sink_operator->get_memory_usage(
local_state._shared_state->inner_runtime_state.get()));

COUNTER_SET(local_state._hash_table_memory_usage,
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
Expand Down Expand Up @@ -734,9 +733,10 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (!has_data) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id() << "partition: " << partition_index
<< " has no data to recovery";
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, has no data to recovery",
print_id(state->query_id()), node_id(), state->task_id(), partition_index);
break;
} else {
return Status::OK();
Expand All @@ -755,9 +755,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,

*eos = false;
if (in_mem_eos) {
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id()
<< ", partition: " << local_state._partition_cursor;
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" partition:{}, probe done",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._partition_cursor);
local_state._partition_cursor++;
if (local_state._partition_cursor == _partition_count) {
*eos = true;
Expand Down Expand Up @@ -848,8 +850,8 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta

Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id();
VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory",
print_id(state->query_id()), node_id(), state->task_id());

RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
return Status::OK();
Expand Down Expand Up @@ -894,10 +896,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
#ifndef NDEBUG
Defer eos_check_defer([&] {
if (*eos) {
LOG(INFO) << "Query: " << print_id(state->query_id())
<< ", hash probe node: " << node_id() << ", task: " << state->task_id()
<< ", eos with child eos: " << local_state._child_eos
<< ", need spill: " << need_to_spill;
LOG(INFO) << fmt::format(
"Query:{}, hash join probe:{}, task:{}, child eos:{}, need spill:{}",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._child_eos, need_to_spill);
}
});
#endif
Expand Down
Loading
Loading