Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Apr 25, 2024
1 parent f79e0ca commit efd5dbd
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 33 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques
RuntimeProfile* profile, const UniqueId& load_id)
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
_rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

CloudDeltaWriter::~CloudDeltaWriter() = default;
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,6 @@ DECLARE_mInt64(load_error_log_limit_bytes);
// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
// Default, if less than or equal 32 core, the following are 128, 128, 10240, 10240 in turn.
// if greater than 32 core, the following are core num * 4, core num * 4, core num * 320, core num * 320 in turn
DECLARE_Int32(brpc_heavy_work_pool_threads);
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
DECLARE_Int32(brpc_light_work_pool_threads);
Expand Down
7 changes: 3 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ void Daemon::memory_gc_thread() {
}
}

void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
void Daemon::memtable_memory_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more frequently,
// which helps to accurately control the memory of LoadChannelMgr.
while (!_stop_background_threads_latch.wait_for(
Expand Down Expand Up @@ -404,9 +404,8 @@ void Daemon::start() {
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memtable_memory_limiter_tracker_refresh_thread",
[this]() { this->memtable_memory_limiter_tracker_refresh_thread(); },
&_threads.emplace_back());
"Daemon", "memtable_memory_refresh_thread",
[this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

if (config::enable_metric_calculator) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Daemon {
void tcmalloc_gc_thread();
void memory_maintenance_thread();
void memory_gc_thread();
void memtable_memory_limiter_tracker_refresh_thread();
void memtable_memory_refresh_thread();
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/calc_delete_bitmap_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

return _thread_token->submit_func([=, this]() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
_total_size_of_aggregate_states(0),
_mem_usage(0) {
g_memtable_cnt << 1;
_query_thread_context.init();
_query_thread_context.init_unlocked();
_arena = std::make_unique<vectorized::Arena>();
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
// TODO: Support ZOrderComparator in the future
Expand Down
19 changes: 11 additions & 8 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {

_last_limit = limit;
_log_timer.reset();
LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
<< " (without allocator cache: "
<< PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
<< "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
// if not exist load task, this log should not be printed.
if (_mem_usage != 0) {
LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
<< " (without allocator cache: "
<< PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
<< "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
}
}

void MemTableMemoryLimiter::_refresh_mem_tracker() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;
_query_thread_context.init();
_query_thread_context.init_unlocked();

_reset_mem_table();

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profil
// TODO(plat1ko): CloudStorageEngine
_rowset_builder = std::make_unique<RowsetBuilder>(
ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile);
_query_thread_context.init(); // from load stream
_query_thread_context.init_unlocked(); // from load stream
}

LoadStreamWriter::~LoadStreamWriter() = default;
Expand Down
43 changes: 31 additions & 12 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,16 @@
#define SCOPED_SKIP_MEMORY_CHECK() \
auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck()
#else
#define SCOPED_ATTACH_TASK(arg1, ...) (void)0
#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) (void)0
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) (void)0
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0
#define SCOPED_SKIP_MEMORY_CHECK() (void)0
#define SCOPED_ATTACH_TASK(arg1, ...) \
auto VARNAME_LINENUM(scoped_tls_at) = doris::ScopedInitThreadContext()
#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \
auto VARNAME_LINENUM(scoped_tls_atwi) = doris::ScopedInitThreadContext()
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext()
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
auto VARNAME_LINENUM(scoped_tls_cmt) = doris::ScopedInitThreadContext()
#define SCOPED_SKIP_MEMORY_CHECK() \
auto VARNAME_LINENUM(scoped_tls_smc) = doris::ScopedInitThreadContext()
#endif

// Used to tracking the memory usage of the specified code segment use by mem hook.
Expand All @@ -91,12 +96,6 @@
#define MEMORY_ORPHAN_CHECK() \
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \
<< doris::memory_orphan_check_msg;
#else
#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) (void)0
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) (void)0
#define ORPHAN_TRACKER_CHECK() (void)0
#define MEMORY_ORPHAN_CHECK() (void)0
#endif

#define SKIP_LARGE_MEMORY_CHECK(...) \
do { \
Expand All @@ -108,6 +107,15 @@
}); \
__VA_ARGS__; \
} while (0)
#else
#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \
auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext()
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext()
#define ORPHAN_TRACKER_CHECK() (void)0
#define MEMORY_ORPHAN_CHECK() (void)0
#define SKIP_LARGE_MEMORY_CHECK() (void)0
#endif

namespace doris {

Expand Down Expand Up @@ -284,14 +292,17 @@ static ThreadContext* thread_context() {
__builtin_unreachable();
}

// belong to one query object member, not be shared by multiple queries.
class QueryThreadContext {
public:
QueryThreadContext() = default;
QueryThreadContext(const TUniqueId& query_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: query_id(query_id), query_mem_tracker(mem_tracker) {}

void init() {
// Not thread safe, generally be called in class constructor, shared_ptr use_count may be
// wrong when called by multiple threads, cause crash after object be destroyed prematurely.
void init_unlocked() {
#ifndef BE_TEST
ORPHAN_TRACKER_CHECK();
query_id = doris::thread_context()->task_id();
Expand Down Expand Up @@ -325,6 +336,14 @@ class ScopeMemCountByHook {
int64_t* _scope_mem = nullptr;
};

// only hold thread context in scope.
class ScopedInitThreadContext {
public:
explicit ScopedInitThreadContext() { ThreadLocalHandle::create_thread_local_if_not_exits(); }

~ScopedInitThreadContext() { ThreadLocalHandle::del_thread_local_if_count_is_zero(); }
};

class AttachTask {
public:
explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class SimplifiedScanScheduler;
class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

~ScanTask() {
Expand Down

0 comments on commit efd5dbd

Please sign in to comment.