Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Apr 25, 2024
1 parent f79e0ca commit 5f71bd1
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 22 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques
RuntimeProfile* profile, const UniqueId& load_id)
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
_rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

CloudDeltaWriter::~CloudDeltaWriter() = default;
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,6 @@ DECLARE_mInt64(load_error_log_limit_bytes);
// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
// Default, if less than or equal 32 core, the following are 128, 128, 10240, 10240 in turn.
// if greater than 32 core, the following are core num * 4, core num * 4, core num * 320, core num * 320 in turn
DECLARE_Int32(brpc_heavy_work_pool_threads);
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
DECLARE_Int32(brpc_light_work_pool_threads);
Expand Down
7 changes: 3 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ void Daemon::memory_gc_thread() {
}
}

void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
void Daemon::memtable_memory_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more frequently,
// which helps to accurately control the memory of LoadChannelMgr.
while (!_stop_background_threads_latch.wait_for(
Expand Down Expand Up @@ -404,9 +404,8 @@ void Daemon::start() {
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memtable_memory_limiter_tracker_refresh_thread",
[this]() { this->memtable_memory_limiter_tracker_refresh_thread(); },
&_threads.emplace_back());
"Daemon", "memtable_memory_refresh_thread",
[this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

if (config::enable_metric_calculator) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Daemon {
void tcmalloc_gc_thread();
void memory_maintenance_thread();
void memory_gc_thread();
void memtable_memory_limiter_tracker_refresh_thread();
void memtable_memory_refresh_thread();
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/calc_delete_bitmap_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

return _thread_token->submit_func([=, this]() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
_total_size_of_aggregate_states(0),
_mem_usage(0) {
g_memtable_cnt << 1;
_query_thread_context.init();
_query_thread_context.init_unlocked();
_arena = std::make_unique<vectorized::Arena>();
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
// TODO: Support ZOrderComparator in the future
Expand Down
19 changes: 11 additions & 8 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {

_last_limit = limit;
_log_timer.reset();
LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
<< " (without allocator cache: "
<< PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
<< "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
// if not exist load task, this log should not be printed.
if (_mem_usage != 0) {
LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
<< " (without allocator cache: "
<< PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
<< "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
}
}

void MemTableMemoryLimiter::_refresh_mem_tracker() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;
_query_thread_context.init();
_query_thread_context.init_unlocked();

_reset_mem_table();

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profil
// TODO(plat1ko): CloudStorageEngine
_rowset_builder = std::make_unique<RowsetBuilder>(
ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile);
_query_thread_context.init(); // from load stream
_query_thread_context.init_unlocked(); // from load stream
}

LoadStreamWriter::~LoadStreamWriter() = default;
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,17 @@ static ThreadContext* thread_context() {
__builtin_unreachable();
}

// belong to one query object member, not be shared by multiple queries.
class QueryThreadContext {
public:
QueryThreadContext() = default;
QueryThreadContext(const TUniqueId& query_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: query_id(query_id), query_mem_tracker(mem_tracker) {}

void init() {
// Not thread safe, generally be called in class constructor, shared_ptr use_count may be
// wrong when called by multiple threads, cause crash after object be destroyed prematurely.
void init_unlocked() {
#ifndef BE_TEST
ORPHAN_TRACKER_CHECK();
query_id = doris::thread_context()->task_id();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class SimplifiedScanScheduler;
class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

~ScanTask() {
Expand Down

0 comments on commit 5f71bd1

Please sign in to comment.