Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Apr 20, 2024
1 parent d8c4b0e commit 7f09437
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 14 deletions.
20 changes: 13 additions & 7 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ bool MemTableMemoryLimiter::_load_usage_low() {
return _mem_tracker->consumption() <= _load_safe_mem_permit;
}

void MemTableMemoryLimiter::handle_memtable_flush() {
void MemTableMemoryLimiter::handle_memtable_flush(bool is_call_by_local) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
if (!_soft_limit_reached() || _load_usage_low()) {
Expand All @@ -122,7 +122,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
if (_active_mem_usage >=
_write_mem_usage * config::memtable_hard_limit_active_percent / 100) {
_flush_active_memtables(_write_mem_usage / 20);
_flush_active_memtables(_write_mem_usage / 20, is_call_by_local);
}
if (!_hard_limit_reached()) {
break;
Expand All @@ -140,7 +140,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
if (_active_mem_usage >=
_write_mem_usage * config::memtable_soft_limit_active_percent / 100) {
_flush_active_memtables(_write_mem_usage / 20);
_flush_active_memtables(_write_mem_usage / 20, is_call_by_local);
}
}
timer.stop();
Expand All @@ -149,7 +149,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
}

void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush, bool is_call_by_local) {
if (need_flush <= 0) {
return;
}
Expand All @@ -162,7 +162,7 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
int64_t num_flushed = 0;
int64_t avg_mem = _active_mem_usage / _active_writers.size();
for (auto writer : _active_writers) {
int64_t mem = _flush_memtable(writer, avg_mem);
int64_t mem = _flush_memtable(writer, avg_mem, is_call_by_local);
mem_flushed += mem;
num_flushed += (mem > 0);
if (mem_flushed >= need_flush) {
Expand All @@ -174,7 +174,7 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
}

int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush,
int64_t threshold) {
int64_t threshold, bool is_call_by_local) {
auto writer = writer_to_flush.lock();
if (!writer) {
return 0;
Expand All @@ -189,7 +189,13 @@ int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr<MemTableWriter> wri
}
VLOG_DEBUG << "flushing active memtables, active mem usage "
<< PrettyPrinter::print_bytes(mem_usage);
Status st = writer->flush_async();
Status st;
if (is_call_by_local) {
st = writer->flush_async();
} else {
SCOPED_ATTACH_TASK(writer->query_thread_context());
st = writer->flush_async();
}
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/memtable_memory_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class MemTableMemoryLimiter {

// check if the total mem consumption exceeds limit.
// If yes, it will flush memtable to try to reduce memory consumption.
void handle_memtable_flush();
void handle_memtable_flush(bool is_call_by_local);

void register_writer(std::weak_ptr<MemTableWriter> writer);

Expand All @@ -57,8 +57,9 @@ class MemTableMemoryLimiter {
bool _soft_limit_reached();
bool _hard_limit_reached();
bool _load_usage_low();
void _flush_active_memtables(int64_t need_flush);
int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, int64_t threshold);
void _flush_active_memtables(int64_t need_flush, bool is_call_by_local);
int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, int64_t threshold,
bool is_call_by_local);
void _refresh_mem_tracker();

std::mutex _lock;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ Status MemTableWriter::_flush_memtable_async() {

Status MemTableWriter::flush_async() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init || _is_closed) {
// This writer is uninitialized or closed before flushing, do nothing.
// We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED.
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class MemTableWriter {

uint64_t flush_running_count() const;

QueryThreadContext query_thread_context() { return _query_thread_context; }

private:
// push a full memtable to flush executor
Status _flush_memtable_async();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(false);
}

// 3. add batch to load channel
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(true);
}
SCOPED_TIMER(_write_memtable_timer);
auto st = delta_writer->write(block.get(), rows.row_idxes, false);
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/memtable_memory_limiter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
ASSERT_TRUE(res.ok());
}
static_cast<void>(mem_limiter->init(100));
mem_limiter->handle_memtable_flush();
mem_limiter->handle_memtable_flush(true);
CHECK_EQ(0, mem_limiter->mem_usage());

res = delta_writer->close();
Expand Down

0 comments on commit 7f09437

Please sign in to comment.