diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index 23e00dfb7469e51..4cdb8dd6cec3656 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -29,7 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques RuntimeProfile* profile, const UniqueId& load_id) : BaseDeltaWriter(req, profile, load_id), _engine(engine) { _rowset_builder = std::make_unique(engine, req, profile); - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } CloudDeltaWriter::~CloudDeltaWriter() = default; diff --git a/be/src/common/config.h b/be/src/common/config.h index 4139d76b6bcb7a2..589c5d34a75d1ca 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -531,8 +531,6 @@ DECLARE_mInt64(load_error_log_limit_bytes); // be brpc interface is classified into two categories: light and heavy // each category has diffrent thread number // threads to handle heavy api interface, such as transmit_data/transmit_block etc -// Default, if less than or equal 32 core, the following are 128, 128, 10240, 10240 in turn. -// if greater than 32 core, the following are core num * 4, core num * 4, core num * 320, core num * 320 in turn DECLARE_Int32(brpc_heavy_work_pool_threads); // threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start DECLARE_Int32(brpc_light_work_pool_threads); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 3a217413db8febf..80efc9aae9e829b 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -286,7 +286,7 @@ void Daemon::memory_gc_thread() { } } -void Daemon::memtable_memory_limiter_tracker_refresh_thread() { +void Daemon::memtable_memory_refresh_thread() { // Refresh the memory statistics of the load channel tracker more frequently, // which helps to accurately control the memory of LoadChannelMgr. while (!_stop_background_threads_latch.wait_for( @@ -404,9 +404,8 @@ void Daemon::start() { &_threads.emplace_back()); CHECK(st.ok()) << st; st = Thread::create( - "Daemon", "memtable_memory_limiter_tracker_refresh_thread", - [this]() { this->memtable_memory_limiter_tracker_refresh_thread(); }, - &_threads.emplace_back()); + "Daemon", "memtable_memory_refresh_thread", + [this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; if (config::enable_metric_calculator) { diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 28f630678969d60..0c282a8516a3465 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -40,7 +40,7 @@ class Daemon { void tcmalloc_gc_thread(); void memory_maintenance_thread(); void memory_gc_thread(); - void memtable_memory_limiter_tracker_refresh_thread(); + void memtable_memory_refresh_thread(); void calculate_metrics_thread(); void je_purge_dirty_pages_thread() const; void report_runtime_query_statistics_thread(); diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp index ccb3b90e5e5c098..3983dc0a98642a3 100644 --- a/be/src/olap/calc_delete_bitmap_executor.cpp +++ b/be/src/olap/calc_delete_bitmap_executor.cpp @@ -38,7 +38,7 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_ { std::shared_lock rlock(_lock); RETURN_IF_ERROR(_status); - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } return _thread_token->submit_func([=, this]() { diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 87ae20237e536f3..102cf1ee0d98465 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -65,7 +65,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, _total_size_of_aggregate_states(0), _mem_usage(0) { g_memtable_cnt << 1; - _query_thread_context.init(); + _query_thread_context.init_unlocked(); _arena = std::make_unique(); _vec_row_comparator = std::make_shared(_tablet_schema); // TODO: Support ZOrderComparator in the future diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index f0a46aaa4522cf9..7c7c87dce8c9440 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -222,14 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _last_limit = limit; _log_timer.reset(); - LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str() - << " (without allocator cache: " - << PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache()) - << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", memtable writers num: " << _writers.size() - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + // if not exist load task, this log should not be printed. + if (_mem_usage != 0) { + LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str() + << " (without allocator cache: " + << PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache()) + << "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + } } void MemTableMemoryLimiter::_refresh_mem_tracker() { diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index f37a88e726d9e4c..073c2c4dbace1dd 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -70,7 +70,7 @@ Status MemTableWriter::init(std::shared_ptr rowset_writer, _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; _partial_update_info = partial_update_info; - _query_thread_context.init(); + _query_thread_context.init_unlocked(); _reset_mem_table(); diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 8f918d6df554c9b..535fbf772c9527e 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -75,7 +75,7 @@ LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profil // TODO(plat1ko): CloudStorageEngine _rowset_builder = std::make_unique( ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile); - _query_thread_context.init(); // from load stream + _query_thread_context.init_unlocked(); // from load stream } LoadStreamWriter::~LoadStreamWriter() = default; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index dd278c0073e287b..5c3e5b13c628e49 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -62,11 +62,16 @@ #define SCOPED_SKIP_MEMORY_CHECK() \ auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck() #else -#define SCOPED_ATTACH_TASK(arg1, ...) (void)0 -#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) (void)0 -#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) (void)0 -#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 -#define SCOPED_SKIP_MEMORY_CHECK() (void)0 +#define SCOPED_ATTACH_TASK(arg1, ...) \ + auto VARNAME_LINENUM(scoped_tls_at) = doris::ScopedInitThreadContext() +#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \ + auto VARNAME_LINENUM(scoped_tls_atwi) = doris::ScopedInitThreadContext() +#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ + auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext() +#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(scoped_tls_cmt) = doris::ScopedInitThreadContext() +#define SCOPED_SKIP_MEMORY_CHECK() \ + auto VARNAME_LINENUM(scoped_tls_smc) = doris::ScopedInitThreadContext() #endif // Used to tracking the memory usage of the specified code segment use by mem hook. @@ -91,12 +96,6 @@ #define MEMORY_ORPHAN_CHECK() \ DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \ << doris::memory_orphan_check_msg; -#else -#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) (void)0 -#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) (void)0 -#define ORPHAN_TRACKER_CHECK() (void)0 -#define MEMORY_ORPHAN_CHECK() (void)0 -#endif #define SKIP_LARGE_MEMORY_CHECK(...) \ do { \ @@ -108,6 +107,15 @@ }); \ __VA_ARGS__; \ } while (0) +#else +#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ + auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext() +#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ + auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext() +#define ORPHAN_TRACKER_CHECK() (void)0 +#define MEMORY_ORPHAN_CHECK() (void)0 +#define SKIP_LARGE_MEMORY_CHECK() (void)0 +#endif namespace doris { @@ -284,6 +292,7 @@ static ThreadContext* thread_context() { __builtin_unreachable(); } +// belong to one query object member, not be shared by multiple queries. class QueryThreadContext { public: QueryThreadContext() = default; @@ -291,7 +300,9 @@ class QueryThreadContext { const std::shared_ptr& mem_tracker) : query_id(query_id), query_mem_tracker(mem_tracker) {} - void init() { + // Not thread safe, generally be called in class constructor, shared_ptr use_count may be + // wrong when called by multiple threads, cause crash after object be destroyed prematurely. + void init_unlocked() { #ifndef BE_TEST ORPHAN_TRACKER_CHECK(); query_id = doris::thread_context()->task_id(); @@ -325,6 +336,14 @@ class ScopeMemCountByHook { int64_t* _scope_mem = nullptr; }; +// only hold thread context in scope. +class ScopedInitThreadContext { +public: + explicit ScopedInitThreadContext() { ThreadLocalHandle::create_thread_local_if_not_exits(); } + + ~ScopedInitThreadContext() { ThreadLocalHandle::del_thread_local_if_count_is_zero(); } +}; + class AttachTask { public: explicit AttachTask(const std::shared_ptr& mem_tracker, diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index d1d9aad0114bd17..5ac25819067087c 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -57,7 +57,7 @@ class SimplifiedScanScheduler; class ScanTask { public: ScanTask(std::weak_ptr delegate_scanner) : scanner(delegate_scanner) { - _query_thread_context.init(); + _query_thread_context.init_unlocked(); } ~ScanTask() {