Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[performance](move-memtable) async close tablet streams (#41156 & #43813) #44128

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 68 additions & 65 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_status = Status::OK();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
Expand All @@ -71,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,

inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
return ostr;
}

Expand All @@ -89,19 +88,19 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
_status = Status::Uninitialized("fault injection");
return _status;
_status.update(Status::Uninitialized("fault injection"));
return _status.status();
});
_status = _load_stream_writer->init();
_status.update(_load_stream_writer->init());
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
return _status;
return _status.status();
}

Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status;
return _status.status();
}

// dispatch add_segment request
Expand Down Expand Up @@ -150,8 +149,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
}
if (!st.ok() && _status.ok()) {
_status = st;
if (!st.ok()) {
_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
Expand All @@ -167,11 +166,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
_status = Status::Error<true>(
"wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms);
return _status;
_status.update(
Status::Error<true>("wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms));
return _status.status();
}
bthread_usleep(2 * 1000); // 2ms
}
Expand All @@ -181,14 +180,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
g_load_stream_flush_running_threads << 1;
auto st = flush_token->submit_func(flush_func);
if (!st.ok()) {
_status = st;
_status.update(st);
}
return _status;
return _status.status();
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status;
return _status.status();
}

SCOPED_TIMER(_add_segment_timer);
Expand All @@ -207,17 +206,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
_status = Status::InternalError(
_status.update(Status::InternalError(
"add segment failed, no segment written by this src be yet, src_id={}, "
"segment_id={}",
src_id, segid);
return _status;
src_id, segid));
return _status.status();
}
if (segid >= _segids_mapping[src_id]->size()) {
_status = Status::InternalError(
_status.update(Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
src_id, segid);
return _status;
src_id, segid));
return _status.status();
}
new_segid = _segids_mapping[src_id]->at(segid);
}
Expand All @@ -226,76 +225,76 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _status.ok()) {
_status = st;
if (!st.ok()) {
_status.update(st);
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto st = flush_token->submit_func(add_segment_func);
if (!st.ok()) {
_status = st;
_status.update(st);
}
return _status;
return _status.status();
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
auto st = Status::OK();
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
if (!ret) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}
cv.wait(lock);
return st;
}

if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
void TabletStream::pre_close() {
if (!_status.ok()) {
return;
}

SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() {
for (auto& token : _flush_tokens) {
token->wait();
}
return Status::OK();
}));
// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
if (!_status.ok()) {
return _status;
return;
}

auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status.update(Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id)));
return;
}
return _status;

_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status.status();
}

SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
return _status.status();
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand Down Expand Up @@ -363,6 +362,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
tablet_stream->pre_close();
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ class TabletStream {
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
void pre_close();
Status close();
int64_t id() const { return _id; }

friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);

private:
Status _run_in_heavy_work_pool(std::function<Status()> fn);

int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
Expand All @@ -68,7 +71,7 @@ class TabletStream {
int64_t _num_segments = 0;
bool _check_num_segments = true;
bthread::Mutex _lock;
Status _status;
AtomicStatus _status;
PUniqueId _load_id;
int64_t _txn_id;
RuntimeProfile* _profile = nullptr;
Expand Down
12 changes: 10 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
return _rowset_writer->add_segment(segid, stat, flush_schema);
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
Status LoadStreamWriter::_pre_close() {
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
Expand All @@ -222,6 +221,15 @@ Status LoadStreamWriter::close() {

RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
_pre_closed = true;
return Status::OK();
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
if (!_pre_closed) {
RETURN_IF_ERROR(_pre_close());
}
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
RETURN_IF_ERROR(_rowset_builder->commit_txn());

Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,21 @@ class LoadStreamWriter {

Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema);

Status pre_close() {
std::lock_guard<std::mutex> l(_lock);
return _pre_close();
}

// wait for all memtables to be flushed.
Status close();

private:
// without lock
Status _pre_close();

bool _is_init = false;
bool _is_canceled = false;
bool _pre_closed = false;
WriteRequest _req;
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
Expand Down
Loading