Skip to content

Commit

Permalink
Merge branch 'master' into 20240726_fix_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Jul 30, 2024
2 parents 2e5ac71 + 2046577 commit 66b115c
Show file tree
Hide file tree
Showing 56 changed files with 2,003 additions and 143 deletions.
4 changes: 3 additions & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,16 @@ class BeExecVersionManager {
* b. clear old version of version 3->4
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 5;
constexpr inline int BeExecVersionManager::max_be_exec_version = 6;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);

// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);

Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ void Daemon::memory_maintenance_thread() {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif
MemInfo::refresh_memory_bvar();

// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/reset_rpc_channel_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
namespace doris {
ResetRPCChannelAction::ResetRPCChannelAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type) {}
: HttpHandlerWithAuth(exec_env, hier, type), _exec_env(exec_env) {}
void ResetRPCChannelAction::handle(HttpRequest* req) {
std::string endpoints = req->param("endpoints");
if (iequal(endpoints, "all")) {
Expand Down
18 changes: 15 additions & 3 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> inflight_hdfs_file_writer("inflight_hdfs_file_writer");
bvar::Adder<uint64_t> hdfs_file_writer_async_close_queuing("hdfs_file_writer_async_close_queuing");
bvar::Adder<uint64_t> hdfs_file_writer_async_close_processing(
"hdfs_file_writer_async_close_processing");

static constexpr size_t MB = 1024 * 1024;
#ifndef USE_LIBHDFS3
Expand Down Expand Up @@ -122,7 +125,11 @@ class HdfsWriteMemUsageRecorder {
}

private:
size_t max_jvm_heap_size() const { return JniUtil::get_max_jni_heap_memory_size(); }
// clang-format off
size_t max_jvm_heap_size() const {
return JniUtil::get_max_jni_heap_memory_size();
}
// clang-format on
[[maybe_unused]] std::size_t cur_memory_comsuption {0};
std::mutex cur_memory_latch;
std::condition_variable cv;
Expand Down Expand Up @@ -230,8 +237,13 @@ Status HdfsFileWriter::close(bool non_block) {
_state = State::ASYNC_CLOSING;
_async_close_pack = std::make_unique<AsyncCloseStatusPack>();
_async_close_pack->future = _async_close_pack->promise.get_future();
return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
[&]() { _async_close_pack->promise.set_value(_close_impl()); });
hdfs_file_writer_async_close_queuing << 1;
return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
hdfs_file_writer_async_close_queuing << -1;
hdfs_file_writer_async_close_processing << 1;
_async_close_pack->promise.set_value(_close_impl());
hdfs_file_writer_async_close_processing << -1;
});
}
_st = _close_impl();
_state = State::CLOSED;
Expand Down
20 changes: 14 additions & 6 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@

namespace doris::io {

bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer", "total_num");
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer", "bytes_written");
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer", "file_created");
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer", "file_being_written");
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
"s3_file_writer_async_close_processing");

S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
std::string key, const FileWriterOptions* opts)
Expand Down Expand Up @@ -141,8 +144,13 @@ Status S3FileWriter::close(bool non_block) {
_state = State::ASYNC_CLOSING;
_async_close_pack = std::make_unique<AsyncCloseStatusPack>();
_async_close_pack->future = _async_close_pack->promise.get_future();
return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
[&]() { _async_close_pack->promise.set_value(_close_impl()); });
s3_file_writer_async_close_queuing << 1;
return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
s3_file_writer_async_close_queuing << -1;
s3_file_writer_async_close_processing << 1;
_async_close_pack->promise.set_value(_close_impl());
s3_file_writer_async_close_processing << -1;
});
}
_st = _close_impl();
_state = State::CLOSED;
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,11 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
for (auto cid : cids) {
if (!_segment->can_apply_predicate_safely(cid,
_opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
}
RowRanges tmp_row_ranges = RowRanges::create_single(num_rows());
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
Expand Down
15 changes: 11 additions & 4 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <utility>

#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
Expand Down Expand Up @@ -48,9 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
}
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
return Status::OK();
Expand Down Expand Up @@ -107,6 +109,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
} else {
_sink_type = sink.type;
}
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
_result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
} else {
_result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
}
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}
Expand All @@ -126,8 +133,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
state->query_id(), _result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
}
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>

Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block);
TResultSinkType::type _sink_type;
int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts = nullptr;

Expand Down
80 changes: 61 additions & 19 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,17 @@

namespace doris {

bvar::Adder<int64_t> g_memtrackerlimiter_cnt("memtrackerlimiter_cnt");
static bvar::Adder<int64_t> memory_memtrackerlimiter_cnt("memory_memtrackerlimiter_cnt");
static bvar::Adder<int64_t> memory_all_trackers_sum_bytes("memory_all_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_global_trackers_sum_bytes("memory_global_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_query_trackers_sum_bytes("memory_query_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_load_trackers_sum_bytes("memory_load_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_compaction_trackers_sum_bytes(
"memory_compaction_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_schema_change_trackers_sum_bytes(
"memory_schema_change_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes");

constexpr auto GC_MAX_SEEK_TRACKER = 1000;

std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
Expand Down Expand Up @@ -80,7 +90,7 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_
if (_type == Type::LOAD || _type == Type::QUERY) {
_query_statistics = std::make_shared<QueryStatistics>();
}
g_memtrackerlimiter_cnt << 1;
memory_memtrackerlimiter_cnt << 1;
}

std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLimiter::Type type,
Expand Down Expand Up @@ -137,7 +147,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
<< print_address_sanitizers();
#endif
}
g_memtrackerlimiter_cnt << -1;
memory_memtrackerlimiter_cnt << -1;
}

#ifndef NDEBUG
Expand Down Expand Up @@ -223,9 +233,40 @@ void MemTrackerLimiter::refresh_global_counter() {
}
}
}
int64_t all_trackers_mem_sum = 0;
for (auto it : type_mem_sum) {
MemTrackerLimiter::TypeMemSum[it.first]->set(it.second);
all_trackers_mem_sum += it.second;
switch (it.first) {
case Type::GLOBAL:
memory_global_trackers_sum_bytes
<< it.second - memory_global_trackers_sum_bytes.get_value();
break;
case Type::QUERY:
memory_query_trackers_sum_bytes
<< it.second - memory_query_trackers_sum_bytes.get_value();
break;
case Type::LOAD:
memory_load_trackers_sum_bytes
<< it.second - memory_load_trackers_sum_bytes.get_value();
break;
case Type::COMPACTION:
memory_compaction_trackers_sum_bytes
<< it.second - memory_compaction_trackers_sum_bytes.get_value();
break;
case Type::SCHEMA_CHANGE:
memory_schema_change_trackers_sum_bytes
<< it.second - memory_schema_change_trackers_sum_bytes.get_value();
break;
case Type::OTHER:
memory_other_trackers_sum_bytes
<< it.second - memory_other_trackers_sum_bytes.get_value();
}
}
all_trackers_mem_sum += MemInfo::allocator_cache_mem();
all_trackers_mem_sum += MemInfo::allocator_metadata_mem();
memory_all_trackers_sum_bytes << all_trackers_mem_sum -
memory_all_trackers_sum_bytes.get_value();
}

void MemTrackerLimiter::clean_tracker_limiter_group() {
Expand All @@ -248,7 +289,7 @@ void MemTrackerLimiter::clean_tracker_limiter_group() {

void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots) {
MemTrackerLimiter::refresh_global_counter();
int64_t all_tracker_mem_sum = 0;
int64_t all_trackers_mem_sum = 0;
Snapshot snapshot;
for (auto it : MemTrackerLimiter::TypeMemSum) {
snapshot.type = "overview";
Expand All @@ -257,7 +298,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.cur_consumption = it.second->current_value();
snapshot.peak_consumption = it.second->peak_value();
(*snapshots).emplace_back(snapshot);
all_tracker_mem_sum += it.second->current_value();
all_trackers_mem_sum += it.second->current_value();
}

snapshot.type = "overview";
Expand All @@ -266,43 +307,44 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.cur_consumption = MemInfo::allocator_cache_mem();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
all_tracker_mem_sum += MemInfo::allocator_cache_mem();
all_trackers_mem_sum += MemInfo::allocator_cache_mem();

snapshot.type = "overview";
snapshot.label = "tc/jemalloc_metadata";
snapshot.limit = -1;
snapshot.cur_consumption = MemInfo::allocator_metadata_mem();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
all_tracker_mem_sum += MemInfo::allocator_metadata_mem();
all_trackers_mem_sum += MemInfo::allocator_metadata_mem();

snapshot.type = "overview";
snapshot.label = "reserved_memory";
snapshot.limit = -1;
snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
all_trackers_mem_sum += GlobalMemoryArbitrator::process_reserved_memory();

snapshot.type = "overview";
snapshot.label = "sum of all trackers"; // is virtual memory
snapshot.label = "sum_of_all_trackers"; // is virtual memory
snapshot.limit = -1;
snapshot.cur_consumption = all_tracker_mem_sum;
snapshot.cur_consumption = all_trackers_mem_sum;
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);

snapshot.type = "overview";
#ifdef ADDRESS_SANITIZER
snapshot.label = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM
snapshot.label = "[ASAN]VmRSS(process resident memory)"; // from /proc VmRSS VmHWM
#else
snapshot.label = "process resident memory"; // from /proc VmRSS VmHWM
snapshot.label = "VmRSS(process resident memory)"; // from /proc VmRSS VmHWM
#endif
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_rss();
snapshot.peak_consumption = PerfCounters::get_vm_hwm();
(*snapshots).emplace_back(snapshot);

snapshot.type = "overview";
snapshot.label = "reserve_memory";
snapshot.limit = -1;
snapshot.cur_consumption = GlobalMemoryArbitrator::process_reserved_memory();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);

snapshot.type = "overview";
snapshot.label = "process virtual memory"; // from /proc VmSize VmPeak
snapshot.label = "VmSize(process virtual memory)"; // from /proc VmSize VmPeak
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_size();
snapshot.peak_consumption = PerfCounters::get_vm_peak();
Expand Down
Loading

0 comments on commit 66b115c

Please sign in to comment.