Skip to content

Commit

Permalink
[improve](move-memtable) reduce flush token num (#46001)
Browse files Browse the repository at this point in the history
Fix OOM due to too many flush tokens being created.
Reduce flush token num to 1 per tablet.
  • Loading branch information
kaijchen authored Dec 30, 2024
1 parent e57ccfb commit d139934
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 19 deletions.
14 changes: 5 additions & 9 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_load_id(load_id),
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
load_stream_mgr->create_token(_flush_token);
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
Expand Down Expand Up @@ -178,7 +178,6 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
auto load_stream_max_wait_flush_token_time_ms =
config::load_stream_max_wait_flush_token_time_ms;
Expand All @@ -188,7 +187,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
});
MonotonicStopWatch timer;
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
_status.update(
Status::Error<true>("wait flush token back pressure time is more than "
Expand All @@ -206,7 +205,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = flush_token->submit_func(flush_func);
st = _flush_token->submit_func(flush_func);
}
if (!st.ok()) {
_status.update(st);
Expand Down Expand Up @@ -263,12 +262,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = flush_token->submit_func(add_segment_func);
st = _flush_token->submit_func(add_segment_func);
}
if (!st.ok()) {
_status.update(st);
Expand Down Expand Up @@ -303,9 +301,7 @@ void TabletStream::pre_close() {

SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() {
for (auto& token : _flush_tokens) {
token->wait();
}
_flush_token->wait();
return Status::OK();
}));
// it is necessary to check status after wait_func,
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TabletStream {

int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
std::unique_ptr<ThreadPoolToken> _flush_token;
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@

namespace doris {

LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
: _num_threads(segment_file_writer_thread_num) {
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
Expand Down
9 changes: 2 additions & 7 deletions be/src/runtime/load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,8 @@ class LoadStreamMgr {

Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream);
void clear_load(UniqueId loadid);
void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
for (int i = 0; i < _num_threads * 2; i++) {
tokens.push_back(
_file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
void create_token(std::unique_ptr<ThreadPoolToken>& token) {
token = _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
}

std::vector<std::string> get_all_load_stream_ids() {
Expand All @@ -70,8 +67,6 @@ class LoadStreamMgr {
std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;

uint32_t _num_threads = 0;

FifoThreadPool* _heavy_work_pool = nullptr;
FifoThreadPool* _light_work_pool = nullptr;
};
Expand Down

0 comments on commit d139934

Please sign in to comment.