From 5f71bd1f9f3e0319d6f667872d3f6aec9f6ea420 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 25 Apr 2024 19:39:29 +0800 Subject: [PATCH] 1 --- be/src/cloud/cloud_delta_writer.cpp | 2 +- be/src/common/config.h | 2 -- be/src/common/daemon.cpp | 7 +++---- be/src/common/daemon.h | 2 +- be/src/olap/calc_delete_bitmap_executor.cpp | 2 +- be/src/olap/memtable.cpp | 2 +- be/src/olap/memtable_memory_limiter.cpp | 19 +++++++++++-------- be/src/olap/memtable_writer.cpp | 2 +- be/src/runtime/load_stream_writer.cpp | 2 +- be/src/runtime/thread_context.h | 5 ++++- be/src/vec/exec/scan/scanner_context.h | 2 +- 11 files changed, 25 insertions(+), 22 deletions(-) diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index 23e00dfb7469e5..4cdb8dd6cec365 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -29,7 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques RuntimeProfile* profile, const UniqueId& load_id) : BaseDeltaWriter(req, profile, load_id), _engine(engine) { _rowset_builder = std::make_unique(engine, req, profile); - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } CloudDeltaWriter::~CloudDeltaWriter() = default; diff --git a/be/src/common/config.h b/be/src/common/config.h index 4139d76b6bcb7a..589c5d34a75d1c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -531,8 +531,6 @@ DECLARE_mInt64(load_error_log_limit_bytes); // be brpc interface is classified into two categories: light and heavy // each category has diffrent thread number // threads to handle heavy api interface, such as transmit_data/transmit_block etc -// Default, if less than or equal 32 core, the following are 128, 128, 10240, 10240 in turn. -// if greater than 32 core, the following are core num * 4, core num * 4, core num * 320, core num * 320 in turn DECLARE_Int32(brpc_heavy_work_pool_threads); // threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start DECLARE_Int32(brpc_light_work_pool_threads); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 3a217413db8feb..80efc9aae9e829 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -286,7 +286,7 @@ void Daemon::memory_gc_thread() { } } -void Daemon::memtable_memory_limiter_tracker_refresh_thread() { +void Daemon::memtable_memory_refresh_thread() { // Refresh the memory statistics of the load channel tracker more frequently, // which helps to accurately control the memory of LoadChannelMgr. while (!_stop_background_threads_latch.wait_for( @@ -404,9 +404,8 @@ void Daemon::start() { &_threads.emplace_back()); CHECK(st.ok()) << st; st = Thread::create( - "Daemon", "memtable_memory_limiter_tracker_refresh_thread", - [this]() { this->memtable_memory_limiter_tracker_refresh_thread(); }, - &_threads.emplace_back()); + "Daemon", "memtable_memory_refresh_thread", + [this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; if (config::enable_metric_calculator) { diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 28f630678969d6..0c282a8516a346 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -40,7 +40,7 @@ class Daemon { void tcmalloc_gc_thread(); void memory_maintenance_thread(); void memory_gc_thread(); - void memtable_memory_limiter_tracker_refresh_thread(); + void memtable_memory_refresh_thread(); void calculate_metrics_thread(); void je_purge_dirty_pages_thread() const; void report_runtime_query_statistics_thread(); diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp index ccb3b90e5e5c09..3983dc0a98642a 100644 --- a/be/src/olap/calc_delete_bitmap_executor.cpp +++ b/be/src/olap/calc_delete_bitmap_executor.cpp @@ -38,7 +38,7 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_ { std::shared_lock rlock(_lock); RETURN_IF_ERROR(_status); - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } return _thread_token->submit_func([=, this]() { diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 87ae20237e536f..102cf1ee0d9846 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -65,7 +65,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, _total_size_of_aggregate_states(0), _mem_usage(0) { g_memtable_cnt << 1; - _query_thread_context.init(); + _query_thread_context.init_unlocked(); _arena = std::make_unique(); _vec_row_comparator = std::make_shared(_tablet_schema); // TODO: Support ZOrderComparator in the future diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index f0a46aaa4522cf..7c7c87dce8c944 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -222,14 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _last_limit = limit; _log_timer.reset(); - LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str() - << " (without allocator cache: " - << PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache()) - << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", memtable writers num: " << _writers.size() - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + // if not exist load task, this log should not be printed. + if (_mem_usage != 0) { + LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str() + << " (without allocator cache: " + << PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache()) + << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + } } void MemTableMemoryLimiter::_refresh_mem_tracker() { diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index f37a88e726d9e4..073c2c4dbace1d 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -70,7 +70,7 @@ Status MemTableWriter::init(std::shared_ptr rowset_writer, _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; _partial_update_info = partial_update_info; - _query_thread_context.init(); + _query_thread_context.init_unlocked(); _reset_mem_table(); diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 8f918d6df554c9..535fbf772c9527 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -75,7 +75,7 @@ LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profil // TODO(plat1ko): CloudStorageEngine _rowset_builder = std::make_unique( ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile); - _query_thread_context.init(); // from load stream + _query_thread_context.init_unlocked(); // from load stream } LoadStreamWriter::~LoadStreamWriter() = default; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index dd278c0073e287..431d7ffd194fb0 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -284,6 +284,7 @@ static ThreadContext* thread_context() { __builtin_unreachable(); } +// belong to one query object member, not be shared by multiple queries. class QueryThreadContext { public: QueryThreadContext() = default; @@ -291,7 +292,9 @@ class QueryThreadContext { const std::shared_ptr& mem_tracker) : query_id(query_id), query_mem_tracker(mem_tracker) {} - void init() { + // Not thread safe, generally be called in class constructor, shared_ptr use_count may be + // wrong when called by multiple threads, cause crash after object be destroyed prematurely. + void init_unlocked() { #ifndef BE_TEST ORPHAN_TRACKER_CHECK(); query_id = doris::thread_context()->task_id(); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index d1d9aad0114bd1..5ac25819067087 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -57,7 +57,7 @@ class SimplifiedScanScheduler; class ScanTask { public: ScanTask(std::weak_ptr delegate_scanner) : scanner(delegate_scanner) { - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } ~ScanTask() {